ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [15/22] ignite git commit: IGNITE-3448 Support SQL queries with distinct aggregates added. This closes #3448.
Date Mon, 31 Oct 2016 13:48:16 GMT
IGNITE-3448 Support SQL queries with distinct aggregates added. This closes #3448.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7ed2bb7e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7ed2bb7e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7ed2bb7e

Branch: refs/heads/ignite-4154
Commit: 7ed2bb7e341701d052220a36a2b2f8f0a46fd644
Parents: f37fbca
Author: AMRepo <andrey.mashenkov@gmail.com>
Authored: Wed Oct 19 18:33:59 2016 +0300
Committer: AMRepo <andrey.mashenkov@gmail.com>
Committed: Wed Oct 19 21:20:28 2016 +0300

----------------------------------------------------------------------
 .../query/h2/sql/GridSqlQuerySplitter.java      | 146 +++++---
 .../query/IgniteSqlSplitterSelfTest.java        | 339 +++++++++++++++++++
 2 files changed, 447 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7ed2bb7e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 4d34ce8..62c54bd 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -33,6 +33,8 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.AVG;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.CAST;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.COUNT;
+import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.MAX;
+import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.MIN;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.SUM;
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlPlaceholder.EMPTY;
 
@@ -140,7 +142,8 @@ public class GridSqlQuerySplitter {
      * @param igniteH2Indexing Indexing implementation.
      * @return Two step query.
      */
-    public static GridCacheTwoStepQuery split(JdbcPreparedStatement stmt, Object[] params,
boolean collocated, IgniteH2Indexing igniteH2Indexing) {
+    public static GridCacheTwoStepQuery split(JdbcPreparedStatement stmt, Object[] params,
boolean collocated,
+        IgniteH2Indexing igniteH2Indexing) {
         if (params == null)
             params = GridCacheSqlQuery.EMPTY_PARAMS;
 
@@ -169,10 +172,18 @@ public class GridSqlQuerySplitter {
 
         Set<String> colNames = new HashSet<>();
 
-        boolean aggregateFound = false;
+        boolean distinctAggregateFound = false;
+
+        if (!collocated) {
+            for (int i = 0, len = mapExps.size(); i < len; i++)
+                distinctAggregateFound |= hasDistinctAggregates(mapExps.get(i));
+        }
+
+        boolean aggregateFound = distinctAggregateFound;
 
         for (int i = 0, len = mapExps.size(); i < len; i++) // Remember len because mapExps
list can grow.
-            aggregateFound |= splitSelectExpression(mapExps, rdcExps, colNames, i, collocated,
i == havingCol);
+            aggregateFound |= splitSelectExpression(mapExps, rdcExps, colNames, i, collocated,
i == havingCol,
+                distinctAggregateFound);
 
         // Fill select expressions.
         mapQry.clearColumns();
@@ -190,9 +201,14 @@ public class GridSqlQuerySplitter {
             rdcQry.addColumn(column(((GridSqlAlias)mapExps.get(i)).alias()), false);
 
         // -- GROUP BY
-        if (mapQry.groupColumns() != null && !collocated)
+        if (mapQry.groupColumns() != null && !collocated) {
             rdcQry.groupColumns(mapQry.groupColumns());
 
+            // Grouping with distinct aggregates cannot be performed on map phase
+            if (distinctAggregateFound)
+                mapQry.groupColumns(null);
+        }
+
         // -- HAVING
         if (havingCol >= 0 && !collocated) {
             // TODO IGNITE-1140 - Find aggregate functions in HAVING clause or rewrite query to put all aggregates to SELECT clause.
@@ -477,10 +493,11 @@ public class GridSqlQuerySplitter {
      * @param idx Index.
      * @param collocated If it is a collocated query.
      * @param isHaving If it is a HAVING expression.
+     * @param hasDistinctAggregate If query has distinct aggregate expression.
      * @return {@code true} If aggregate was found.
      */
     private static boolean splitSelectExpression(List<GridSqlElement> mapSelect, List<GridSqlElement>
rdcSelect,
-        Set<String> colNames, final int idx, boolean collocated, boolean isHaving)
{
+        Set<String> colNames, final int idx, boolean collocated, boolean isHaving,
boolean hasDistinctAggregate) {
         GridSqlElement el = mapSelect.get(idx);
 
         GridSqlAlias alias = null;
@@ -499,7 +516,7 @@ public class GridSqlQuerySplitter {
                 alias = alias(isHaving ? HAVING_COLUMN : columnName(idx), el);
 
             // We can update original alias here as well since it will be dropped from mapSelect.
-            splitAggregates(alias, 0, mapSelect, idx, true);
+            splitAggregates(alias, 0, mapSelect, idx, hasDistinctAggregate, true);
 
             set(rdcSelect, idx, alias);
         }
@@ -554,10 +571,33 @@ public class GridSqlQuerySplitter {
     }
 
     /**
+     * Looks up distinct aggregates in the given expression tree.
+     * Note: DISTINCT makes no difference for MIN and MAX aggregates, so it is ignored for them.
+     *
+     * @param el Expression.
+     * @return {@code true} If expression contains distinct aggregates.
+     */
+    private static boolean hasDistinctAggregates(GridSqlElement el) {
+        if (el instanceof GridSqlAggregateFunction) {
+            GridSqlFunctionType type = ((GridSqlAggregateFunction)el).type();
+
+            return ((GridSqlAggregateFunction)el).distinct() && type != MIN && type != MAX;
+        }
+
+        for (GridSqlElement child : el) {
+            if (hasDistinctAggregates(child))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
      * @param parentExpr Parent expression.
      * @param childIdx Child index to try to split.
      * @param mapSelect List of expressions in map SELECT clause.
      * @param exprIdx Index of the original expression in map SELECT clause.
+     * @param hasDistinctAggregate If query has distinct aggregate expression.
      * @param first If the first aggregate is already found in this expression.
      * @return {@code true} If the first aggregate is already found.
      */
@@ -566,17 +606,18 @@ public class GridSqlQuerySplitter {
         final int childIdx,
         final List<GridSqlElement> mapSelect,
         final int exprIdx,
+        boolean hasDistinctAggregate,
         boolean first) {
         GridSqlElement el = parentExpr.child(childIdx);
 
         if (el instanceof GridSqlAggregateFunction) {
-            splitAggregate(parentExpr, childIdx, mapSelect, exprIdx, first);
+            splitAggregate(parentExpr, childIdx, mapSelect, exprIdx, hasDistinctAggregate,
first);
 
             return true;
         }
 
         for (int i = 0; i < el.size(); i++) {
-            if (splitAggregates(el, i, mapSelect, exprIdx, first))
+            if (splitAggregates(el, i, mapSelect, exprIdx, hasDistinctAggregate, first))
                 first = false;
         }
 
@@ -588,6 +629,7 @@ public class GridSqlQuerySplitter {
      * @param aggIdx Index of the aggregate to split in this expression.
      * @param mapSelect List of expressions in map SELECT clause.
      * @param exprIdx Index of the original expression in map SELECT clause.
+     * @param hasDistinctAggregate If query has distinct aggregate expression.
      * @param first If this is the first aggregate found in this expression.
      */
     private static void splitAggregate(
@@ -595,6 +637,7 @@ public class GridSqlQuerySplitter {
         int aggIdx,
         List<GridSqlElement> mapSelect,
         int exprIdx,
+        boolean hasDistinctAggregate,
         boolean first
     ) {
         GridSqlAggregateFunction agg = parentExpr.child(aggIdx);
@@ -612,53 +655,80 @@ public class GridSqlQuerySplitter {
         else
             mapSelect.add(mapAggAlias);
 
+        /* Note: a distinct aggregate can be computed only on the reduce phase, so if the query
+           contains a distinct aggregate, all other aggregates must be processed the same way. */
         switch (agg.type()) {
-            case AVG: // SUM( AVG(CAST(x AS DOUBLE))*COUNT(x) )/SUM( COUNT(x) ).
-                //-- COUNT(x) map
-                GridSqlElement cntMapAgg = aggregate(agg.distinct(), COUNT)
-                    .resultType(GridSqlType.BIGINT).addChild(agg.child());
+            case AVG: // SUM( AVG(CAST(x AS DOUBLE))*COUNT(x) )/SUM( COUNT(x) )  or  AVG(CAST(
x AS DOUBLE))
+                if (hasDistinctAggregate) /* and has no collocated group by */ {
+                    mapAgg = agg.child();
+
+                    rdcAgg = aggregate(agg.distinct(), agg.type()).resultType(GridSqlType.DOUBLE)
+                        .addChild(function(CAST).resultType(GridSqlType.DOUBLE).addChild(column(mapAggAlias.alias())));
+                }
+                else {
+                    //-- COUNT(x) map
+                    GridSqlElement cntMapAgg = aggregate(agg.distinct(), COUNT)
+                        .resultType(GridSqlType.BIGINT).addChild(agg.child());
 
-                // Add generated alias to COUNT(x).
-                // Using size as index since COUNT will be added as the last select element
to the map query.
-                String cntMapAggAlias = columnName(mapSelect.size());
+                    // Add generated alias to COUNT(x).
+                    // Using size as index since COUNT will be added as the last select element
to the map query.
+                    String cntMapAggAlias = columnName(mapSelect.size());
 
-                cntMapAgg = alias(cntMapAggAlias, cntMapAgg);
+                    cntMapAgg = alias(cntMapAggAlias, cntMapAgg);
 
-                mapSelect.add(cntMapAgg);
+                    mapSelect.add(cntMapAgg);
 
-                //-- AVG(CAST(x AS DOUBLE)) map
-                mapAgg = aggregate(agg.distinct(), AVG).resultType(GridSqlType.DOUBLE).addChild(
-                    function(CAST).resultType(GridSqlType.DOUBLE).addChild(agg.child()));
+                    //-- AVG(CAST(x AS DOUBLE)) map
+                    mapAgg = aggregate(agg.distinct(), AVG).resultType(GridSqlType.DOUBLE).addChild(
+                        function(CAST).resultType(GridSqlType.DOUBLE).addChild(agg.child()));
 
-                //-- SUM( AVG(x)*COUNT(x) )/SUM( COUNT(x) ) reduce
-                GridSqlElement sumUpRdc = aggregate(false, SUM).addChild(
-                    op(GridSqlOperationType.MULTIPLY,
-                        column(mapAggAlias.alias()),
-                        column(cntMapAggAlias)));
+                    //-- SUM( AVG(x)*COUNT(x) )/SUM( COUNT(x) ) reduce
+                    GridSqlElement sumUpRdc = aggregate(false, SUM).addChild(
+                        op(GridSqlOperationType.MULTIPLY,
+                            column(mapAggAlias.alias()),
+                            column(cntMapAggAlias)));
 
-                GridSqlElement sumDownRdc = aggregate(false, SUM).addChild(column(cntMapAggAlias));
+                    GridSqlElement sumDownRdc = aggregate(false, SUM).addChild(column(cntMapAggAlias));
 
-                rdcAgg = op(GridSqlOperationType.DIVIDE, sumUpRdc, sumDownRdc);
+                    rdcAgg = op(GridSqlOperationType.DIVIDE, sumUpRdc, sumDownRdc);
+                }
 
                 break;
 
-            case SUM: // SUM( SUM(x) )
-            case MAX: // MAX( MAX(x) )
-            case MIN: // MIN( MIN(x) )
-                mapAgg = aggregate(agg.distinct(), agg.type()).resultType(agg.resultType()).addChild(agg.child());
-                rdcAgg = aggregate(agg.distinct(), agg.type()).addChild(column(mapAggAlias.alias()));
+            case SUM: // SUM( SUM(x) ) or SUM(DISTINCT x)
+            case MAX: // MAX( MAX(x) ) or MAX(DISTINCT x)
+            case MIN: // MIN( MIN(x) ) or MIN(DISTINCT x)
+                if (hasDistinctAggregate) /* and has no collocated group by */ {
+                    mapAgg = agg.child();
+
+                    rdcAgg = aggregate(agg.distinct(), agg.type()).addChild(column(mapAggAlias.alias()));
+                }
+                else {
+                    mapAgg = aggregate(agg.distinct(), agg.type()).resultType(agg.resultType()).addChild(agg.child());
+                    rdcAgg = aggregate(agg.distinct(), agg.type()).addChild(column(mapAggAlias.alias()));
+                }
 
                 break;
 
             case COUNT_ALL: // CAST(SUM( COUNT(*) ) AS BIGINT)
-            case COUNT: // CAST(SUM( COUNT(x) ) AS BIGINT)
-                mapAgg = aggregate(agg.distinct(), agg.type()).resultType(GridSqlType.BIGINT);
+            case COUNT: // CAST(SUM( COUNT(x) ) AS BIGINT) or CAST(COUNT(DISTINCT x) AS BIGINT)
+                if (hasDistinctAggregate) /* and has no collocated group by */ {
+                    assert agg.type() == COUNT;
 
-                if (agg.type() == COUNT)
-                    mapAgg.addChild(agg.child());
+                    mapAgg = agg.child();
 
-                rdcAgg = aggregate(false, SUM).addChild(column(mapAggAlias.alias()));
-                rdcAgg = function(CAST).resultType(GridSqlType.BIGINT).addChild(rdcAgg);
+                    rdcAgg = aggregate(agg.distinct(), agg.type()).resultType(GridSqlType.BIGINT)
+                        .addChild(column(mapAggAlias.alias()));
+                }
+                else {
+                    mapAgg = aggregate(agg.distinct(), agg.type()).resultType(GridSqlType.BIGINT);
+
+                    if (agg.type() == COUNT)
+                        mapAgg.addChild(agg.child());
+
+                    rdcAgg = aggregate(false, SUM).addChild(column(mapAggAlias.alias()));
+                    rdcAgg = function(CAST).resultType(GridSqlType.BIGINT).addChild(rdcAgg);
+                }
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7ed2bb7e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 64be936..56658df 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -25,11 +25,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.util.GridRandom;
@@ -98,6 +102,8 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
             Integer.class, Integer.class));
 
         try {
+            awaitPartitionMapExchange();
+
             List<Integer> res = new ArrayList<>();
 
             Random rnd = new GridRandom();
@@ -340,6 +346,328 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest
{
         }
     }
 
+    /** @throws Exception if failed. */
+    public void testDistributedAggregates() throws Exception {
+        final String cacheName = "ints";
+
+        IgniteCache<Integer, Value> cache = ignite(0).getOrCreateCache(cacheConfig(cacheName,
true,
+            Integer.class, Value.class));
+
+        AffinityKeyGenerator node0KeyGen = new AffinityKeyGenerator(ignite(0), cacheName);
+        AffinityKeyGenerator node1KeyGen = new AffinityKeyGenerator(ignite(1), cacheName);
+        AffinityKeyGenerator node2KeyGen = new AffinityKeyGenerator(ignite(2), cacheName);
+
+        try {
+            awaitPartitionMapExchange();
+
+            cache.put(node0KeyGen.next(), new Value(1, 3));
+            cache.put(node1KeyGen.next(), new Value(1, 3));
+            cache.put(node2KeyGen.next(), new Value(1, 3));
+
+            cache.put(node0KeyGen.next(), new Value(2, 1));
+            cache.put(node1KeyGen.next(), new Value(2, 2));
+            cache.put(node2KeyGen.next(), new Value(2, 3));
+
+            cache.put(node0KeyGen.next(), new Value(3, 1));
+            cache.put(node0KeyGen.next(), new Value(3, 1));
+            cache.put(node0KeyGen.next(), new Value(3, 2));
+            cache.put(node1KeyGen.next(), new Value(3, 1));
+            cache.put(node1KeyGen.next(), new Value(3, 2));
+            cache.put(node2KeyGen.next(), new Value(3, 2));
+
+            cache.put(node0KeyGen.next(), new Value(4, 2));
+            cache.put(node1KeyGen.next(), new Value(5, 2));
+            cache.put(node2KeyGen.next(), new Value(6, 2));
+
+            checkSimpleQueryWithAggr(cache);
+            checkSimpleQueryWithDistinctAggr(cache);
+
+            checkQueryWithGroupsAndAggrs(cache);
+            checkQueryWithGroupsAndDistinctAggr(cache);
+
+            checkSimpleQueryWithAggrMixed(cache);
+            checkQueryWithGroupsAndAggrMixed(cache);
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /** @throws Exception if failed. */
+    public void testCollocatedAggregates() throws Exception {
+        final String cacheName = "ints";
+
+        IgniteCache<Integer, Value> cache = ignite(0).getOrCreateCache(cacheConfig(cacheName,
true,
+            Integer.class, Value.class));
+
+        AffinityKeyGenerator node0KeyGen = new AffinityKeyGenerator(ignite(0), cacheName);
+        AffinityKeyGenerator node1KeyGen = new AffinityKeyGenerator(ignite(1), cacheName);
+        AffinityKeyGenerator node2KeyGen = new AffinityKeyGenerator(ignite(2), cacheName);
+
+        try {
+            awaitPartitionMapExchange();
+
+            cache.put(node0KeyGen.next(), new Value(1, 3));
+            cache.put(node0KeyGen.next(), new Value(1, 3));
+            cache.put(node0KeyGen.next(), new Value(1, 3));
+
+            cache.put(node1KeyGen.next(), new Value(2, 1));
+            cache.put(node1KeyGen.next(), new Value(2, 2));
+            cache.put(node1KeyGen.next(), new Value(2, 3));
+
+            cache.put(node2KeyGen.next(), new Value(3, 1));
+            cache.put(node2KeyGen.next(), new Value(3, 1));
+            cache.put(node2KeyGen.next(), new Value(3, 2));
+            cache.put(node2KeyGen.next(), new Value(3, 1));
+            cache.put(node2KeyGen.next(), new Value(3, 2));
+            cache.put(node2KeyGen.next(), new Value(3, 2));
+
+            cache.put(node0KeyGen.next(), new Value(4, 2));
+            cache.put(node1KeyGen.next(), new Value(5, 2));
+            cache.put(node2KeyGen.next(), new Value(6, 2));
+
+            checkQueryWithGroupsAndAggrs(cache);
+            checkQueryWithGroupsAndDistinctAggr(cache);
+            checkQueryWithGroupsAndAggrMixed(cache);
+        }
+        finally {
+            cache.destroy();
+        }
+    }
+
+    /** Simple query with aggregates */
+    private void checkSimpleQueryWithAggr(IgniteCache<Integer, Value> cache) {
+        try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
+            "SELECT count(fst), sum(snd), avg(snd), min(snd), max(snd) FROM Value"))) {
+            List<List<?>> result = qry.getAll();
+
+            assertEquals(1, result.size());
+
+            List<?> row = result.get(0);
+
+            assertEquals("count", 15L, ((Number)row.get(0)).longValue());
+            assertEquals("sum", 30L, ((Number)row.get(1)).longValue());
+            assertEquals("avg", 2.0d, ((Number)row.get(2)).doubleValue(), 0.001);
+            assertEquals("min", 1, ((Integer)row.get(3)).intValue());
+            assertEquals("max", 3, ((Integer)row.get(4)).intValue());
+        }
+    }
+
+    /** Simple query with distinct aggregates */
+    private void checkSimpleQueryWithDistinctAggr(IgniteCache<Integer, Value> cache)
{
+        try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
+            "SELECT count(distinct fst), sum(distinct snd), avg(distinct snd), min(distinct
snd), max(distinct snd) " +
+                "FROM Value"))) {
+            List<List<?>> result = qry.getAll();
+
+            assertEquals(1, result.size());
+
+            List<?> row = result.get(0);
+
+            assertEquals("count distinct", 6L, ((Number)row.get(0)).longValue());
+            assertEquals("sum distinct", 6L, ((Number)row.get(1)).longValue());
+            assertEquals("avg distinct", 2.0d, ((Number)row.get(2)).doubleValue(), 0.001);
+            assertEquals("min distinct", 1, ((Integer)row.get(3)).intValue());
+            assertEquals("max distinct", 3, ((Integer)row.get(4)).intValue());
+        }
+    }
+
+    /** Simple query with distinct aggregates */
+    private void checkSimpleQueryWithAggrMixed(IgniteCache<Integer, Value> cache) {
+        try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
+            "SELECT count(fst), sum(snd), avg(snd), min(snd), max(snd)," +
+                "count(distinct fst), sum(distinct snd), avg(distinct snd), min(distinct
snd), max(distinct snd)  " +
+                "FROM Value"))) {
+            List<List<?>> result = qry.getAll();
+
+            assertEquals(1, result.size());
+
+            List<?> row = result.get(0);
+
+            assertEquals("count", 15L, ((Number)row.get(0)).longValue());
+            assertEquals("sum", 30L, ((Number)row.get(1)).longValue());
+            assertEquals("avg", 2.0d, ((Number)row.get(2)).doubleValue(), 0.001);
+            assertEquals("min", 1, ((Integer)row.get(3)).intValue());
+            assertEquals("max", 3, ((Integer)row.get(4)).intValue());
+            assertEquals("count distinct", 6L, ((Number)row.get(5)).longValue());
+            assertEquals("sum distinct", 6L, ((Number)row.get(6)).longValue());
+            assertEquals("avg distinct", 2.0d, ((Number)row.get(7)).doubleValue(), 0.001);
+            assertEquals("min distinct", 1, ((Integer)row.get(8)).intValue());
+            assertEquals("max distinct", 3, ((Integer)row.get(9)).intValue());
+        }
+    }
+
+    /** Query with aggregates and groups */
+    private void checkQueryWithGroupsAndAggrs(IgniteCache<Integer, Value> cache) {
+        try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
+            "SELECT fst, count(snd), sum(snd), avg(snd), min(snd), max(snd) FROM Value GROUP
BY fst ORDER BY fst"))) {
+            List<List<?>> result = qry.getAll();
+
+            assertEquals(6, result.size());
+
+            List<?> row = result.get(0);
+            assertEquals("fst", 1, ((Number)row.get(0)).intValue());
+            assertEquals("count", 3L, ((Number)row.get(1)).longValue());
+            assertEquals("sum", 9L, ((Number)row.get(2)).longValue());
+            assertEquals("avg", 3.0d, ((Number)row.get(3)).doubleValue(), 0.001);
+            assertEquals("min", 3, ((Integer)row.get(4)).intValue());
+            assertEquals("max", 3, ((Integer)row.get(5)).intValue());
+
+            row = result.get(1);
+            assertEquals("fst", 2, ((Number)row.get(0)).intValue());
+            assertEquals("count", 3L, ((Number)row.get(1)).longValue());
+            assertEquals("sum", 6L, ((Number)row.get(2)).longValue());
+            assertEquals("avg", 2.0d, ((Number)row.get(3)).doubleValue(), 0.001);
+            assertEquals("min", 1, ((Integer)row.get(4)).intValue());
+            assertEquals("max", 3, ((Integer)row.get(5)).intValue());
+
+            row = result.get(2);
+            assertEquals("fst", 3, ((Number)row.get(0)).intValue());
+            assertEquals("count", 6L, ((Number)row.get(1)).longValue());
+            assertEquals("sum", 9L, ((Number)row.get(2)).longValue());
+            assertEquals("avg", 1.5d, ((Number)row.get(3)).doubleValue(), 0.001);
+            assertEquals("min", 1, ((Integer)row.get(4)).intValue());
+            assertEquals("max", 2, ((Integer)row.get(5)).intValue());
+        }
+    }
+
+    /** Query with distinct aggregates and groups; ORDER BY fixes the order of the rows asserted by index. */
+    private void checkQueryWithGroupsAndDistinctAggr(IgniteCache<Integer, Value> cache) {
+        try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
+            "SELECT count(distinct snd), sum(distinct snd), avg(distinct snd), min(distinct snd), max(distinct snd) " +
+                "FROM Value GROUP BY fst ORDER BY fst"))) {
+            List<List<?>> result = qry.getAll();
+
+            assertEquals(6, result.size());
+
+            List<?> row = result.get(0);
+            assertEquals("count distinct", 1L, ((Number)row.get(0)).longValue());
+            assertEquals("sum distinct", 3L, ((Number)row.get(1)).longValue());
+            assertEquals("avg distinct", 3.0d, ((Number)row.get(2)).doubleValue(), 0.001);
+            assertEquals("min distinct", 3, ((Integer)row.get(3)).intValue());
+            assertEquals("max distinct", 3, ((Integer)row.get(4)).intValue());
+
+            row = result.get(1);
+            assertEquals("count distinct", 3L, ((Number)row.get(0)).longValue());
+            assertEquals("sum distinct", 6L, ((Number)row.get(1)).longValue());
+            assertEquals("avg distinct", 2.0d, ((Number)row.get(2)).doubleValue(), 0.001);
+            assertEquals("min distinct", 1, ((Integer)row.get(3)).intValue());
+            assertEquals("max distinct", 3, ((Integer)row.get(4)).intValue());
+
+            row = result.get(2);
+            assertEquals("count distinct", 2L, ((Number)row.get(0)).longValue());
+            assertEquals("sum distinct", 3L, ((Number)row.get(1)).longValue());
+            assertEquals("avg distinct", 1.5d, ((Number)row.get(2)).doubleValue(), 0.001);
+            assertEquals("min distinct", 1, ((Integer)row.get(3)).intValue());
+            assertEquals("max distinct", 2, ((Integer)row.get(4)).intValue());
+        }
+    }
+
+    /** Query with groups mixing plain and distinct aggregates; ORDER BY fixes the order of the rows asserted by index. */
+    private void checkQueryWithGroupsAndAggrMixed(IgniteCache<Integer, Value> cache) {
+        try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
+            "SELECT fst, count(snd), sum(snd), avg(snd), min(snd), max(snd)," +
+                "count(distinct snd), sum(distinct snd), avg(distinct snd), min(distinct snd), max(distinct snd) " +
+                "FROM Value GROUP BY fst ORDER BY fst"))) {
+            List<List<?>> result = qry.getAll();
+
+            assertEquals(6, result.size());
+
+            List<?> row = result.get(0);
+            assertEquals("fst", 1, ((Number)row.get(0)).intValue());
+            assertEquals("count", 3L, ((Number)row.get(1)).longValue());
+            assertEquals("sum", 9L, ((Number)row.get(2)).longValue());
+            assertEquals("avg", 3.0d, ((Number)row.get(3)).doubleValue(), 0.001);
+            assertEquals("min", 3, ((Integer)row.get(4)).intValue());
+            assertEquals("max", 3, ((Integer)row.get(5)).intValue());
+            assertEquals("count distinct", 1L, ((Number)row.get(6)).longValue());
+            assertEquals("sum distinct", 3L, ((Number)row.get(7)).longValue());
+            assertEquals("avg distinct", 3.0d, ((Number)row.get(8)).doubleValue(), 0.001);
+            assertEquals("min distinct", 3, ((Integer)row.get(9)).intValue());
+            assertEquals("max distinct", 3, ((Integer)row.get(10)).intValue());
+
+            row = result.get(1);
+            assertEquals("fst", 2, ((Number)row.get(0)).intValue());
+            assertEquals("count", 3L, ((Number)row.get(1)).longValue());
+            assertEquals("sum", 6L, ((Number)row.get(2)).longValue());
+            assertEquals("avg", 2.0d, ((Number)row.get(3)).doubleValue(), 0.001);
+            assertEquals("min", 1, ((Integer)row.get(4)).intValue());
+            assertEquals("max", 3, ((Integer)row.get(5)).intValue());
+            assertEquals("count distinct", 3L, ((Number)row.get(6)).longValue());
+            assertEquals("sum distinct", 6L, ((Number)row.get(7)).longValue());
+            assertEquals("avg distinct", 2.0d, ((Number)row.get(8)).doubleValue(), 0.001);
+            assertEquals("min distinct", 1, ((Integer)row.get(9)).intValue());
+            assertEquals("max distinct", 3, ((Integer)row.get(10)).intValue());
+
+            row = result.get(2);
+            assertEquals("fst", 3, ((Number)row.get(0)).intValue());
+            assertEquals("count", 6L, ((Number)row.get(1)).longValue());
+            assertEquals("sum", 9L, ((Number)row.get(2)).longValue());
+            assertEquals("avg", 1.5d, ((Number)row.get(3)).doubleValue(), 0.001);
+            assertEquals("min", 1, ((Integer)row.get(4)).intValue());
+            assertEquals("max", 2, ((Integer)row.get(5)).intValue());
+            assertEquals("count distinct", 2L, ((Number)row.get(6)).longValue());
+            assertEquals("sum distinct", 3L, ((Number)row.get(7)).longValue());
+            assertEquals("avg distinct", 1.5d, ((Number)row.get(8)).doubleValue(), 0.001);
+            assertEquals("min distinct", 1, ((Integer)row.get(9)).intValue());
+            assertEquals("max distinct", 2, ((Integer)row.get(10)).intValue());
+        }
+    }
+
+    /**  */
+    private static class Value {
+        /**  */
+        @QuerySqlField
+        private final Integer fst;
+
+        /**  */
+        @QuerySqlField
+        private final Integer snd;
+
+        /** Constructor */
+        public Value(Integer fst, Integer snd) {
+            this.fst = fst;
+            this.snd = snd;
+        }
+    }
+
+    /**
+     * Generates increasing integer keys for which the given node is primary in the given cache.
+     */
+    private static class AffinityKeyGenerator {
+        /** Affinity of the cache the keys are generated for. */
+        private final Affinity<Integer> affinity;
+
+        /** Local cluster node of the Ignite instance passed to the constructor. */
+        private final ClusterNode node;
+
+        /** First key candidate to check on the following {@link #next()} call. */
+        private int start = 0;
+
+        /** Constructor */
+        AffinityKeyGenerator(Ignite node, String cacheName) {
+            this.affinity = node.affinity(cacheName);
+            this.node = node.cluster().localNode();
+        }
+
+        /** @return Next key the node is primary for; throws IllegalStateException when the key range is exhausted. */
+        public Integer next() {
+            int key = start;
+
+            while (key < Integer.MAX_VALUE) {
+                if (affinity.isPrimary(node, key)) {
+                    start = key + 1;
+
+                    return key;
+                }
+
+                key++;
+            }
+
+            throw new IllegalStateException("Can't find next key");
+        }
+    }
+
     /**
      *
      */
@@ -391,9 +719,11 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest
{
      * Test value.
      */
     private static class GroupIndexTestValue implements Serializable {
+        /** */
         @QuerySqlField(orderedGroups = @QuerySqlField.Group(name = "grpIdx", order = 0))
         private int a;
 
+        /** */
         @QuerySqlField(orderedGroups = @QuerySqlField.Group(name = "grpIdx", order = 1))
         private int b;
 
@@ -408,22 +738,31 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest
{
     }
 
     private static class User implements Serializable {
+        /** */
         @QuerySqlField
         private int id;
     }
 
+    /** */
     private static class UserOrder implements Serializable {
+        /** */
         @QuerySqlField
         private int id;
 
+        /** */
         @QuerySqlField
         private int userId;
     }
 
+    /**
+     *
+     */
     private static class OrderGood implements Serializable {
+        /** */
         @QuerySqlField
         private int orderId;
 
+        /** */
         @QuerySqlField
         private int goodId;
     }


Mime
View raw message