ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptupit...@apache.org
Subject [03/33] ignite git commit: IGNITE-3448 Support SQL queries with distinct aggregates added. This closes #3448.
Date Mon, 14 Nov 2016 09:26:15 GMT
IGNITE-3448 Support SQL queries with distinct aggregates added. This closes #3448.

(cherry picked from commit 7ed2bb7)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3e724fd8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3e724fd8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3e724fd8

Branch: refs/heads/master
Commit: 3e724fd8a8da38c955e7ab4712e0cb8b5685a16c
Parents: e0b8467
Author: AMRepo <andrey.mashenkov@gmail.com>
Authored: Wed Oct 19 18:33:59 2016 +0300
Committer: Andrey V. Mashenkov <andrey.mashenkov@gmail.com>
Committed: Wed Oct 26 17:21:23 2016 +0300

----------------------------------------------------------------------
 .../query/h2/sql/GridSqlQuerySplitter.java      | 143 +++++---
 .../query/IgniteSqlSplitterSelfTest.java        | 326 +++++++++++++++++++
 2 files changed, 432 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3e724fd8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 7205a18..7d43bf6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -39,6 +39,8 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2Collocati
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.AVG;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.CAST;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.COUNT;
+import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.MAX;
+import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.MIN;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.SUM;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlPlaceholder.EMPTY;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser.prepared;
@@ -298,10 +300,18 @@ public class GridSqlQuerySplitter {
 
         Set<String> colNames = new HashSet<>();
 
-        boolean aggregateFound = false;
+        boolean distinctAggregateFound = false;
+
+        if (!collocatedGroupBy) {
+            for (int i = 0, len = mapExps.size(); i < len; i++)
+                distinctAggregateFound |= hasDistinctAggregates(mapExps.get(i));
+        }
+
+        boolean aggregateFound = distinctAggregateFound;
 
         for (int i = 0, len = mapExps.size(); i < len; i++) // Remember len because mapExps list can grow.
-            aggregateFound |= splitSelectExpression(mapExps, rdcExps, colNames, i, collocatedGroupBy, i == havingCol);
+            aggregateFound |= splitSelectExpression(mapExps, rdcExps, colNames, i, collocatedGroupBy, i == havingCol,
+                distinctAggregateFound);
 
         // -- SELECT
         mapQry.clearColumns();
@@ -325,9 +335,14 @@ public class GridSqlQuerySplitter {
         findAffinityColumnConditions(mapQry.where());
 
         // -- GROUP BY
-        if (mapQry.groupColumns() != null && !collocatedGroupBy)
+        if (mapQry.groupColumns() != null && !collocatedGroupBy) {
             rdcQry.groupColumns(mapQry.groupColumns());
 
+            // Grouping with distinct aggregates cannot be performed on map phase
+            if (distinctAggregateFound)
+                mapQry.groupColumns(null);
+        }
+
         // -- HAVING
         if (havingCol >= 0 && !collocatedGroupBy) {
             // TODO IGNITE-1140 - Find aggregate functions in HAVING clause or rewrite query to put all aggregates to SELECT clause.
@@ -638,10 +653,11 @@ public class GridSqlQuerySplitter {
      * @param idx Index.
      * @param collocated If it is a collocated query.
      * @param isHaving If it is a HAVING expression.
+     * @param hasDistinctAggregate If query has distinct aggregate expression.
      * @return {@code true} If aggregate was found.
      */
     private static boolean splitSelectExpression(List<GridSqlElement> mapSelect, List<GridSqlElement> rdcSelect,
-        Set<String> colNames, final int idx, boolean collocated, boolean isHaving) {
+        Set<String> colNames, final int idx, boolean collocated, boolean isHaving, boolean hasDistinctAggregate) {
         GridSqlElement el = mapSelect.get(idx);
 
         GridSqlAlias alias = null;
@@ -660,7 +676,7 @@ public class GridSqlQuerySplitter {
                 alias = alias(isHaving ? HAVING_COLUMN : columnName(idx), el);
 
             // We can update original alias here as well since it will be dropped from mapSelect.
-            splitAggregates(alias, 0, mapSelect, idx, true);
+            splitAggregates(alias, 0, mapSelect, idx, hasDistinctAggregate, true);
 
             set(rdcSelect, idx, alias);
         }
@@ -715,10 +731,33 @@ public class GridSqlQuerySplitter {
     }
 
     /**
+     * Lookup for distinct aggregates.
+     * Note, DISTINCT make no sense for MIN and MAX aggregates, so its will be ignored.
+     *
+     * @param el Expression.
+     * @return {@code true} If expression contains distinct aggregates.
+     */
+    private static boolean hasDistinctAggregates(GridSqlElement el) {
+        if (el instanceof GridSqlAggregateFunction) {
+            GridSqlFunctionType type = ((GridSqlAggregateFunction)el).type();
+
+            return ((GridSqlAggregateFunction)el).distinct() && type != MIN && type != MAX;
+        }
+
+        for (GridSqlElement child : el) {
+            if (hasDistinctAggregates(child))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
      * @param parentExpr Parent expression.
      * @param childIdx Child index to try to split.
      * @param mapSelect List of expressions in map SELECT clause.
      * @param exprIdx Index of the original expression in map SELECT clause.
+     * @param hasDistinctAggregate If query has distinct aggregate expression.
      * @param first If the first aggregate is already found in this expression.
      * @return {@code true} If the first aggregate is already found.
      */
@@ -727,17 +766,18 @@ public class GridSqlQuerySplitter {
         final int childIdx,
         final List<GridSqlElement> mapSelect,
         final int exprIdx,
+        boolean hasDistinctAggregate,
         boolean first) {
         GridSqlElement el = parentExpr.child(childIdx);
 
         if (el instanceof GridSqlAggregateFunction) {
-            splitAggregate(parentExpr, childIdx, mapSelect, exprIdx, first);
+            splitAggregate(parentExpr, childIdx, mapSelect, exprIdx, hasDistinctAggregate, first);
 
             return true;
         }
 
         for (int i = 0; i < el.size(); i++) {
-            if (splitAggregates(el, i, mapSelect, exprIdx, first))
+            if (splitAggregates(el, i, mapSelect, exprIdx, hasDistinctAggregate, first))
                 first = false;
         }
 
@@ -749,6 +789,7 @@ public class GridSqlQuerySplitter {
      * @param aggIdx Index of the aggregate to split in this expression.
      * @param mapSelect List of expressions in map SELECT clause.
      * @param exprIdx Index of the original expression in map SELECT clause.
+     * @param hasDistinctAggregate If query has distinct aggregate expression.
      * @param first If this is the first aggregate found in this expression.
      */
     private static void splitAggregate(
@@ -756,6 +797,7 @@ public class GridSqlQuerySplitter {
         int aggIdx,
         List<GridSqlElement> mapSelect,
         int exprIdx,
+        boolean hasDistinctAggregate,
         boolean first
     ) {
         GridSqlAggregateFunction agg = parentExpr.child(aggIdx);
@@ -773,53 +815,80 @@ public class GridSqlQuerySplitter {
         else
             mapSelect.add(mapAggAlias);
 
+        /* Note Distinct aggregate can be performed only on reduce phase, so
+           if query contains distinct aggregate then other aggregates must be processed the same way. */
         switch (agg.type()) {
-            case AVG: // SUM( AVG(CAST(x AS DOUBLE))*COUNT(x) )/SUM( COUNT(x) ).
-                //-- COUNT(x) map
-                GridSqlElement cntMapAgg = aggregate(agg.distinct(), COUNT)
-                    .resultType(GridSqlType.BIGINT).addChild(agg.child());
+            case AVG: // SUM( AVG(CAST(x AS DOUBLE))*COUNT(x) )/SUM( COUNT(x) )  or  AVG(CAST( x AS DOUBLE))
+                if (hasDistinctAggregate) /* and has no collocated group by */ {
+                    mapAgg = agg.child();
+
+                    rdcAgg = aggregate(agg.distinct(), agg.type()).resultType(GridSqlType.DOUBLE)
+                        .addChild(function(CAST).resultType(GridSqlType.DOUBLE).addChild(column(mapAggAlias.alias())));
+                }
+                else {
+                    //-- COUNT(x) map
+                    GridSqlElement cntMapAgg = aggregate(agg.distinct(), COUNT)
+                        .resultType(GridSqlType.BIGINT).addChild(agg.child());
 
-                // Add generated alias to COUNT(x).
-                // Using size as index since COUNT will be added as the last select element to the map query.
-                String cntMapAggAlias = columnName(mapSelect.size());
+                    // Add generated alias to COUNT(x).
+                    // Using size as index since COUNT will be added as the last select element to the map query.
+                    String cntMapAggAlias = columnName(mapSelect.size());
 
-                cntMapAgg = alias(cntMapAggAlias, cntMapAgg);
+                    cntMapAgg = alias(cntMapAggAlias, cntMapAgg);
 
-                mapSelect.add(cntMapAgg);
+                    mapSelect.add(cntMapAgg);
 
-                //-- AVG(CAST(x AS DOUBLE)) map
-                mapAgg = aggregate(agg.distinct(), AVG).resultType(GridSqlType.DOUBLE).addChild(
-                    function(CAST).resultType(GridSqlType.DOUBLE).addChild(agg.child()));
+                    //-- AVG(CAST(x AS DOUBLE)) map
+                    mapAgg = aggregate(agg.distinct(), AVG).resultType(GridSqlType.DOUBLE).addChild(
+                        function(CAST).resultType(GridSqlType.DOUBLE).addChild(agg.child()));
 
-                //-- SUM( AVG(x)*COUNT(x) )/SUM( COUNT(x) ) reduce
-                GridSqlElement sumUpRdc = aggregate(false, SUM).addChild(
-                    op(GridSqlOperationType.MULTIPLY,
-                        column(mapAggAlias.alias()),
-                        column(cntMapAggAlias)));
+                    //-- SUM( AVG(x)*COUNT(x) )/SUM( COUNT(x) ) reduce
+                    GridSqlElement sumUpRdc = aggregate(false, SUM).addChild(
+                        op(GridSqlOperationType.MULTIPLY,
+                            column(mapAggAlias.alias()),
+                            column(cntMapAggAlias)));
 
-                GridSqlElement sumDownRdc = aggregate(false, SUM).addChild(column(cntMapAggAlias));
+                    GridSqlElement sumDownRdc = aggregate(false, SUM).addChild(column(cntMapAggAlias));
 
-                rdcAgg = op(GridSqlOperationType.DIVIDE, sumUpRdc, sumDownRdc);
+                    rdcAgg = op(GridSqlOperationType.DIVIDE, sumUpRdc, sumDownRdc);
+                }
 
                 break;
 
-            case SUM: // SUM( SUM(x) )
-            case MAX: // MAX( MAX(x) )
-            case MIN: // MIN( MIN(x) )
-                mapAgg = aggregate(agg.distinct(), agg.type()).resultType(agg.resultType()).addChild(agg.child());
-                rdcAgg = aggregate(agg.distinct(), agg.type()).addChild(column(mapAggAlias.alias()));
+            case SUM: // SUM( SUM(x) ) or SUM(DISTINCT x)
+            case MAX: // MAX( MAX(x) ) or MAX(DISTINCT x)
+            case MIN: // MIN( MIN(x) ) or MIN(DISTINCT x)
+                if (hasDistinctAggregate) /* and has no collocated group by */ {
+                    mapAgg = agg.child();
+
+                    rdcAgg = aggregate(agg.distinct(), agg.type()).addChild(column(mapAggAlias.alias()));
+                }
+                else {
+                    mapAgg = aggregate(agg.distinct(), agg.type()).resultType(agg.resultType()).addChild(agg.child());
+                    rdcAgg = aggregate(agg.distinct(), agg.type()).addChild(column(mapAggAlias.alias()));
+                }
 
                 break;
 
             case COUNT_ALL: // CAST(SUM( COUNT(*) ) AS BIGINT)
-            case COUNT: // CAST(SUM( COUNT(x) ) AS BIGINT)
-                mapAgg = aggregate(agg.distinct(), agg.type()).resultType(GridSqlType.BIGINT);
+            case COUNT: // CAST(SUM( COUNT(x) ) AS BIGINT) or CAST(COUNT(DISTINCT x) AS BIGINT)
+                if (hasDistinctAggregate) /* and has no collocated group by */ {
+                    assert agg.type() == COUNT;
 
-                if (agg.type() == COUNT)
-                    mapAgg.addChild(agg.child());
+                    mapAgg = agg.child();
 
-                rdcAgg = aggregate(false, SUM).addChild(column(mapAggAlias.alias()));
-                rdcAgg = function(CAST).resultType(GridSqlType.BIGINT).addChild(rdcAgg);
+                    rdcAgg = aggregate(agg.distinct(), agg.type()).resultType(GridSqlType.BIGINT)
+                        .addChild(column(mapAggAlias.alias()));
+                }
+                else {
+                    mapAgg = aggregate(agg.distinct(), agg.type()).resultType(GridSqlType.BIGINT);
+
+                    if (agg.type() == COUNT)
+                        mapAgg.addChild(agg.child());
+
+                    rdcAgg = aggregate(false, SUM).addChild(column(mapAggAlias.alias()));
+                    rdcAgg = function(CAST).resultType(GridSqlType.BIGINT).addChild(rdcAgg);
+                }
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3e724fd8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 1a2530e..50f2ef0 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -27,14 +27,18 @@ import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheKeyConfiguration;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.util.GridRandom;
@@ -1059,6 +1063,328 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
         }
     }
 
+    /** @throws Exception if failed. */
+    public void testDistributedAggregates() throws Exception {
+        final String cacheName = "ints";
+
+        IgniteCache<Integer, Value> cache = ignite(0).getOrCreateCache(cacheConfig(cacheName, true,
+            Integer.class, Value.class));
+
+        AffinityKeyGenerator node0KeyGen = new AffinityKeyGenerator(ignite(0), cacheName);
+        AffinityKeyGenerator node1KeyGen = new AffinityKeyGenerator(ignite(1), cacheName);
+        AffinityKeyGenerator node2KeyGen = new AffinityKeyGenerator(ignite(2), cacheName);
+
+        try {
+            awaitPartitionMapExchange();
+
+            cache.put(node0KeyGen.next(), new Value(1, 3));
+            cache.put(node1KeyGen.next(), new Value(1, 3));
+            cache.put(node2KeyGen.next(), new Value(1, 3));
+
+            cache.put(node0KeyGen.next(), new Value(2, 1));
+            cache.put(node1KeyGen.next(), new Value(2, 2));
+            cache.put(node2KeyGen.next(), new Value(2, 3));
+
+            cache.put(node0KeyGen.next(), new Value(3, 1));
+            cache.put(node0KeyGen.next(), new Value(3, 1));
+            cache.put(node0KeyGen.next(), new Value(3, 2));
+            cache.put(node1KeyGen.next(), new Value(3, 1));
+            cache.put(node1KeyGen.next(), new Value(3, 2));
+            cache.put(node2KeyGen.next(), new Value(3, 2));
+
+            cache.put(node0KeyGen.next(), new Value(4, 2));
+            cache.put(node1KeyGen.next(), new Value(5, 2));
+            cache.put(node2KeyGen.next(), new Value(6, 2));
+
+            checkSimpleQueryWithAggr(cache);
+            checkSimpleQueryWithDistinctAggr(cache);
+
+            checkQueryWithGroupsAndAggrs(cache);
+            checkQueryWithGroupsAndDistinctAggr(cache);
+
+            checkSimpleQueryWithAggrMixed(cache);
+            checkQueryWithGroupsAndAggrMixed(cache);
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /** @throws Exception if failed. */
+    public void testCollocatedAggregates() throws Exception {
+        final String cacheName = "ints";
+
+        IgniteCache<Integer, Value> cache = ignite(0).getOrCreateCache(cacheConfig(cacheName, true,
+            Integer.class, Value.class));
+
+        AffinityKeyGenerator node0KeyGen = new AffinityKeyGenerator(ignite(0), cacheName);
+        AffinityKeyGenerator node1KeyGen = new AffinityKeyGenerator(ignite(1), cacheName);
+        AffinityKeyGenerator node2KeyGen = new AffinityKeyGenerator(ignite(2), cacheName);
+
+        try {
+            awaitPartitionMapExchange();
+
+            cache.put(node0KeyGen.next(), new Value(1, 3));
+            cache.put(node0KeyGen.next(), new Value(1, 3));
+            cache.put(node0KeyGen.next(), new Value(1, 3));
+
+            cache.put(node1KeyGen.next(), new Value(2, 1));
+            cache.put(node1KeyGen.next(), new Value(2, 2));
+            cache.put(node1KeyGen.next(), new Value(2, 3));
+
+            cache.put(node2KeyGen.next(), new Value(3, 1));
+            cache.put(node2KeyGen.next(), new Value(3, 1));
+            cache.put(node2KeyGen.next(), new Value(3, 2));
+            cache.put(node2KeyGen.next(), new Value(3, 1));
+            cache.put(node2KeyGen.next(), new Value(3, 2));
+            cache.put(node2KeyGen.next(), new Value(3, 2));
+
+            cache.put(node0KeyGen.next(), new Value(4, 2));
+            cache.put(node1KeyGen.next(), new Value(5, 2));
+            cache.put(node2KeyGen.next(), new Value(6, 2));
+
+            checkQueryWithGroupsAndAggrs(cache);
+            checkQueryWithGroupsAndDistinctAggr(cache);
+            checkQueryWithGroupsAndAggrMixed(cache);
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /** Simple query with aggregates */
+    private void checkSimpleQueryWithAggr(IgniteCache<Integer, Value> cache) {
+        try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
+            "SELECT count(fst), sum(snd), avg(snd), min(snd), max(snd) FROM Value"))) {
+            List<List<?>> result = qry.getAll();
+
+            assertEquals(1, result.size());
+
+            List<?> row = result.get(0);
+
+            assertEquals("count", 15L, ((Number)row.get(0)).longValue());
+            assertEquals("sum", 30L, ((Number)row.get(1)).longValue());
+            assertEquals("avg", 2.0d, ((Number)row.get(2)).doubleValue(), 0.001);
+            assertEquals("min", 1, ((Integer)row.get(3)).intValue());
+            assertEquals("max", 3, ((Integer)row.get(4)).intValue());
+        }
+    }
+
+    /** Simple query with distinct aggregates */
+    private void checkSimpleQueryWithDistinctAggr(IgniteCache<Integer, Value> cache) {
+        try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
+            "SELECT count(distinct fst), sum(distinct snd), avg(distinct snd), min(distinct snd), max(distinct snd) " +
+                "FROM Value"))) {
+            List<List<?>> result = qry.getAll();
+
+            assertEquals(1, result.size());
+
+            List<?> row = result.get(0);
+
+            assertEquals("count distinct", 6L, ((Number)row.get(0)).longValue());
+            assertEquals("sum distinct", 6L, ((Number)row.get(1)).longValue());
+            assertEquals("avg distinct", 2.0d, ((Number)row.get(2)).doubleValue(), 0.001);
+            assertEquals("min distinct", 1, ((Integer)row.get(3)).intValue());
+            assertEquals("max distinct", 3, ((Integer)row.get(4)).intValue());
+        }
+    }
+
+    /** Simple query with distinct aggregates */
+    private void checkSimpleQueryWithAggrMixed(IgniteCache<Integer, Value> cache) {
+        try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
+            "SELECT count(fst), sum(snd), avg(snd), min(snd), max(snd)," +
+                "count(distinct fst), sum(distinct snd), avg(distinct snd), min(distinct snd), max(distinct snd)  " +
+                "FROM Value"))) {
+            List<List<?>> result = qry.getAll();
+
+            assertEquals(1, result.size());
+
+            List<?> row = result.get(0);
+
+            assertEquals("count", 15L, ((Number)row.get(0)).longValue());
+            assertEquals("sum", 30L, ((Number)row.get(1)).longValue());
+            assertEquals("avg", 2.0d, ((Number)row.get(2)).doubleValue(), 0.001);
+            assertEquals("min", 1, ((Integer)row.get(3)).intValue());
+            assertEquals("max", 3, ((Integer)row.get(4)).intValue());
+            assertEquals("count distinct", 6L, ((Number)row.get(5)).longValue());
+            assertEquals("sum distinct", 6L, ((Number)row.get(6)).longValue());
+            assertEquals("avg distinct", 2.0d, ((Number)row.get(7)).doubleValue(), 0.001);
+            assertEquals("min distinct", 1, ((Integer)row.get(8)).intValue());
+            assertEquals("max distinct", 3, ((Integer)row.get(9)).intValue());
+        }
+    }
+
+    /** Query with aggregates and groups */
+    private void checkQueryWithGroupsAndAggrs(IgniteCache<Integer, Value> cache) {
+        try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
+            "SELECT fst, count(snd), sum(snd), avg(snd), min(snd), max(snd) FROM Value GROUP BY fst ORDER BY fst"))) {
+            List<List<?>> result = qry.getAll();
+
+            assertEquals(6, result.size());
+
+            List<?> row = result.get(0);
+            assertEquals("fst", 1, ((Number)row.get(0)).intValue());
+            assertEquals("count", 3L, ((Number)row.get(1)).longValue());
+            assertEquals("sum", 9L, ((Number)row.get(2)).longValue());
+            assertEquals("avg", 3.0d, ((Number)row.get(3)).doubleValue(), 0.001);
+            assertEquals("min", 3, ((Integer)row.get(4)).intValue());
+            assertEquals("max", 3, ((Integer)row.get(5)).intValue());
+
+            row = result.get(1);
+            assertEquals("fst", 2, ((Number)row.get(0)).intValue());
+            assertEquals("count", 3L, ((Number)row.get(1)).longValue());
+            assertEquals("sum", 6L, ((Number)row.get(2)).longValue());
+            assertEquals("avg", 2.0d, ((Number)row.get(3)).doubleValue(), 0.001);
+            assertEquals("min", 1, ((Integer)row.get(4)).intValue());
+            assertEquals("max", 3, ((Integer)row.get(5)).intValue());
+
+            row = result.get(2);
+            assertEquals("fst", 3, ((Number)row.get(0)).intValue());
+            assertEquals("count", 6L, ((Number)row.get(1)).longValue());
+            assertEquals("sum", 9L, ((Number)row.get(2)).longValue());
+            assertEquals("avg", 1.5d, ((Number)row.get(3)).doubleValue(), 0.001);
+            assertEquals("min", 1, ((Integer)row.get(4)).intValue());
+            assertEquals("max", 2, ((Integer)row.get(5)).intValue());
+        }
+    }
+
+    /** Query with distinct aggregates and groups */
+    private void checkQueryWithGroupsAndDistinctAggr(IgniteCache<Integer, Value> cache) {
+        try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
+            "SELECT count(distinct snd), sum(distinct snd), avg(distinct snd), min(distinct snd), max(distinct snd) " +
+                "FROM Value GROUP BY fst"))) {
+            List<List<?>> result = qry.getAll();
+
+            assertEquals(6, result.size());
+
+            List<?> row = result.get(0);
+            assertEquals("count distinct", 1L, ((Number)row.get(0)).longValue());
+            assertEquals("sum distinct", 3L, ((Number)row.get(1)).longValue());
+            assertEquals("avg distinct", 3.0d, ((Number)row.get(2)).doubleValue(), 0.001);
+            assertEquals("min distinct", 3, ((Integer)row.get(3)).intValue());
+            assertEquals("max distinct", 3, ((Integer)row.get(4)).intValue());
+
+            row = result.get(1);
+            assertEquals("count distinct", 3L, ((Number)row.get(0)).longValue());
+            assertEquals("sum distinct", 6L, ((Number)row.get(1)).longValue());
+            assertEquals("avg distinct", 2.0d, ((Number)row.get(2)).doubleValue(), 0.001);
+            assertEquals("min distinct", 1, ((Integer)row.get(3)).intValue());
+            assertEquals("max distinct", 3, ((Integer)row.get(4)).intValue());
+
+            row = result.get(2);
+            assertEquals("count distinct", 2L, ((Number)row.get(0)).longValue());
+            assertEquals("sum distinct", 3L, ((Number)row.get(1)).longValue());
+            assertEquals("avg distinct", 1.5d, ((Number)row.get(2)).doubleValue(), 0.001);
+            assertEquals("min distinct", 1, ((Integer)row.get(3)).intValue());
+            assertEquals("max distinct", 2, ((Integer)row.get(4)).intValue());
+        }
+    }
+
+    /** Query with distinct aggregates and groups */
+    private void checkQueryWithGroupsAndAggrMixed(IgniteCache<Integer, Value> cache) {
+        try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
+            "SELECT fst, count(snd), sum(snd), avg(snd), min(snd), max(snd)," +
+                "count(distinct snd), sum(distinct snd), avg(distinct snd), min(distinct snd), max(distinct snd) " +
+                "FROM Value GROUP BY fst"))) {
+            List<List<?>> result = qry.getAll();
+
+            assertEquals(6, result.size());
+
+            List<?> row = result.get(0);
+            assertEquals("fst", 1, ((Number)row.get(0)).intValue());
+            assertEquals("count", 3L, ((Number)row.get(1)).longValue());
+            assertEquals("sum", 9L, ((Number)row.get(2)).longValue());
+            assertEquals("avg", 3.0d, ((Number)row.get(3)).doubleValue(), 0.001);
+            assertEquals("min", 3, ((Integer)row.get(4)).intValue());
+            assertEquals("max", 3, ((Integer)row.get(5)).intValue());
+            assertEquals("count distinct", 1L, ((Number)row.get(6)).longValue());
+            assertEquals("sum distinct", 3L, ((Number)row.get(7)).longValue());
+            assertEquals("avg distinct", 3.0d, ((Number)row.get(8)).doubleValue(), 0.001);
+            assertEquals("min distinct", 3, ((Integer)row.get(9)).intValue());
+            assertEquals("max distinct", 3, ((Integer)row.get(10)).intValue());
+
+            row = result.get(1);
+            assertEquals("fst", 2, ((Number)row.get(0)).intValue());
+            assertEquals("count", 3L, ((Number)row.get(1)).longValue());
+            assertEquals("sum", 6L, ((Number)row.get(2)).longValue());
+            assertEquals("avg", 2.0d, ((Number)row.get(3)).doubleValue(), 0.001);
+            assertEquals("min", 1, ((Integer)row.get(4)).intValue());
+            assertEquals("max", 3, ((Integer)row.get(5)).intValue());
+            assertEquals("count distinct", 3L, ((Number)row.get(6)).longValue());
+            assertEquals("sum distinct", 6L, ((Number)row.get(7)).longValue());
+            assertEquals("avg distinct", 2.0d, ((Number)row.get(8)).doubleValue(), 0.001);
+            assertEquals("min distinct", 1, ((Integer)row.get(9)).intValue());
+            assertEquals("max distinct", 3, ((Integer)row.get(10)).intValue());
+
+            row = result.get(2);
+            assertEquals("fst", 3, ((Number)row.get(0)).intValue());
+            assertEquals("count", 6L, ((Number)row.get(1)).longValue());
+            assertEquals("sum", 9L, ((Number)row.get(2)).longValue());
+            assertEquals("avg", 1.5d, ((Number)row.get(3)).doubleValue(), 0.001);
+            assertEquals("min", 1, ((Integer)row.get(4)).intValue());
+            assertEquals("max", 2, ((Integer)row.get(5)).intValue());
+            assertEquals("count distinct", 2L, ((Number)row.get(6)).longValue());
+            assertEquals("sum distinct", 3L, ((Number)row.get(7)).longValue());
+            assertEquals("avg distinct", 1.5d, ((Number)row.get(8)).doubleValue(), 0.001);
+            assertEquals("min distinct", 1, ((Integer)row.get(9)).intValue());
+            assertEquals("max distinct", 2, ((Integer)row.get(10)).intValue());
+        }
+    }
+
+    /**  */
+    private static class Value {
+        /**  */
+        @QuerySqlField
+        private final Integer fst;
+
+        /**  */
+        @QuerySqlField
+        private final Integer snd;
+
+        /** Constructor */
+        public Value(Integer fst, Integer snd) {
+            this.fst = fst;
+            this.snd = snd;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class AffinityKeyGenerator {
+        /** */
+        private final Affinity<Integer> affinity;
+
+        /** */
+        private final ClusterNode node;
+
+        /** */
+        private int start = 0;
+
+        /** Constructor */
+        AffinityKeyGenerator(Ignite node, String cacheName) {
+            this.affinity = node.affinity(cacheName);
+            this.node = node.cluster().localNode();
+        }
+
+        /**  */
+        public Integer next() {
+            int key = start;
+
+            while (start < Integer.MAX_VALUE) {
+                if (affinity.isPrimary(node, key)) {
+                    start = key + 1;
+
+                    return key;
+                }
+
+                key++;
+            }
+
+            throw new IllegalStateException("Can't find next key");
+        }
+    }
+
     /**
      *
      */


Mime
View raw message