cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ble...@apache.org
Subject [4/4] cassandra git commit: Add support for GROUP BY to SELECT statement
Date Thu, 04 Aug 2016 15:13:52 GMT
Add support for GROUP BY to SELECT statement

patch by Benjamin Lerer; reviewed by Sylvain Lebresne for CASSANDRA-10707


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4205011c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4205011c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4205011c

Branch: refs/heads/trunk
Commit: 4205011c0fb92bfb6a7456ab620f5d6b40cb9160
Parents: 90ba50f
Author: Benjamin Lerer <b.lerer@gmail.com>
Authored: Thu Aug 4 17:09:38 2016 +0200
Committer: Benjamin Lerer <b.lerer@gmail.com>
Committed: Thu Aug 4 17:09:38 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |    1 +
 NEWS.txt                                        |    4 +-
 doc/cql3/CQL.textile                            |   18 +-
 doc/source/cql/changes.rst                      |    1 +
 doc/source/cql/dml.rst                          |   21 +
 pylib/cqlshlib/cql3handling.py                  |   12 +
 src/antlr/Lexer.g                               |    1 +
 src/antlr/Parser.g                              |   10 +-
 .../cassandra/cql3/MultiColumnRelation.java     |    4 +
 .../cassandra/cql3/SingleColumnRelation.java    |    5 +
 .../restrictions/MultiColumnRestriction.java    |   10 +-
 .../cql3/restrictions/RestrictionSet.java       |    7 +
 .../restrictions/RestrictionSetWrapper.java     |    6 +
 .../cql3/restrictions/Restrictions.java         |   16 +-
 .../restrictions/SingleColumnRestriction.java   |    5 +-
 .../restrictions/StatementRestrictions.java     |   39 +-
 .../cql3/restrictions/TokenFilter.java          |    9 +
 .../cql3/restrictions/TokenRestriction.java     |    8 +-
 .../cassandra/cql3/selection/Selection.java     |   55 +-
 .../cql3/statements/CreateViewStatement.java    |    4 +-
 .../cql3/statements/ModificationStatement.java  |    5 +-
 .../cql3/statements/SelectStatement.java        |  240 ++-
 .../cassandra/db/ClusteringComparator.java      |   16 +-
 .../cassandra/db/PartitionRangeReadCommand.java |   26 +-
 .../org/apache/cassandra/db/ReadCommand.java    |   16 +-
 src/java/org/apache/cassandra/db/ReadQuery.java |    5 -
 .../db/SinglePartitionReadCommand.java          |   14 +
 .../aggregation/AggregationSpecification.java   |  182 ++
 .../cassandra/db/aggregation/GroupMaker.java    |  142 ++
 .../cassandra/db/aggregation/GroupingState.java |  141 ++
 .../apache/cassandra/db/filter/DataLimits.java  |  721 ++++++-
 .../apache/cassandra/service/DataResolver.java  |   41 +-
 .../apache/cassandra/service/StorageProxy.java  |   15 +-
 .../service/pager/AbstractQueryPager.java       |   33 +-
 .../service/pager/AggregationQueryPager.java    |  428 +++++
 .../service/pager/MultiPartitionPager.java      |   69 +-
 .../service/pager/PartitionRangeQueryPager.java |   28 +-
 .../cassandra/service/pager/QueryPager.java     |   14 +
 .../service/pager/SinglePartitionPager.java     |   27 +-
 .../org/apache/cassandra/cql3/CQLTester.java    |    7 +-
 .../operations/SelectGroupByTest.java           | 1817 ++++++++++++++++++
 .../SelectMultiColumnRelationTest.java          |   13 +-
 .../SelectSingleColumnRelationTest.java         |   10 +-
 .../db/aggregation/GroupMakerTest.java          |  178 ++
 44 files changed, 4206 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 69d7d6f..1636e3a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
  * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
  * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
  * Add version command to cassandra-stress (CASSANDRA-12258)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index e856d8f0..f28c6f2 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -19,6 +19,7 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - Support for GROUP BY queries has been added.
    - A new compaction-stress tool has been added to test the throughput of compaction
      for any cassandra-stress user schema.  see compaction-stress help for how to use.
    - Compaction can now take into account overlapping tables that don't take part
@@ -153,8 +154,9 @@ New features
      a QueryHandler implementation. See CASSANDRA-11295 for more details.
 
 
+
 3.4
-=====
+===
 
 New features
 ------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/doc/cql3/CQL.textile
----------------------------------------------------------------------
diff --git a/doc/cql3/CQL.textile b/doc/cql3/CQL.textile
index e2fee84..07c8c61 100644
--- a/doc/cql3/CQL.textile
+++ b/doc/cql3/CQL.textile
@@ -1,6 +1,6 @@
 <link rel="StyleSheet" href="CQL.css" type="text/css" media="screen">
 
-h1. Cassandra Query Language (CQL) v3.4.2
+h1. Cassandra Query Language (CQL) v3.4.3
 
 
 
@@ -1061,6 +1061,7 @@ bc(syntax)..
 <select-stmt> ::= SELECT ( JSON )? <select-clause>
                   FROM <tablename>
                   ( WHERE <where-clause> )?
+                  ( GROUP BY <group-by>)?
                   ( ORDER BY <order-by> )?
                   ( PER PARTITION LIMIT <integer> )?
                   ( LIMIT <integer> )?
@@ -1088,6 +1089,7 @@ bc(syntax)..
              | TOKEN '(' <identifier> ( ',' <identifer>)* ')' <op> <term>
 
 <op> ::= '=' | '<' | '>' | '<=' | '>=' | CONTAINS | CONTAINS KEY
+<group-by> ::= <identifier> (',' <identifier>)*
 <order-by> ::= <ordering> ( ',' <odering> )*
 <ordering> ::= <identifer> ( ASC | DESC )?
 <term-tuple> ::= '(' <term> (',' <term>)* ')'
@@ -1182,6 +1184,16 @@ The @ORDER BY@ option allows to select the order of the returned results. It tak
 * if the table has been defined without any specific @CLUSTERING ORDER@, then then allowed orderings are the order induced by the clustering columns and the reverse of that one.
 * otherwise, the orderings allowed are the order of the @CLUSTERING ORDER@ option and the reversed one.
 
+h4(#selectGroupBy). @<group-by>@
+
+The @GROUP BY@ option allows condensing into a single row all selected rows that share the same values for a set of columns.
+
+Using the @GROUP BY@ option, it is only possible to group rows at the partition key level or at a clustering column level. As a consequence, the @GROUP BY@ option only accepts as arguments primary key column names in the primary key order. If a primary key column is restricted by an equality restriction, it is not required to be present in the @GROUP BY@ clause.
+
+Aggregate functions will produce a separate value for each group. If no @GROUP BY@ clause is specified, aggregate functions will produce a single value for all the rows.
+
+If a column is selected without an aggregate function in a statement with a @GROUP BY@, the first value encountered in each group will be returned.
+
 h4(#selectLimit). @LIMIT@ and @PER PARTITION LIMIT@
 
 The @LIMIT@ option to a @SELECT@ statement limits the number of rows returned by a query, while the @PER PARTITION LIMIT@ option limits the number of rows returned for a given partition by the query. Note that both type of limit can used in the same statement.
@@ -2413,6 +2425,10 @@ h2(#changes). Changes
 
 The following describes the changes in each version of CQL.
 
+h3. 3.4.3
+
+* Support for @GROUP BY@. See "@<group-by>@":#selectGroupBy (see "CASSANDRA-10707":https://issues.apache.org/jira/browse/CASSANDRA-10707).
+
 h3. 3.4.2
 
 * "@INSERT/UPDATE options@":#updateOptions for tables having a default_time_to_live specifying a TTL of 0 will remove the TTL from the inserted or updated values

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/doc/source/cql/changes.rst
----------------------------------------------------------------------
diff --git a/doc/source/cql/changes.rst b/doc/source/cql/changes.rst
index d0c51cc..7d7c2b9 100644
--- a/doc/source/cql/changes.rst
+++ b/doc/source/cql/changes.rst
@@ -24,6 +24,7 @@ The following describes the changes in each version of CQL.
 3.4.3
 ^^^^^
 
+- Support for ``GROUP BY`` (:jira:`10707`).
 - Adds a ``DEFAULT UNSET`` option for ``INSERT JSON`` to ignore omitted columns (:jira:`11424`).
 
 3.4.2

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/doc/source/cql/dml.rst
----------------------------------------------------------------------
diff --git a/doc/source/cql/dml.rst b/doc/source/cql/dml.rst
index f1c126b..ad878a9 100644
--- a/doc/source/cql/dml.rst
+++ b/doc/source/cql/dml.rst
@@ -34,6 +34,7 @@ Querying data from data is done using a ``SELECT`` statement:
    select_statement: SELECT [ JSON | DISTINCT ] ( `select_clause` | '*' )
                    : FROM `table_name`
                    : [ WHERE `where_clause` ]
+                   : [ GROUP BY `group_by_clause` ]
                    : [ ORDER BY `ordering_clause` ]
                    : [ PER PARTITION LIMIT (`integer` | `bind_marker`) ]
                    : [ LIMIT (`integer` | `bind_marker`) ]
@@ -49,6 +50,7 @@ Querying data from data is done using a ``SELECT`` statement:
            : '(' `column_name` ( ',' `column_name` )* ')' `operator` `tuple_literal`
            : TOKEN '(' `column_name` ( ',' `column_name` )* ')' `operator` `term`
    operator: '=' | '<' | '>' | '<=' | '>=' | '!=' | IN | CONTAINS | CONTAINS KEY
+   group_by_clause: `column_name` ( ',' `column_name` )*
    ordering_clause: `column_name` [ ASC | DESC ] ( ',' `column_name` [ ASC | DESC ] )*
 
 For instance::
@@ -208,6 +210,25 @@ The ``CONTAINS`` operator may only be used on collection columns (lists, sets, a
 ``CONTAINS`` applies to the map values. The ``CONTAINS KEY`` operator may only be used on map columns and applies to the
 map keys.
 
+.. _group-by-clause:
+
+Grouping results
+~~~~~~~~~~~~~~~~
+
+The ``GROUP BY`` option allows condensing into a single row all selected rows that share the same values for a set
+of columns.
+
+Using the ``GROUP BY`` option, it is only possible to group rows at the partition key level or at a clustering column
+level. As a consequence, the ``GROUP BY`` option only accepts as arguments primary key column names in the primary key
+order. If a primary key column is restricted by an equality restriction it is not required to be present in the
+``GROUP BY`` clause.
+
+Aggregate functions will produce a separate value for each group. If no ``GROUP BY`` clause is specified,
+aggregate functions will produce a single value for all the rows.
+
+If a column is selected without an aggregate function in a statement with a ``GROUP BY``, the first value encountered
+in each group will be returned.
+
 .. _ordering-clause:
 
 Ordering results

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index c01e441..f9bf028 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -681,6 +681,7 @@ syntax_rules += r'''
 <selectStatement> ::= "SELECT" ( "JSON" )? <selectClause>
                         "FROM" (cf=<columnFamilyName> | mv=<materializedViewName>)
                           ( "WHERE" <whereClause> )?
+                          ( "GROUP" "BY" <groupByClause> ( "," <groupByClause> )* )?
                           ( "ORDER" "BY" <orderByClause> ( "," <orderByClause> )* )?
                           ( "LIMIT" limit=<wholenumber> )?
                           ( "ALLOW" "FILTERING" )?
@@ -711,6 +712,8 @@ syntax_rules += r'''
                           ;
 <orderByClause> ::= [ordercol]=<cident> ( "ASC" | "DESC" )?
                   ;
+<groupByClause> ::= [groupcol]=<cident>
+                  ;
 '''
 
 
@@ -790,6 +793,15 @@ def select_order_column_completer(ctxt, cass):
         return [maybe_escape_name(order_by_candidates[len(prev_order_cols)])]
     return [Hint('No more orderable columns here.')]
 
+@completer_for('groupByClause', 'groupcol')
+def select_group_column_completer(ctxt, cass):
+    prev_group_cols = ctxt.get_binding('groupcol', ())
+    layout = get_table_meta(ctxt, cass)
+    group_by_candidates = [col.name for col in layout.primary_key]
+    if len(group_by_candidates) > len(prev_group_cols):
+        return [maybe_escape_name(group_by_candidates[len(prev_group_cols)])]
+    return [Hint('No more columns here.')]
+
 
 @completer_for('relation', 'token')
 def relation_token_word_completer(ctxt, cass):

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/antlr/Lexer.g
----------------------------------------------------------------------
diff --git a/src/antlr/Lexer.g b/src/antlr/Lexer.g
index a65b4f5..0d79520 100644
--- a/src/antlr/Lexer.g
+++ b/src/antlr/Lexer.g
@@ -117,6 +117,7 @@ K_FILTERING:   F I L T E R I N G;
 K_IF:          I F;
 K_IS:          I S;
 K_CONTAINS:    C O N T A I N S;
+K_GROUP:       G R O U P;
 
 K_GRANT:       G R A N T;
 K_ALL:         A L L;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/antlr/Parser.g
----------------------------------------------------------------------
diff --git a/src/antlr/Parser.g b/src/antlr/Parser.g
index e762bde..d33125f 100644
--- a/src/antlr/Parser.g
+++ b/src/antlr/Parser.g
@@ -248,6 +248,7 @@ selectStatement returns [SelectStatement.RawStatement expr]
         Term.Raw limit = null;
         Term.Raw perPartitionLimit = null;
         Map<ColumnDefinition.Raw, Boolean> orderings = new LinkedHashMap<>();
+        List<ColumnDefinition.Raw> groups = new ArrayList<>();
         boolean allowFiltering = false;
         boolean isJson = false;
     }
@@ -256,12 +257,14 @@ selectStatement returns [SelectStatement.RawStatement expr]
       ( ( K_DISTINCT { isDistinct = true; } )? sclause=selectClause )
       K_FROM cf=columnFamilyName
       ( K_WHERE wclause=whereClause )?
+      ( K_GROUP K_BY groupByClause[groups] ( ',' groupByClause[groups] )* )?
       ( K_ORDER K_BY orderByClause[orderings] ( ',' orderByClause[orderings] )* )?
       ( K_PER K_PARTITION K_LIMIT rows=intValue { perPartitionLimit = rows; } )?
       ( K_LIMIT rows=intValue { limit = rows; } )?
       ( K_ALLOW K_FILTERING  { allowFiltering = true; } )?
       {
           SelectStatement.Parameters params = new SelectStatement.Parameters(orderings,
+                                                                             groups,
                                                                              isDistinct,
                                                                              allowFiltering,
                                                                              isJson);
@@ -326,6 +329,10 @@ orderByClause[Map<ColumnDefinition.Raw, Boolean> orderings]
     : c=cident (K_ASC | K_DESC { reversed = true; })? { orderings.put(c, reversed); }
     ;
 
+groupByClause[List<ColumnDefinition.Raw> groups]
+    : c=cident { groups.add(c); }
+    ;
+
 /**
  * INSERT INTO <CF> (<column>, <column>, <column>, ...)
  * VALUES (<value>, <value>, <value>, ...)
@@ -1636,5 +1643,6 @@ basic_unreserved_keyword returns [String str]
         | K_LIKE
         | K_PER
         | K_PARTITION
+        | K_GROUP
         ) { $str = $k.text; }
-    ;
+    ;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java b/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
index 01f2a12..4ddfabb 100644
--- a/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
+++ b/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java
@@ -152,6 +152,10 @@ public class MultiColumnRelation extends Relation
             Term term = toTerm(receivers, getValue(), cfm.ksName, boundNames);
             return new MultiColumnRestriction.InRestrictionWithMarker(receivers, (AbstractMarker) term);
         }
+
+        if (terms.size() == 1)
+            return new MultiColumnRestriction.EQRestriction(receivers, terms.get(0));
+
         return new MultiColumnRestriction.InRestrictionWithValues(receivers, terms);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java b/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
index 07232d2..1565102 100644
--- a/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
+++ b/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
@@ -182,6 +182,11 @@ public final class SingleColumnRelation extends Relation
             Term term = toTerm(receivers, value, cfm.ksName, boundNames);
             return new SingleColumnRestriction.InRestrictionWithMarker(columnDef, (Lists.Marker) term);
         }
+
+        // An IN restriction with only one element is the same as an EQ restriction
+        if (terms.size() == 1)
+            return new SingleColumnRestriction.EQRestriction(columnDef, terms.get(0));
+
         return new SingleColumnRestriction.InRestrictionWithValues(columnDef, terms);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
index e5e3bc8..b0cbdff 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
@@ -239,15 +239,7 @@ public abstract class MultiColumnRestriction implements SingleRestriction
                                          SecondaryIndexManager indexManager,
                                          QueryOptions options)
         {
-            List<List<ByteBuffer>> splitInValues = splitValues(options);
-            checkTrue(splitInValues.size() == 1, "IN restrictions are not supported on indexed columns");
-            List<ByteBuffer> values = splitInValues.get(0);
-
-            for (int i = 0, m = columnDefs.size(); i < m; i++)
-            {
-                ColumnDefinition columnDef = columnDefs.get(i);
-                filter.add(columnDef, Operator.EQ, values.get(i));
-            }
+            throw  invalidRequest("IN restrictions are not supported on indexed columns");
         }
 
         protected abstract List<List<ByteBuffer>> splitValues(QueryOptions options);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
index bb943d5..0e6c2f7 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
@@ -141,6 +141,13 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction>
         return restrictions;
     }
 
+    @Override
+    public Set<Restriction> getRestrictions(ColumnDefinition columnDef)
+    {
+        Restriction existing = restrictions.get(columnDef);
+        return existing == null ? Collections.emptySet() : Collections.singleton(existing);
+    }
+
     /**
      * Returns all the restrictions applied to the specified columns.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
index 6ad5fbb..6b110da 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.cql3.restrictions;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.QueryOptions;
@@ -92,4 +93,9 @@ class RestrictionSetWrapper implements Restrictions
     {
         return restrictions.hasOnlyEqualityRestrictions();
     }
+
+    public Set<Restriction> getRestrictions(ColumnDefinition columnDef)
+    {
+        return restrictions.getRestrictions(columnDef);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
index 7ca82ab..4cf165f 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
@@ -17,15 +17,27 @@
  */
 package org.apache.cassandra.cql3.restrictions;
 
+import java.util.Set;
+
+import org.apache.cassandra.config.ColumnDefinition;
+
 /**
  * Sets of restrictions
  */
 public interface Restrictions extends Restriction
 {
     /**
-     * Checks if this <code>PrimaryKeyRestrictionSet</code> is empty or not.
+     * Returns the restrictions applied to the specified column.
+     *
+     * @param columnDef the column definition
+     * @return the restrictions applied to the specified column
+     */
+    Set<Restriction> getRestrictions(ColumnDefinition columnDef);
+
+    /**
+     * Checks if this <code>Restrictions</code> is empty or not.
      *
-     * @return <code>true</code> if this <code>PrimaryKeyRestrictionSet</code> is empty, <code>false</code> otherwise.
+     * @return <code>true</code> if this <code>Restrictions</code> is empty, <code>false</code> otherwise.
      */
     boolean isEmpty();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
index ae883d1..1d84331 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
@@ -218,10 +218,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction
                                    SecondaryIndexManager indexManager,
                                    QueryOptions options)
         {
-            List<ByteBuffer> values = getValues(options);
-            checkTrue(values.size() == 1, "IN restrictions are not supported on indexed columns");
-
-            filter.add(columnDef, Operator.EQ, values.get(0));
+            throw invalidRequest("IN restrictions are not supported on indexed columns");
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index 62b48b3..0b3b37d 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@ -319,12 +319,8 @@ public final class StatementRestrictions
     {
         if (notNullColumns.contains(column))
             return true;
-        else if (column.isPartitionKey())
-            return partitionKeyRestrictions.getColumnDefs().contains(column);
-        else if (column.isClusteringColumn())
-            return clusteringColumnsRestrictions.getColumnDefs().contains(column);
-        else
-            return nonPrimaryKeyRestrictions.getColumnDefs().contains(column);
+
+        return getRestrictions(column.kind).getColumnDefs().contains(column);
     }
 
     /**
@@ -349,6 +345,37 @@ public final class StatementRestrictions
     }
 
     /**
+     * Checks if the specified column is restricted by an EQ restriction.
+     *
+     * @param columnDef the column definition
+     * @return <code>true</code> if the specified column is restricted by an EQ restriction, <code>false</code>
+     * otherwise.
+     */
+    public boolean isColumnRestrictedByEq(ColumnDefinition columnDef)
+    {
+        Set<Restriction> restrictions = getRestrictions(columnDef.kind).getRestrictions(columnDef);
+        return restrictions.stream()
+                           .filter(SingleRestriction.class::isInstance)
+                           .anyMatch(p -> ((SingleRestriction) p).isEQ());
+    }
+
+    /**
+     * Returns the <code>Restrictions</code> for the specified type of columns.
+     *
+     * @param kind the column type
+     * @return the <code>Restrictions</code> for the specified type of columns
+     */
+    private Restrictions getRestrictions(ColumnDefinition.Kind kind)
+    {
+        switch (kind)
+        {
+            case PARTITION_KEY: return partitionKeyRestrictions;
+            case CLUSTERING: return clusteringColumnsRestrictions;
+            default: return nonPrimaryKeyRestrictions;
+        }
+    }
+
+    /**
      * Checks if the secondary index need to be queried.
      *
      * @return <code>true</code> if the secondary index need to be queried, <code>false</code> otherwise.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
index 66e0cfb..c27d742 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
@@ -69,6 +69,15 @@ final class TokenFilter implements PartitionKeyRestrictions
     }
 
     @Override
+    public Set<Restriction> getRestrictions(ColumnDefinition columnDef)
+    {
+        Set<Restriction> set = new HashSet<>();
+        set.addAll(restrictions.getRestrictions(columnDef));
+        set.addAll(tokenRestriction.getRestrictions(columnDef));
+        return set;
+    }
+
+    @Override
     public boolean isOnToken()
     {
         // if all partition key columns have non-token restrictions, we can simply use the token range to filter

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
index a71c64c..dd31730 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
@@ -73,7 +73,13 @@ public abstract class TokenRestriction implements PartitionKeyRestrictions
     }
 
     @Override
-    public  boolean isOnToken()
+    public Set<Restriction> getRestrictions(ColumnDefinition columnDef)
+    {
+        return Collections.singleton(this);
+    }
+
+    @Override
+    public final boolean isOnToken()
     {
         return true;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/cql3/selection/Selection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/selection/Selection.java b/src/java/org/apache/cassandra/cql3/selection/Selection.java
index f547376..20fad4d 100644
--- a/src/java/org/apache/cassandra/cql3/selection/Selection.java
+++ b/src/java/org/apache/cassandra/cql3/selection/Selection.java
@@ -29,9 +29,13 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.Function;
-import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.aggregation.AggregationSpecification;
+import org.apache.cassandra.db.aggregation.GroupMaker;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -168,7 +172,7 @@ public abstract class Selection
         return false;
     }
 
-    public static Selection fromSelectors(CFMetaData cfm, List<RawSelector> rawSelectors, VariableSpecifications boundNames) throws InvalidRequestException
+    public static Selection fromSelectors(CFMetaData cfm, List<RawSelector> rawSelectors, VariableSpecifications boundNames, boolean hasGroupBy)
     {
         List<ColumnDefinition> defs = new ArrayList<>();
 
@@ -176,7 +180,7 @@ public abstract class Selection
                 SelectorFactories.createFactoriesAndCollectColumnDefinitions(RawSelector.toSelectables(rawSelectors, cfm), null, cfm, defs, boundNames);
         SelectionColumnMapping mapping = collectColumnMappings(cfm, rawSelectors, factories);
 
-        return (processesSelection(rawSelectors) || rawSelectors.size() != defs.size())
+        return (processesSelection(rawSelectors) || rawSelectors.size() != defs.size() || hasGroupBy)
                ? new SelectionWithProcessing(cfm, defs, mapping, factories)
                : new SimpleSelection(cfm, defs, mapping, false);
     }
@@ -238,9 +242,15 @@ public abstract class Selection
         return columnMapping;
     }
 
-    public ResultSetBuilder resultSetBuilder(QueryOptions options, boolean isJons) throws InvalidRequestException
+    public ResultSetBuilder resultSetBuilder(QueryOptions options, boolean isJson)
+    {
+        return new ResultSetBuilder(options, isJson);
+    }
+
+    public ResultSetBuilder resultSetBuilder(QueryOptions options, boolean isJson, AggregationSpecification aggregationSpec)
     {
-        return new ResultSetBuilder(options, isJons);
+        return aggregationSpec == null ? new ResultSetBuilder(options, isJson) 
+                : new ResultSetBuilder(options, isJson, aggregationSpec.newGroupMaker());
     }
 
     public abstract boolean isAggregate();
@@ -294,6 +304,11 @@ public abstract class Selection
          */
         private final Selectors selectors;
 
+        /**
+         * The <code>GroupMaker</code> used to build the aggregates.
+         */
+        private final GroupMaker groupMaker;
+
         /*
          * We'll build CQL3 row one by one.
          * The currentRow is the values for the (CQL3) columns we've fetched.
@@ -308,11 +323,17 @@ public abstract class Selection
 
         private final boolean isJson;
 
-        private ResultSetBuilder(QueryOptions options, boolean isJson) throws InvalidRequestException
+        private ResultSetBuilder(QueryOptions options, boolean isJson)
+        {
+            this(options, isJson, null);
+        }
+
+        private ResultSetBuilder(QueryOptions options, boolean isJson, GroupMaker groupMaker)
         {
             this.resultSet = new ResultSet(getResultMetadata(isJson).copy(), new ArrayList<List<ByteBuffer>>());
             this.protocolVersion = options.getProtocolVersion();
             this.selectors = newSelectors(options);
+            this.groupMaker = groupMaker;
             this.timestamps = collectTimestamps ? new long[columns.size()] : null;
             this.ttls = collectTTLs ? new int[columns.size()] : null;
             this.isJson = isJson;
@@ -362,12 +383,20 @@ public abstract class Selection
                  : c.value();
         }
 
-        public void newRow() throws InvalidRequestException
+        /**
+         * Notifies this <code>Builder</code> that a new row is being processed.
+         *
+         * @param partitionKey the partition key of the new row
+         * @param clustering the clustering of the new row
+         */
+        public void newRow(DecoratedKey partitionKey, Clustering clustering)
         {
+            // The groupMaker needs to be called for each row
+            boolean isNewAggregate = groupMaker == null || groupMaker.isNewGroup(partitionKey, clustering);
             if (current != null)
             {
                 selectors.addInputRow(protocolVersion, this);
-                if (!selectors.isAggregate())
+                if (isNewAggregate)
                 {
                     resultSet.addRow(getOutputRow());
                     selectors.reset();
@@ -376,7 +405,12 @@ public abstract class Selection
             current = new ArrayList<>(columns.size());
         }
 
-        public ResultSet build() throws InvalidRequestException
+        /**
+         * Builds the <code>ResultSet</code>
+         *
+         * @return the <code>ResultSet</code> that has been built
+         */
+        public ResultSet build()
         {
             if (current != null)
             {
@@ -386,7 +420,8 @@ public abstract class Selection
                 current = null;
             }
 
-            if (resultSet.isEmpty() && selectors.isAggregate())
+            // For aggregates we need to return a row even if no records have been found
+            if (resultSet.isEmpty() && groupMaker != null && groupMaker.returnAtLeastOneRow())
                 resultSet.addRow(getOutputRow());
             return resultSet;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
index 8fe9f7e..e25045e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
@@ -187,7 +187,9 @@ public class CreateViewStatement extends SchemaAlteringStatement
 
         // build the select statement
         Map<ColumnDefinition.Raw, Boolean> orderings = Collections.emptyMap();
-        SelectStatement.Parameters parameters = new SelectStatement.Parameters(orderings, false, true, false);
+        List<ColumnDefinition.Raw> groups = Collections.emptyList();
+        SelectStatement.Parameters parameters = new SelectStatement.Parameters(orderings, groups, false, true, false);
+
         SelectStatement.RawStatement rawSelect = new SelectStatement.RawStatement(baseName, parameters, selectClause, whereClause, null, null);
 
         ClientState state = ClientState.forInternalCalls();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 675aaea..822664a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -549,7 +549,10 @@ public abstract class ModificationStatement implements CQLStatement
         }
 
         Selection.ResultSetBuilder builder = selection.resultSetBuilder(options, false);
-        SelectStatement.forSelection(cfm, selection).processPartition(partition, options, builder, FBUtilities.nowInSeconds());
+        SelectStatement.forSelection(cfm, selection).processPartition(partition,
+                                                                      options,
+                                                                      builder,
+                                                                      FBUtilities.nowInSeconds());
 
         return builder.build();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index d3fb159..7cd2be0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
 import org.apache.cassandra.cql3.selection.RawSelector;
 import org.apache.cassandra.cql3.selection.Selection;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.aggregation.AggregationSpecification;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.CompositeType;
@@ -50,6 +51,7 @@ import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.pager.AggregationQueryPager;
 import org.apache.cassandra.service.pager.PagingState;
 import org.apache.cassandra.service.pager.QueryPager;
 import org.apache.cassandra.thrift.ThriftValidation;
@@ -76,7 +78,8 @@ public class SelectStatement implements CQLStatement
 {
     private static final Logger logger = LoggerFactory.getLogger(SelectStatement.class);
 
-    private static final int DEFAULT_COUNT_PAGE_SIZE = 10000;
+    public static final int DEFAULT_PAGE_SIZE = 10000;
+
     private final int boundTerms;
     public final CFMetaData cfm;
     public final Parameters parameters;
@@ -89,6 +92,11 @@ public class SelectStatement implements CQLStatement
     private final boolean isReversed;
 
     /**
+     * The <code>AggregationSpecification</code> used to make the aggregates.
+     */
+    private final AggregationSpecification aggregationSpec;
+
+    /**
      * The comparator used to orders results when multiple keys are selected (using IN).
      */
     private final Comparator<List<ByteBuffer>> orderingComparator;
@@ -96,7 +104,11 @@ public class SelectStatement implements CQLStatement
     private final ColumnFilter queriedColumns;
 
     // Used by forSelection below
-    private static final Parameters defaultParameters = new Parameters(Collections.<ColumnDefinition.Raw, Boolean>emptyMap(), false, false, false);
+    private static final Parameters defaultParameters = new Parameters(Collections.emptyMap(),
+                                                                       Collections.emptyList(),
+                                                                       false,
+                                                                       false,
+                                                                       false);
 
     public SelectStatement(CFMetaData cfm,
                            int boundTerms,
@@ -104,6 +116,7 @@ public class SelectStatement implements CQLStatement
                            Selection selection,
                            StatementRestrictions restrictions,
                            boolean isReversed,
+                           AggregationSpecification aggregationSpec,
                            Comparator<List<ByteBuffer>> orderingComparator,
                            Term limit,
                            Term perPartitionLimit)
@@ -113,6 +126,7 @@ public class SelectStatement implements CQLStatement
         this.selection = selection;
         this.restrictions = restrictions;
         this.isReversed = isReversed;
+        this.aggregationSpec = aggregationSpec;
         this.orderingComparator = orderingComparator;
         this.parameters = parameters;
         this.limit = limit;
@@ -178,6 +192,7 @@ public class SelectStatement implements CQLStatement
                                    false,
                                    null,
                                    null,
+                                   null,
                                    null);
     }
 
@@ -223,39 +238,29 @@ public class SelectStatement implements CQLStatement
         int nowInSec = FBUtilities.nowInSeconds();
         int userLimit = getLimit(options);
         int userPerPartitionLimit = getPerPartitionLimit(options);
-        ReadQuery query = getQuery(options, nowInSec, userLimit, userPerPartitionLimit);
-
-        int pageSize = getPageSize(options);
+        int pageSize = options.getPageSize();
+        ReadQuery query = getQuery(options, nowInSec, userLimit, userPerPartitionLimit, pageSize);
 
-        if (pageSize <= 0 || query.limits().count() <= pageSize)
+        if (aggregationSpec == null && (pageSize <= 0 || (query.limits().count() <= pageSize)))
             return execute(query, options, state, nowInSec, userLimit);
 
-        QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
-        return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec, userLimit);
-    }
+        QueryPager pager = getPager(query, options);
 
-    private int getPageSize(QueryOptions options)
-    {
-        int pageSize = options.getPageSize();
-
-        // An aggregation query will never be paged for the user, but we always page it internally to avoid OOM.
-        // If we user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default
-        // Note that if there are some nodes in the cluster with a version less than 2.0, we can't use paging (CASSANDRA-6707).
-        if (selection.isAggregate() && pageSize <= 0)
-            pageSize = DEFAULT_COUNT_PAGE_SIZE;
-
-        return  pageSize;
+        return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec, userLimit);
     }
 
     public ReadQuery getQuery(QueryOptions options, int nowInSec) throws RequestValidationException
     {
-        return getQuery(options, nowInSec, getLimit(options), getPerPartitionLimit(options));
+        return getQuery(options, nowInSec, getLimit(options), getPerPartitionLimit(options), options.getPageSize());
     }
 
-    public ReadQuery getQuery(QueryOptions options, int nowInSec, int userLimit, int perPartitionLimit) throws RequestValidationException
+    public ReadQuery getQuery(QueryOptions options, int nowInSec, int userLimit, int perPartitionLimit, int pageSize)
     {
-        DataLimits limit = getDataLimits(userLimit, perPartitionLimit);
-        if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing())
+        boolean isPartitionRangeQuery = restrictions.isKeyRange() || restrictions.usesSecondaryIndexing();
+
+        DataLimits limit = getDataLimits(userLimit, perPartitionLimit, pageSize);
+
+        if (isPartitionRangeQuery)
             return getRangeCommand(options, limit, nowInSec);
 
         return getSliceCommands(options, limit, nowInSec);
@@ -346,11 +351,21 @@ public class SelectStatement implements CQLStatement
                                        int nowInSec,
                                        int userLimit) throws RequestValidationException, RequestExecutionException
     {
-        if (selection.isAggregate())
-            return pageAggregateQuery(pager, options, pageSize, nowInSec);
+        if (aggregationSpec != null)
+        {
+            if (!restrictions.hasPartitionKeyRestrictions())
+            {
+                warn("Aggregation query used without partition key");
+            }
+            else if (restrictions.keyIsInRelation())
+            {
+                warn("Aggregation query used on multiple partition keys (IN restriction)");
+            }
+        }
 
         // We can't properly do post-query ordering if we page (see #6722)
-        checkFalse(needsPostQueryOrdering(),
+        // For GROUP BY or aggregation queries we always page internally even if the user has turned paging off
+        checkFalse(pageSize > 0 && needsPostQueryOrdering(),
                   "Cannot page queries with both ORDER BY and a IN restriction on the partition key;"
                   + " you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query");
 
@@ -368,35 +383,10 @@ public class SelectStatement implements CQLStatement
         return msg;
     }
 
-    private ResultMessage.Rows pageAggregateQuery(Pager pager, QueryOptions options, int pageSize, int nowInSec)
-    throws RequestValidationException, RequestExecutionException
+    private void warn(String msg)
     {
-        if (!restrictions.hasPartitionKeyRestrictions())
-        {
-            logger.warn("Aggregation query used without partition key");
-            ClientWarn.instance.warn("Aggregation query used without partition key");
-        }
-        else if (restrictions.keyIsInRelation())
-        {
-            logger.warn("Aggregation query used on multiple partition keys (IN restriction)");
-            ClientWarn.instance.warn("Aggregation query used on multiple partition keys (IN restriction)");
-        }
-
-        Selection.ResultSetBuilder result = selection.resultSetBuilder(options, parameters.isJson);
-        while (!pager.isExhausted())
-        {
-            try (PartitionIterator iter = pager.fetchPage(pageSize))
-            {
-                while (iter.hasNext())
-                {
-                    try (RowIterator partition = iter.next())
-                    {
-                        processPartition(partition, options, result, nowInSec);
-                    }
-                }
-            }
-        }
-        return new ResultMessage.Rows(result.build());
+        logger.warn(msg);
+        ClientWarn.instance.warn(msg);
     }
 
     private ResultMessage.Rows processResults(PartitionIterator partitions,
@@ -417,12 +407,12 @@ public class SelectStatement implements CQLStatement
     {
         int userLimit = getLimit(options);
         int userPerPartitionLimit = getPerPartitionLimit(options);
-        ReadQuery query = getQuery(options, nowInSec, userLimit, userPerPartitionLimit);
-        int pageSize = getPageSize(options);
+        int pageSize = options.getPageSize();
+        ReadQuery query = getQuery(options, nowInSec, userLimit, userPerPartitionLimit, pageSize);
 
         try (ReadExecutionController executionController = query.executionController())
         {
-            if (pageSize <= 0 || query.limits().count() <= pageSize)
+            if (aggregationSpec == null && (pageSize <= 0 || (query.limits().count() <= pageSize)))
             {
                 try (PartitionIterator data = query.executeInternal(executionController))
                 {
@@ -431,12 +421,23 @@ public class SelectStatement implements CQLStatement
             }
             else
             {
-                QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
+                QueryPager pager = getPager(query, options);
+
                 return execute(Pager.forInternalQuery(pager, executionController), options, pageSize, nowInSec, userLimit);
             }
         }
     }
 
+    private QueryPager getPager(ReadQuery query, QueryOptions options)
+    {
+        QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
+
+        if (aggregationSpec == null || query == ReadQuery.EMPTY)
+            return pager;
+
+        return new AggregationQueryPager(pager, query.limits());
+    }
+
     public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException
     {
         return process(partitions, QueryOptions.DEFAULT, nowInSec, getLimit(QueryOptions.DEFAULT));
@@ -636,23 +637,37 @@ public class SelectStatement implements CQLStatement
         return builder.build();
     }
 
-    private DataLimits getDataLimits(int userLimit, int perPartitionLimit)
+    private DataLimits getDataLimits(int userLimit, int perPartitionLimit, int pageSize)
     {
         int cqlRowLimit = DataLimits.NO_LIMIT;
         int cqlPerPartitionLimit = DataLimits.NO_LIMIT;
 
-        // If we aggregate, the limit really apply to the number of rows returned to the user, not to what is queried, and
-        // since in practice we currently only aggregate at top level (we have no GROUP BY support yet), we'll only ever
-        // return 1 result and can therefore basically ignore the user LIMIT in this case.
-        // Whenever we support GROUP BY, we'll have to add a new DataLimits kind that knows how things are grouped and is thus
-        // able to apply the user limit properly.
         // If we do post ordering we need to get all the results sorted before we can trim them.
-        if (!selection.isAggregate())
+        if (aggregationSpec != AggregationSpecification.AGGREGATE_EVERYTHING)
         {
             if (!needsPostQueryOrdering())
                 cqlRowLimit = userLimit;
             cqlPerPartitionLimit = perPartitionLimit;
         }
+
+        // Group by and aggregation queries will always be paged internally to avoid OOM.
+        // If the user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default
+        if (pageSize <= 0)
+            pageSize = DEFAULT_PAGE_SIZE;
+
+        // Aggregation queries work fine on top of the group by paging but to maintain
+        // backward compatibility we need to use the old way.
+        if (aggregationSpec != null && aggregationSpec != AggregationSpecification.AGGREGATE_EVERYTHING)
+        {
+            if (parameters.isDistinct)
+                return DataLimits.distinctLimits(cqlRowLimit);
+
+            return DataLimits.groupByLimits(cqlRowLimit,
+                                            cqlPerPartitionLimit,
+                                            pageSize,
+                                            aggregationSpec);
+        }
+
         if (parameters.isDistinct)
             return cqlRowLimit == DataLimits.NO_LIMIT ? DataLimits.DISTINCT_NONE : DataLimits.distinctLimits(cqlRowLimit);
 
@@ -732,7 +747,8 @@ public class SelectStatement implements CQLStatement
                               int nowInSec,
                               int userLimit) throws InvalidRequestException
     {
-        Selection.ResultSetBuilder result = selection.resultSetBuilder(options, parameters.isJson);
+        Selection.ResultSetBuilder result = selection.resultSetBuilder(options, parameters.isJson, aggregationSpec);
+
         while (partitions.hasNext())
         {
             try (RowIterator partition = partitions.next())
@@ -779,7 +795,7 @@ public class SelectStatement implements CQLStatement
         {
             if (!staticRow.isEmpty() && (!restrictions.hasClusteringColumnsRestriction() || cfm.isStaticCompactTable()))
             {
-                result.newRow();
+                result.newRow(partition.partitionKey(), staticRow.clustering());
                 for (ColumnDefinition def : selection.getColumns())
                 {
                     switch (def.kind)
@@ -801,7 +817,7 @@ public class SelectStatement implements CQLStatement
         while (partition.hasNext())
         {
             Row row = partition.next();
-            result.newRow();
+            result.newRow( partition.partitionKey(), row.clustering());
             // Respect selection order
             for (ColumnDefinition def : selection.getColumns())
             {
@@ -894,7 +910,7 @@ public class SelectStatement implements CQLStatement
 
             Selection selection = selectClause.isEmpty()
                                   ? Selection.wildcard(cfm)
-                                  : Selection.fromSelectors(cfm, selectClause, boundNames);
+                                  : Selection.fromSelectors(cfm, selectClause, boundNames, !parameters.groups.isEmpty());
 
             StatementRestrictions restrictions = prepareRestrictions(cfm, boundNames, selection, forView);
 
@@ -904,7 +920,12 @@ public class SelectStatement implements CQLStatement
                 validateDistinctSelection(cfm, selection, restrictions);
             }
 
-            checkFalse(selection.isAggregate() && perPartitionLimit != null,
+            AggregationSpecification aggregationSpec = getAggregationSpecification(cfm,
+                                                                                   selection,
+                                                                                   restrictions,
+                                                                                   parameters.isDistinct);
+
+            checkFalse(aggregationSpec == AggregationSpecification.AGGREGATE_EVERYTHING && perPartitionLimit != null,
                        "PER PARTITION LIMIT is not allowed with aggregate queries.");
 
             Comparator<List<ByteBuffer>> orderingComparator = null;
@@ -923,14 +944,15 @@ public class SelectStatement implements CQLStatement
             checkNeedsFiltering(restrictions);
 
             SelectStatement stmt = new SelectStatement(cfm,
-                                                        boundNames.size(),
-                                                        parameters,
-                                                        selection,
-                                                        restrictions,
-                                                        isReversed,
-                                                        orderingComparator,
-                                                        prepareLimit(boundNames, limit, keyspace(), limitReceiver()),
-                                                        prepareLimit(boundNames, perPartitionLimit, keyspace(), perPartitionLimitReceiver()));
+                                                       boundNames.size(),
+                                                       parameters,
+                                                       selection,
+                                                       restrictions,
+                                                       isReversed,
+                                                       aggregationSpec,
+                                                       orderingComparator,
+                                                       prepareLimit(boundNames, limit, keyspace(), limitReceiver()),
+                                                       prepareLimit(boundNames, perPartitionLimit, keyspace(), perPartitionLimitReceiver()));
 
             return new ParsedStatement.Prepared(stmt, boundNames, boundNames.getPartitionKeyBindIndexes(cfm));
         }
@@ -1002,6 +1024,63 @@ public class SelectStatement implements CQLStatement
                           "SELECT DISTINCT queries must request all the partition key columns (missing %s)", def.name);
         }
 
+        /**
+         * Creates the <code>AggregationSpecification</code>s used to make the aggregates.
+         *
+         * @param cfm the column family metadata
+         * @param selection the selection
+         * @param restrictions the restrictions
+         * @param isDistinct <code>true</code> if the query is a DISTINCT one. 
+         * @return the <code>AggregationSpecification</code>s used to make the aggregates
+         */
+        private AggregationSpecification getAggregationSpecification(CFMetaData cfm,
+                                                                     Selection selection,
+                                                                     StatementRestrictions restrictions,
+                                                                     boolean isDistinct)
+        {
+            if (parameters.groups.isEmpty())
+                return selection.isAggregate() ? AggregationSpecification.AGGREGATE_EVERYTHING
+                                               : null;
+
+            int clusteringPrefixSize = 0;
+
+            Iterator<ColumnDefinition> pkColumns = cfm.primaryKeyColumns().iterator();
+            for (ColumnDefinition.Raw raw : parameters.groups)
+            {
+                ColumnDefinition def = raw.prepare(cfm);
+
+                checkTrue(def.isPartitionKey() || def.isClusteringColumn(),
+                          "Group by is currently only supported on the columns of the PRIMARY KEY, got %s", def.name);
+
+                while (true)
+                {
+                    checkTrue(pkColumns.hasNext(),
+                              "Group by currently only support groups of columns following their declared order in the PRIMARY KEY");
+
+                    ColumnDefinition pkColumn = pkColumns.next();
+
+                    if (pkColumn.isClusteringColumn())
+                        clusteringPrefixSize++;
+
+                    // As we do not support grouping on only part of the partition key, we only need to know
+                    // which clustering columns need to be used to build the groups
+                    if (pkColumn.equals(def))
+                        break;
+
+                    checkTrue(restrictions.isColumnRestrictedByEq(pkColumn),
+                              "Group by currently only support groups of columns following their declared order in the PRIMARY KEY");
+                }
+            }
+
+            checkFalse(pkColumns.hasNext() && pkColumns.next().isPartitionKey(),
+                       "Group by is not supported on only a part of the partition key");
+
+            checkFalse(clusteringPrefixSize > 0 && isDistinct,
+                       "Grouping on clustering columns is not allowed for SELECT DISTINCT queries");
+
+            return AggregationSpecification.aggregatePkPrefix(cfm.comparator, clusteringPrefixSize);
+        }
+
         private Comparator<List<ByteBuffer>> getOrderingComparator(CFMetaData cfm,
                                                                    Selection selection,
                                                                    StatementRestrictions restrictions)
@@ -1119,16 +1198,19 @@ public class SelectStatement implements CQLStatement
     {
         // Public because CASSANDRA-9858
         public final Map<ColumnDefinition.Raw, Boolean> orderings;
+        public final List<ColumnDefinition.Raw> groups;
         public final boolean isDistinct;
         public final boolean allowFiltering;
         public final boolean isJson;
 
         public Parameters(Map<ColumnDefinition.Raw, Boolean> orderings,
+                          List<ColumnDefinition.Raw> groups,
                           boolean isDistinct,
                           boolean allowFiltering,
                           boolean isJson)
         {
             this.orderings = orderings;
+            this.groups = groups;
             this.isDistinct = isDistinct;
             this.allowFiltering = allowFiltering;
             this.isJson = isJson;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/db/ClusteringComparator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringComparator.java b/src/java/org/apache/cassandra/db/ClusteringComparator.java
index 3030b5a..4374a46 100644
--- a/src/java/org/apache/cassandra/db/ClusteringComparator.java
+++ b/src/java/org/apache/cassandra/db/ClusteringComparator.java
@@ -144,7 +144,21 @@ public class ClusteringComparator implements Comparator<Clusterable>
 
     public int compare(Clustering c1, Clustering c2)
     {
-        for (int i = 0; i < size(); i++)
+        return compare(c1, c2, size());
+    }
+
+    /**
+     * Compares the specified part of the specified clusterings.
+     *
+     * @param c1 the first clustering
+     * @param c2 the second clustering
+     * @param size the number of components to compare
+     * @return a negative integer, zero, or a positive integer as the first argument is less than,
+     * equal to, or greater than the second.
+     */
+    public int compare(Clustering c1, Clustering c2, int size)
+    {
+        for (int i = 0; i < size; i++)
         {
             int cmp = compareComponent(i, c1.get(i), c2.get(i));
             if (cmp != 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index b13bf39..439dc30 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -119,9 +119,31 @@ public class PartitionRangeReadCommand extends ReadCommand
         return dataRange.isNamesQuery();
     }
 
-    public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range)
+    /**
+     * Returns an equivalent command but that only queries data within the provided range.
+     *
+     * @param range the sub-range to restrict the command to. This method <b>assumes</b> that this is a proper sub-range
+     * of the command this is applied to.
+     * @param isRangeContinuation whether {@code range} is a direct continuation of whatever previous range we have
+     * queried. This matters for the {@code DataLimits} that may contain states when we do paging and in the context of
+     * parallel queries: that state only makes sense if the range queried is indeed the follow-up of whatever range we've
+     * previously queried (that yielded said state). In practice this means that ranges for which {@code isRangeContinuation}
+     * is false may have to be slightly pessimistic when counting data and may include a little bit more than necessary, and
+     * this should be dealt with post-query (in the case of {@code StorageProxy.getRangeSlice()}, which uses this method
+     * for replica queries, this is dealt with by re-counting results on the coordinator). Note that if this is the
+     * first range we queried, then the {@code DataLimits} will have no state and the value of this parameter doesn't
+     * matter.
+     */
+    public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range, boolean isRangeContinuation)
     {
-        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range), index);
+        DataRange newRange = dataRange().forSubRange(range);
+        // If we're not a continuation of whatever range we've previously queried, we should ignore the states of the
+        // DataLimits as it's either useless, or misleading. This is particularly important for GROUP BY queries, where
+        // DataLimits.CQLGroupByLimits.GroupByAwareCounter assumes that if GroupingState.hasClustering(), then we're in
+        // the middle of a group, but we can't make that assumption if we query a range "in advance" of where we are
+        // on the ring.
+        DataLimits newLimits = isRangeContinuation ? limits() : limits().withoutState();
+        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, newRange, index);
     }
 
     public PartitionRangeReadCommand copy()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 372cf1c..5f01733 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -166,6 +166,14 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
     protected abstract long selectionSerializedSize(int version);
 
     /**
+     * Creates a new <code>ReadCommand</code> instance with new limits.
+     *
+     * @param newLimits the new limits
+     * @return a new <code>ReadCommand</code> with the updated limits
+     */
+    public abstract ReadCommand withUpdatedLimit(DataLimits newLimits);
+
+    /**
      * The metadata for the table queried.
      *
      * @return the metadata for the table queried.
@@ -521,7 +529,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
 
     protected class CheckForAbort extends StoppingTransformation<BaseRowIterator<?>>
     {
-        protected BaseRowIterator<?> applyToPartition(BaseRowIterator partition)
+        protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
         {
             if (maybeAbort())
             {
@@ -667,7 +675,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
             out.writeInt(command.nowInSec());
             ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
             RowFilter.serializer.serialize(command.rowFilter(), out, version);
-            DataLimits.serializer.serialize(command.limits(), out, version);
+            DataLimits.serializer.serialize(command.limits(), out, version, command.metadata.comparator);
             if (command.index.isPresent())
                 IndexMetadata.serializer.serialize(command.index.get(), out, version);
 
@@ -688,7 +696,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
             int nowInSec = in.readInt();
             ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
             RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
-            DataLimits limits = DataLimits.serializer.deserialize(in, version);
+            DataLimits limits = DataLimits.serializer.deserialize(in, version,  metadata.comparator);
             Optional<IndexMetadata> index = hasIndex
                                             ? deserializeIndexMetadata(in, version, metadata)
                                             : Optional.empty();
@@ -724,7 +732,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery
                  + TypeSizes.sizeof(command.nowInSec())
                  + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
                  + RowFilter.serializer.serializedSize(command.rowFilter(), version)
-                 + DataLimits.serializer.serializedSize(command.limits(), version)
+                 + DataLimits.serializer.serializedSize(command.limits(), version, command.metadata.comparator)
                  + command.selectionSerializedSize(version)
                  + command.indexSerializedSize(version);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java
index ba7b893..39b0662 100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@ -63,11 +63,6 @@ public interface ReadQuery
             return QueryPager.EMPTY;
         }
 
-        public QueryPager getLocalPager()
-        {
-            return QueryPager.EMPTY;
-        }
-
         public boolean selectsKey(DecoratedKey key)
         {
             return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index d96648f..c6cbdb4 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -329,6 +329,20 @@ public class SinglePartitionReadCommand extends ReadCommand
                       lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
     }
 
+    public SinglePartitionReadCommand withUpdatedLimit(DataLimits newLimits)
+    {
+        return new SinglePartitionReadCommand(isDigestQuery(),
+                                              digestVersion(),
+                                              isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              newLimits,
+                                              partitionKey,
+                                              clusteringIndexFilter);
+    }
+
     public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
     {
         return StorageProxy.read(Group.one(this), consistency, clientState);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/db/aggregation/AggregationSpecification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/aggregation/AggregationSpecification.java b/src/java/org/apache/cassandra/db/aggregation/AggregationSpecification.java
new file mode 100644
index 0000000..7324dfd
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/aggregation/AggregationSpecification.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.aggregation;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * Defines how rows should be grouped for creating aggregates.
+ */
+public abstract class AggregationSpecification
+{
+    public static final Serializer serializer = new Serializer();
+
+    /**
+     * <code>AggregationSpecification</code> that groups all the rows together.
+     */
+    public static final AggregationSpecification AGGREGATE_EVERYTHING = new AggregationSpecification(Kind.AGGREGATE_EVERYTHING)
+    {
+        @Override
+        public GroupMaker newGroupMaker(GroupingState state)
+        {
+            return GroupMaker.GROUP_EVERYTHING;
+        }
+    };
+
+    /**
+     * The <code>AggregationSpecification</code> kind.
+     */
+    private final Kind kind;
+
+    /**
+     * The <code>AggregationSpecification</code> kinds.
+     */
+    public static enum Kind
+    {
+        AGGREGATE_EVERYTHING, AGGREGATE_BY_PK_PREFIX
+    }
+
+    /**
+     * Returns the <code>AggregationSpecification</code> kind.
+     * @return the <code>AggregationSpecification</code> kind
+     */
+    public Kind kind()
+    {
+        return kind;
+    }
+
+    private AggregationSpecification(Kind kind)
+    {
+        this.kind = kind;
+    }
+
+    /**
+     * Creates a new <code>GroupMaker</code> instance.
+     *
+     * @return a new <code>GroupMaker</code> instance
+     */
+    public final GroupMaker newGroupMaker()
+    {
+        return newGroupMaker(GroupingState.EMPTY_STATE);
+    }
+
+    /**
+     * Creates a new <code>GroupMaker</code> instance.
+     *
+     * @param state <code>GroupMaker</code> state
+     * @return a new <code>GroupMaker</code> instance
+     */
+    public abstract GroupMaker newGroupMaker(GroupingState state);
+
+    /**
+     * Creates a new <code>AggregationSpecification</code> instance that will build aggregates based on primary key
+     * columns.
+     *
+     * @param comparator the comparator used to compare the clustering prefixes
+     * @param clusteringPrefixSize the number of clustering columns used to create the aggregates
+     * @return a new <code>AggregationSpecification</code> instance that will build aggregates based on primary key
+     * columns
+     */
+    public static AggregationSpecification aggregatePkPrefix(ClusteringComparator comparator, int clusteringPrefixSize)
+    {
+        return new AggregateByPkPrefix(comparator, clusteringPrefixSize);
+    }
+
+    /**
+     * <code>AggregationSpecification</code> that builds aggregates based on primary key columns
+     */
+    private static final class AggregateByPkPrefix extends AggregationSpecification
+    {
+        /**
+         * The number of clustering components to compare.
+         */
+        private final int clusteringPrefixSize;
+
+        /**
+         * The comparator used to compare the clustering prefixes.
+         */
+        private final ClusteringComparator comparator;
+
+        public AggregateByPkPrefix(ClusteringComparator comparator, int clusteringPrefixSize)
+        {
+            super(Kind.AGGREGATE_BY_PK_PREFIX);
+            this.comparator = comparator;
+            this.clusteringPrefixSize = clusteringPrefixSize;
+        }
+
+        @Override
+        public GroupMaker newGroupMaker(GroupingState state)
+        {
+            return GroupMaker.newInstance(comparator, clusteringPrefixSize, state);
+        }
+    }
+
+    public static class Serializer
+    {
+        public void serialize(AggregationSpecification aggregationSpec, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeByte(aggregationSpec.kind().ordinal());
+            switch (aggregationSpec.kind())
+            {
+                case AGGREGATE_EVERYTHING:
+                    break;
+                case AGGREGATE_BY_PK_PREFIX:
+                    out.writeUnsignedVInt(((AggregateByPkPrefix) aggregationSpec).clusteringPrefixSize);
+                    break;
+                default:
+                    throw new AssertionError();
+            }
+        }
+
+        public AggregationSpecification deserialize(DataInputPlus in, int version, ClusteringComparator comparator) throws IOException
+        {
+            Kind kind = Kind.values()[in.readUnsignedByte()];
+            switch (kind)
+            {
+                case AGGREGATE_EVERYTHING:
+                    return AggregationSpecification.AGGREGATE_EVERYTHING;
+                case AGGREGATE_BY_PK_PREFIX:
+                    int clusteringPrefixSize = (int) in.readUnsignedVInt();
+                    return AggregationSpecification.aggregatePkPrefix(comparator, clusteringPrefixSize);
+                default:
+                    throw new AssertionError();
+            }
+        }
+
+        public long serializedSize(AggregationSpecification aggregationSpec, int version)
+        {
+            long size = TypeSizes.sizeof((byte) aggregationSpec.kind().ordinal());
+            switch (aggregationSpec.kind())
+            {
+                case AGGREGATE_EVERYTHING:
+                    break;
+                case AGGREGATE_BY_PK_PREFIX:
+                    size += TypeSizes.sizeofUnsignedVInt(((AggregateByPkPrefix) aggregationSpec).clusteringPrefixSize);
+                    break;
+                default:
+                    throw new AssertionError();
+            }
+            return size;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/db/aggregation/GroupMaker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/aggregation/GroupMaker.java b/src/java/org/apache/cassandra/db/aggregation/GroupMaker.java
new file mode 100644
index 0000000..f6ad433
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/aggregation/GroupMaker.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.aggregation;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * A <code>GroupMaker</code> can be used to determine if some sorted rows belong to the same group or not.
+ */
+public abstract class GroupMaker
+{
+    /**
+     * <code>GroupMaker</code> that groups all the rows together.
+     */
+    public static final GroupMaker GROUP_EVERYTHING = new GroupMaker()
+    {
+        public boolean isNewGroup(DecoratedKey partitionKey, Clustering clustering)
+        {
+            return false;
+        }
+
+        public boolean returnAtLeastOneRow()
+        {
+            return true;
+        }
+    };
+
+    public static GroupMaker newInstance(ClusteringComparator comparator, int clusteringPrefixSize, GroupingState state)
+    {
+        return new PkPrefixGroupMaker(comparator, clusteringPrefixSize, state);
+    }
+
+    public static GroupMaker newInstance(ClusteringComparator comparator, int clusteringPrefixSize)
+    {
+        return new PkPrefixGroupMaker(comparator, clusteringPrefixSize);
+    }
+
+    /**
+     * Checks if a given row starts a new group or belongs to the same group as the previous row.
+     *
+     * @param partitionKey the partition key.
+     * @param clustering the row clustering key
+     * @return <code>true</code> if the row starts a new group, <code>false</code> if it belongs to the same group
+     * as the previous one.
+     */
+    public abstract boolean isNewGroup(DecoratedKey partitionKey, Clustering clustering);
+
+    /**
+     * Specify if at least one row must be returned. If the selection is performing some aggregations on all the rows,
+     * one row should be returned even if no records were processed.
+     *
+     * @return <code>true</code> if at least one row must be returned, <code>false</code> otherwise.
+     */
+    public boolean returnAtLeastOneRow()
+    {
+        return false;
+    }
+
+    private static final class PkPrefixGroupMaker extends GroupMaker
+    {
+        /**
+         * The size of the clustering prefix used to make the groups
+         */
+        private final int clusteringPrefixSize;
+
+        /**
+         * The comparator used to compare the clustering prefixes.
+         */
+        private final ClusteringComparator comparator;
+
+        /**
+         * The last partition key seen
+         */
+        private ByteBuffer lastPartitionKey;
+
+        /**
+         * The last clustering seen
+         */
+        private Clustering lastClustering;
+
+        public PkPrefixGroupMaker(ClusteringComparator comparator, int clusteringPrefixSize, GroupingState state)
+        {
+            this(comparator, clusteringPrefixSize);
+            this.lastPartitionKey = state.partitionKey();
+            this.lastClustering = state.clustering;
+        }
+
+        public PkPrefixGroupMaker(ClusteringComparator comparator, int clusteringPrefixSize)
+        {
+            this.comparator = comparator;
+            this.clusteringPrefixSize = clusteringPrefixSize;
+        }
+
+        @Override
+        public boolean isNewGroup(DecoratedKey partitionKey, Clustering clustering)
+        {
+            boolean isNew = false;
+
+            // We are entering a new group if:
+            // - the partition key is a new one
+            // - the last clustering was not null and does not have the same prefix as the new clustering one
+            if (!partitionKey.getKey().equals(lastPartitionKey))
+            {
+                lastPartitionKey = partitionKey.getKey();
+                isNew = true;
+                if (Clustering.STATIC_CLUSTERING == clustering)
+                {
+                    lastClustering = null;
+                    return true;
+                }
+            }
+            else if (lastClustering != null && comparator.compare(lastClustering, clustering, clusteringPrefixSize) != 0)
+            {
+                isNew = true;
+            }
+
+            lastClustering = clustering;
+            return isNew;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4205011c/src/java/org/apache/cassandra/db/aggregation/GroupingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/aggregation/GroupingState.java b/src/java/org/apache/cassandra/db/aggregation/GroupingState.java
new file mode 100644
index 0000000..ff529a6
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/aggregation/GroupingState.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.aggregation;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringComparator;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * {@code GroupMaker} state.
+ *
+ * <p>The state contains the primary key of the last row that has been processed by the previous
+ * {@code GroupMaker} used. It can be passed to a {@code GroupMaker} to allow resuming the grouping.
+ * </p>
+ * <p>
+ * {@code GroupingState} is only used for internal paging. When a new page is requested by a client the initial state
+ * will always be empty.<br/>
+ * If the state has a partition key but no clustering, it means that the previous group ended at the end of the
+ * previous partition. If the clustering is not null it means that we are in the middle of a group.
+ * </p>
+ */
+public final class GroupingState
+{
+    public static final GroupingState.Serializer serializer = new Serializer();
+
+    public static final GroupingState EMPTY_STATE = new GroupingState(null, null);
+
+    /**
+     * The last row partition key.
+     */
+    private final ByteBuffer partitionKey;
+
+    /**
+     * The last row clustering
+     */
+    final Clustering clustering;
+
+    public GroupingState(ByteBuffer partitionKey, Clustering clustering)
+    {
+        this.partitionKey = partitionKey;
+        this.clustering = clustering;
+    }
+
+    /**
+     * Returns the last row partition key or <code>null</code> if no rows have been processed yet.
+     * @return the last row partition key or <code>null</code> if no rows have been processed yet
+     */
+    public ByteBuffer partitionKey()
+    {
+        return partitionKey;
+    }
+
+    /**
+     * Returns the last row clustering or <code>null</code> if either no rows have been processed yet or the last
+     * row was a static row.
+     * @return the last row clustering or <code>null</code> if either no rows have been processed yet or the last
+     * row was a static row
+     */
+    public Clustering clustering()
+    {
+        return clustering;
+    }
+
+    /**
+     * Checks if the state contains a Clustering for the last row that has been processed.
+     * @return <code>true</code> if the state contains a Clustering for the last row that has been processed,
+     * <code>false</code> otherwise.
+     */
+    public boolean hasClustering()
+    {
+        return clustering != null;
+    }
+
+    public static class Serializer
+    {
+        public void serialize(GroupingState state, DataOutputPlus out, int version, ClusteringComparator comparator) throws IOException
+        {
+            boolean hasPartitionKey = state.partitionKey != null;
+            out.writeBoolean(hasPartitionKey);
+            if (hasPartitionKey)
+            {
+                ByteBufferUtil.writeWithVIntLength(state.partitionKey, out);
+                boolean hasClustering = state.hasClustering();
+                out.writeBoolean(hasClustering);
+                if (hasClustering)
+                    Clustering.serializer.serialize(state.clustering, out, version, comparator.subtypes());
+            }
+        }
+
+        public GroupingState deserialize(DataInputPlus in, int version, ClusteringComparator comparator) throws IOException
+        {
+            if (!in.readBoolean())
+                return GroupingState.EMPTY_STATE;
+
+            ByteBuffer partitionKey = ByteBufferUtil.readWithVIntLength(in);
+            Clustering clustering = null;
+            if (in.readBoolean())
+                clustering = Clustering.serializer.deserialize(in, version, comparator.subtypes());
+
+            return new GroupingState(partitionKey, clustering);
+        }
+
+        public long serializedSize(GroupingState state, int version, ClusteringComparator comparator)
+        {
+            boolean hasPartitionKey = state.partitionKey != null;
+            long size = TypeSizes.sizeof(hasPartitionKey);
+            if (hasPartitionKey)
+            {
+                size += ByteBufferUtil.serializedSizeWithVIntLength(state.partitionKey);
+                boolean hasClustering = state.hasClustering();
+                size += TypeSizes.sizeof(hasClustering);
+                if (hasClustering)
+                {
+                    size += Clustering.serializer.serializedSize(state.clustering, version, comparator.subtypes());
+                }
+            }
+            return size;
+        }
+    }
+}


Mime
View raw message