ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-4509: Send unicast SQL request when partition can be deduced from the query. This closes #1916.
Date Fri, 02 Jun 2017 11:20:44 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 79d45e343 -> fbf0e3537


IGNITE-4509: Send unicast SQL request when partition can be deduced from the query. This closes #1916.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fbf0e353
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fbf0e353
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fbf0e353

Branch: refs/heads/master
Commit: fbf0e35371949309f07105331f3bafb54da6c29c
Parents: 79d45e3
Author: Sergey Kalashnikov <skalashnikov@gridgain.com>
Authored: Fri Jun 2 14:20:37 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Fri Jun 2 14:20:37 2017 +0300

----------------------------------------------------------------------
 .../cache/query/GridCacheSqlQuery.java          |  23 +
 .../cache/query/CacheQueryPartitionInfo.java    | 110 ++++
 .../cache/query/GridCacheTwoStepQuery.java      |  18 +
 .../processors/query/h2/IgniteH2Indexing.java   |  50 +-
 .../query/h2/sql/GridSqlQuerySplitter.java      | 228 +++++++-
 .../processors/query/IgniteSqlRoutingTest.java  | 552 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 7 files changed, 976 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fbf0e353/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
index 780e462..d3746f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
@@ -69,6 +69,11 @@ public class GridCacheSqlQuery implements Message {
     /** Single node to execute the query on. */
     private UUID node;
 
+    /** Derived partition info. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private transient Object[] derivedPartitions;
+
     /**
      * For {@link Message}.
      */
@@ -253,6 +258,7 @@ public class GridCacheSqlQuery implements Message {
         cp.paramIdxs = paramIdxs;
         cp.sort = sort;
         cp.partitioned = partitioned;
+        cp.derivedPartitions = derivedPartitions;
 
         return cp;
     }
@@ -324,4 +330,21 @@ public class GridCacheSqlQuery implements Message {
 
         return res;
     }
+
+    /**
+     * @return Derived partitions.
+     */
+    public Object[] derivedPartitions() {
+        return derivedPartitions;
+    }
+
+    /**
+     * @param derivedPartitions Derived partitions.
+     * @return {@code this}.
+     */
+    public GridCacheSqlQuery derivedPartitions(Object[] derivedPartitions) {
+        this.derivedPartitions = derivedPartitions;
+
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fbf0e353/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPartitionInfo.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPartitionInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPartitionInfo.java
new file mode 100644
index 0000000..1329d5c
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPartitionInfo.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Holds the partition calculation info extracted from a query.
+ * The query may have several such items associated with it.
+ *
+ * The query may contain expressions containing key or affinity key.
+ * Such expressions can be used as hints to derive small isolated set
+ * of partitions the query needs to run on.
+ *
+ * In case expression contains constant (e.g. _key = 100), the partition
+ * can be calculated right away and saved into cache along with the query.
+ *
+ * In case expression has a parameter (e.g. _key = ?), the effective
+ * partition varies with each run of the query. Hence, instead of partition,
+ * one must store the info required to calculate partition.
+ *
+ * The given class holds the required info, so that effective partition
+ * can be calculated during query parameter binding.
+ */
+public class CacheQueryPartitionInfo {
+    /** */
+    private int partId;
+
+    /** */
+    private String cacheName;
+
+    /** */
+    private int paramIdx;
+
+    /**
+     * @param partId Partition id, or -1 if parameter binding required.
+     * @param cacheName Cache name required for partition calculation.
+     * @param paramIdx Query parameter index required for partition calculation.
+     */
+    public CacheQueryPartitionInfo(int partId, String cacheName, int paramIdx) {
+        this.partId = partId;
+        this.cacheName = cacheName;
+        this.paramIdx = paramIdx;
+    }
+
+    /**
+     * @return Partition id, or -1 if parameter binding is required to calculate partition.
+     */
+    public int partition() {
+        return partId;
+    }
+
+    /**
+     * @return Cache name required for partition calculation.
+     */
+    public String cacheName() {
+        return cacheName;
+    }
+
+    /**
+     * @return Query parameter index required for partition calculation.
+     */
+    public int paramIdx() {
+        return paramIdx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return partId ^ paramIdx ^ (cacheName == null ? 0 : cacheName.hashCode());
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+
+        if (!(obj instanceof CacheQueryPartitionInfo))
+            return false;
+
+        CacheQueryPartitionInfo other = (CacheQueryPartitionInfo)obj;
+
+        if (partId >= 0)
+            return partId == other.partId;
+
+        if (other.cacheName == null)
+            return false;
+
+        return other.cacheName.equals(cacheName) && other.paramIdx == paramIdx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheQueryPartitionInfo.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fbf0e353/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 9e9a875..24958af 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -62,6 +62,9 @@ public class GridCacheTwoStepQuery {
     /** */
     private boolean local;
 
+    /** */
+    private CacheQueryPartitionInfo[] derivedPartitions;
+
     /**
      * @param originalSql Original query SQL.
      * @param tbls Tables in query.
@@ -210,6 +213,20 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
+     * @return Query derived partitions info.
+     */
+    public CacheQueryPartitionInfo[] derivedPartitions() {
+        return this.derivedPartitions;
+    }
+
+    /**
+     * @param derivedPartitions Query derived partitions info.
+     */
+    public void derivedPartitions(CacheQueryPartitionInfo[] derivedPartitions) {
+        this.derivedPartitions = derivedPartitions;
+    }
+
+    /**
      * @return Copy.
      */
     public GridCacheTwoStepQuery copy() {
@@ -222,6 +239,7 @@ public class GridCacheTwoStepQuery {
         cp.skipMergeTbl = skipMergeTbl;
         cp.pageSize = pageSize;
         cp.distributedJoins = distributedJoins;
+        cp.derivedPartitions = derivedPartitions;
 
         for (int i = 0; i < mapQrys.size(); i++)
             cp.mapQrys.add(mapQrys.get(i).copy());

http://git-wip-us.apache.org/repos/asf/ignite/blob/fbf0e353/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 50c8e41..12addbd 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryPartitionInfo;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
@@ -1387,9 +1388,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (cancel == null)
             cancel = new GridQueryCancel();
 
+        int partitions[] = qry.getPartitions();
+
+        if (partitions == null && twoStepQry.derivedPartitions() != null) {
+            try {
+                partitions = calculateQueryPartitions(twoStepQry.derivedPartitions(), qry.getArgs());
+            } catch (IgniteCheckedException e) {
+                throw new CacheException("Failed to calculate derived partitions: [qry=" + sqlQry + ", params=" +
+                    Arrays.deepToString(qry.getArgs()) + "]", e);
+            }
+        }
+
         QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
             runQueryTwoStep(schemaName, twoStepQry, keepBinary, enforceJoinOrder, qry.getTimeout(), cancel,
-                qry.getArgs(), qry.getPartitions()), cancel);
+                qry.getArgs(), partitions), cancel);
 
         cursor.fieldsMeta(meta);
 
@@ -2245,6 +2257,42 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         rdcQryExec.onDisconnected(reconnectFut);
     }
 
+    /**
+     * Bind query parameters and calculate partitions derived from the query.
+     *
+     * @return Partitions.
+     */
+    private int[] calculateQueryPartitions(CacheQueryPartitionInfo[] partInfoList, Object[] params)
+        throws IgniteCheckedException {
+
+        ArrayList<Integer> list = new ArrayList<>(partInfoList.length);
+
+        for (CacheQueryPartitionInfo partInfo: partInfoList) {
+            int partId = partInfo.partition() < 0 ?
+                kernalContext().affinity().partition(partInfo.cacheName(), params[partInfo.paramIdx()]) :
+                partInfo.partition();
+
+            int i = 0;
+
+            while (i < list.size() && list.get(i) < partId)
+                i++;
+
+            if (i < list.size()) {
+                if (list.get(i) > partId)
+                    list.add(i, partId);
+            }
+            else
+                list.add(partId);
+        }
+
+        int[] result = new int[list.size()];
+
+        for (int i = 0; i < list.size(); i++)
+            result[i] = list.get(i);
+
+        return result;
+    }
+
     /** {@inheritDoc} */
     @Override public Collection<GridRunningQueryInfo> runningQueries(long duration) {
         Collection<GridRunningQueryInfo> res = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/fbf0e353/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index e2b82ab..1b9619e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -34,11 +34,15 @@ import java.util.TreeSet;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryPartitionInfo;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -46,6 +50,7 @@ import org.h2.command.Prepared;
 import org.h2.command.dml.Query;
 import org.h2.command.dml.SelectUnion;
 import org.h2.jdbc.JdbcPreparedStatement;
+import org.h2.table.IndexColumn;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
@@ -116,13 +121,17 @@ public class GridSqlQuerySplitter {
     /** */
     private IdentityHashMap<GridSqlAst, GridSqlAlias> uniqueFromAliases = new IdentityHashMap<>();
 
+    /** */
+    private GridKernalContext ctx;
+
     /**
      * @param params Query parameters.
      * @param collocatedGrpBy If it is a collocated GROUP BY query.
      */
-    public GridSqlQuerySplitter(Object[] params, boolean collocatedGrpBy) {
+    public GridSqlQuerySplitter(Object[] params, boolean collocatedGrpBy, GridKernalContext ctx) {
         this.params = params;
         this.collocatedGrpBy = collocatedGrpBy;
+        this.ctx = ctx;
     }
 
     /**
@@ -182,7 +191,7 @@ public class GridSqlQuerySplitter {
 
         qry.explain(false);
 
-        GridSqlQuerySplitter splitter = new GridSqlQuerySplitter(params, collocatedGrpBy);
+        GridSqlQuerySplitter splitter = new GridSqlQuerySplitter(params, collocatedGrpBy, h2.kernalContext());
 
         // Normalization will generate unique aliases for all the table filters in FROM.
         // Also it will collect all tables and schemas from the query.
@@ -234,13 +243,16 @@ public class GridSqlQuerySplitter {
         twoStepQry.explain(explain);
         twoStepQry.distributedJoins(distributedJoins);
 
+        // all map queries must have non-empty derivedPartitions to use this feature.
+        twoStepQry.derivedPartitions(mergePartitionsFromMultipleQueries(twoStepQry.mapQueries()));
+
         return twoStepQry;
     }
 
     /**
      * @param qry Optimized and normalized query to split.
      */
-    private void splitQuery(GridSqlQuery qry) {
+    private void splitQuery(GridSqlQuery qry) throws IgniteCheckedException {
         // Create a fake parent AST element for the query to allow replacing the query in the parent by split.
         GridSqlSubquery fakeQryPrnt = new GridSqlSubquery(qry);
 
@@ -1016,7 +1028,7 @@ public class GridSqlQuerySplitter {
     /**
      * @param qrym Query model.
      */
-    private void splitQueryModel(QueryModel qrym) {
+    private void splitQueryModel(QueryModel qrym) throws IgniteCheckedException {
         switch (qrym.type) {
             case SELECT:
                 if (qrym.needSplit) {
@@ -1191,7 +1203,7 @@ public class GridSqlQuerySplitter {
     private void splitSelect(
         final GridSqlAst prnt,
         final int childIdx
-    ) {
+    ) throws IgniteCheckedException {
         if (++splitId > 99)
             throw new CacheException("Too complex query to process.");
 
@@ -1317,6 +1329,9 @@ public class GridSqlQuerySplitter {
         map.sortColumns(mapQry.sort());
         map.partitioned(hasPartitionedTables(mapQry));
 
+        if (map.isPartitioned())
+            map.derivedPartitions(derivePartitionsFromQuery(mapQry, ctx));
+
         mapSqlQrys.add(map);
     }
 
@@ -1985,8 +2000,209 @@ public class GridSqlQuerySplitter {
      * @return true if given type is fractional
      */
     private static boolean isFractionalType(int type) {
-       return type == Value.DECIMAL || type == Value.FLOAT || type == Value.DOUBLE;
+        return type == Value.DECIMAL || type == Value.FLOAT || type == Value.DOUBLE;
+    }
+
+    /**
+     * Checks if given query contains expressions over key or affinity key
+     * that make it possible to run it only on a small isolated
+     * set of partitions.
+     *
+     * @param qry Query.
+     * @param ctx Kernal context.
+     * @return Array of partitions, or {@code null} if none identified
+     */
+    private static CacheQueryPartitionInfo[] derivePartitionsFromQuery(GridSqlQuery qry, GridKernalContext ctx)
+        throws IgniteCheckedException {
+
+        if (!(qry instanceof GridSqlSelect))
+            return null;
+
+        GridSqlSelect select = (GridSqlSelect)qry;
+
+        // no joins support yet
+        if (select.from() == null || select.from().size() != 1)
+            return null;
+
+        return extractPartition(select.where(), ctx);
+    }
+
+    /**
+     * @param el AST element to start with.
+     * @param ctx Kernal context.
+     * @return Array of partition info objects, or {@code null} if none identified
+     */
+    private static CacheQueryPartitionInfo[] extractPartition(GridSqlAst el, GridKernalContext ctx)
+        throws IgniteCheckedException {
+
+        if (!(el instanceof GridSqlOperation))
+            return null;
+
+        GridSqlOperation op = (GridSqlOperation)el;
+
+        switch (op.operationType()) {
+            case EQUAL: {
+                CacheQueryPartitionInfo partInfo = extractPartitionFromEquality(op, ctx);
+
+                if (partInfo != null)
+                    return new CacheQueryPartitionInfo[] { partInfo };
+
+                return null;
+            }
+
+            case AND: {
+                assert op.size() == 2;
+
+                CacheQueryPartitionInfo[] partsLeft = extractPartition(op.child(0), ctx);
+                CacheQueryPartitionInfo[] partsRight = extractPartition(op.child(1), ctx);
+
+                if (partsLeft != null && partsRight != null)
+                    return null; //kind of conflict (_key = 1) and (_key = 2)
+
+                if (partsLeft != null)
+                    return partsLeft;
+
+                if (partsRight != null)
+                    return partsRight;
+
+                return null;
+            }
+
+            case OR: {
+                assert op.size() == 2;
+
+                CacheQueryPartitionInfo[] partsLeft = extractPartition(op.child(0), ctx);
+                CacheQueryPartitionInfo[] partsRight = extractPartition(op.child(1), ctx);
+
+                if (partsLeft != null && partsRight != null)
+                    return mergePartitionInfo(partsLeft, partsRight);
+
+                return null;
+            }
+
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Analyses the equality operation and extracts the partition if possible
+     *
+     * @param op AST equality operation.
+     * @param ctx Kernal Context.
+     * @return partition info, or {@code null} if none identified
+     */
+    private static CacheQueryPartitionInfo extractPartitionFromEquality(GridSqlOperation op, GridKernalContext ctx)
+        throws IgniteCheckedException {
+
+        assert op.operationType() == GridSqlOperationType.EQUAL;
+
+        GridSqlElement left = op.child(0);
+        GridSqlElement right = op.child(1);
+
+        if (!(left instanceof GridSqlColumn))
+            return null;
+
+        if (!(right instanceof GridSqlConst) && !(right instanceof GridSqlParameter))
+            return null;
+
+        GridSqlColumn column = (GridSqlColumn)left;
+
+        assert column.column().getTable() instanceof GridH2Table;
+
+        GridH2Table tbl = (GridH2Table) column.column().getTable();
+
+        GridH2RowDescriptor desc = tbl.rowDescriptor();
+
+        IndexColumn affKeyCol = tbl.getAffinityKeyColumn();
+
+        int colId = column.column().getColumnId();
+
+        if ((affKeyCol == null || colId != affKeyCol.column.getColumnId()) && !desc.isKeyColumn(colId))
+            return null;
+
+        if (right instanceof GridSqlConst) {
+            GridSqlConst constant = (GridSqlConst)right;
+
+            return new CacheQueryPartitionInfo(ctx.affinity().partition(tbl.cacheName(),
+                constant.value().getObject()), null, -1);
+        }
+
+        assert right instanceof GridSqlParameter;
+
+        GridSqlParameter param = (GridSqlParameter) right;
+
+        return new CacheQueryPartitionInfo(-1, tbl.cacheName(), param.index());
     }
+
+    /**
+     * Merges two partition info arrays, removing duplicates
+     *
+     * @param a Partition info array.
+     * @param b Partition info array.
+     * @return Result.
+     */
+    private static CacheQueryPartitionInfo[] mergePartitionInfo(CacheQueryPartitionInfo[] a, CacheQueryPartitionInfo[] b) {
+        assert a != null;
+        assert b != null;
+
+        if (a.length == 1 && b.length == 1) {
+            if (a[0].equals(b[0]))
+                return new CacheQueryPartitionInfo[] { a[0] };
+
+            return new CacheQueryPartitionInfo[] { a[0], b[0] };
+        }
+
+        ArrayList<CacheQueryPartitionInfo> list = new ArrayList<>(a.length + b.length);
+
+        for (CacheQueryPartitionInfo part: a)
+            list.add(part);
+
+        for (CacheQueryPartitionInfo part: b) {
+            int i = 0;
+
+            while (i < list.size() && !list.get(i).equals(part))
+                i++;
+
+            if (i == list.size())
+                list.add(part);
+        }
+
+        CacheQueryPartitionInfo[] result = new CacheQueryPartitionInfo[list.size()];
+
+        for (int i = 0; i < list.size(); i++)
+            result[i] = list.get(i);
+
+        return result;
+    }
+
+    /**
+     * Ensures all given queries have non-empty derived partitions and merges them.
+     *
+     * @param queries Collection of queries.
+     * @return Derived partitions for all queries, or {@code null}.
+     */
+    private static CacheQueryPartitionInfo[] mergePartitionsFromMultipleQueries(List<GridCacheSqlQuery> queries) {
+        CacheQueryPartitionInfo[] result = null;
+
+        for (GridCacheSqlQuery qry : queries) {
+            CacheQueryPartitionInfo[] partInfo = (CacheQueryPartitionInfo[])qry.derivedPartitions();
+
+            if (partInfo == null) {
+                result = null;
+
+                break;
+            }
+
+            if (result == null)
+                result = partInfo;
+            else
+                result = mergePartitionInfo(result, partInfo);
+        }
+
+        return result;
+    }
+
     /**
      * Simplified tree-like model for a query.
      * - SELECT : All the children are list of joined query models in the FROM clause.

http://git-wip-us.apache.org/repos/asf/ignite/blob/fbf0e353/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlRoutingTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlRoutingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlRoutingTest.java
new file mode 100644
index 0000000..fddd3f4
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlRoutingTest.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheQueryExecutedEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
+
+public class IgniteSqlRoutingTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Name of the client node. */
+    private static final String NODE_CLIENT = "client";
+
+    /** Name of the cache holding {@link Person} values. */
+    private static final String CACHE_PERSON = "Person";
+
+    /** Name of the cache holding {@link Call} values. */
+    private static final String CACHE_CALL = "Call";
+
+    /** Number of server nodes started for the test. */
+    private static final int NODE_COUNT = 4;
+
+    /** Template of the broadcast 'marker' query used to ensure that events came from all nodes. */
+    private static final String FINAL_QRY = "select count(1) from {0} where name=?";
+
+    /** Argument value used to distinguish the events of the 'marker' query. */
+    private static final String FINAL_QRY_PARAM = "Abracadabra";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        c.setDiscoverySpi(disco);
+
+        c.setMarshaller(new BinaryMarshaller());
+
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        // Non-null only when the grid itself is named after one of the test caches.
+        CacheConfiguration ccfg = buildCacheConfiguration(gridName);
+
+        if (ccfg != null)
+            ccfgs.add(ccfg);
+
+        // Both test caches are configured on every node.
+        ccfgs.add(buildCacheConfiguration(CACHE_PERSON));
+        ccfgs.add(buildCacheConfiguration(CACHE_CALL));
+
+        c.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+        if (gridName.equals(NODE_CLIENT))
+            c.setClientMode(true);
+
+        c.setCacheKeyConfiguration(new CacheKeyConfiguration(CallKey.class));
+
+        return c;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Starts {@code NODE_COUNT} grids and the {@code NODE_CLIENT} grid, then fills the test caches.
+     */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(NODE_COUNT);
+
+        startGrid(NODE_CLIENT);
+
+        fillCaches();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Builds the configuration of one of the test caches.
+     *
+     * @param name Cache name.
+     * @return Cache configuration, or {@code null} if the name is neither {@code CACHE_PERSON}
+     *      nor {@code CACHE_CALL}.
+     */
+    private CacheConfiguration buildCacheConfiguration(String name) {
+        if (name.equals(CACHE_PERSON)) {
+            LinkedHashMap<String, String> personFields = new LinkedHashMap<>();
+
+            personFields.put("name", String.class.getName());
+            personFields.put("age", Integer.class.getName());
+
+            QueryEntity personEntity = new QueryEntity();
+
+            personEntity.setKeyType(Integer.class.getName());
+            personEntity.setValueType(Person.class.getName());
+            personEntity.setFields(personFields);
+
+            CacheConfiguration personCfg = new CacheConfiguration(CACHE_PERSON);
+
+            personCfg.setCacheMode(CacheMode.PARTITIONED);
+            personCfg.setQueryEntities(Arrays.asList(personEntity));
+
+            return personCfg;
+        }
+
+        if (name.equals(CACHE_CALL)) {
+            Set<String> callKeyFields = new HashSet<>();
+
+            callKeyFields.add("personId");
+            callKeyFields.add("id");
+
+            LinkedHashMap<String, String> callFields = new LinkedHashMap<>();
+
+            callFields.put("personId", Integer.class.getName());
+            callFields.put("id", Integer.class.getName());
+            callFields.put("name", String.class.getName());
+            callFields.put("duration", Integer.class.getName());
+
+            QueryEntity callEntity = new QueryEntity(CallKey.class.getName(), Call.class.getName());
+
+            callEntity.setKeyFields(callKeyFields);
+            callEntity.setFields(callFields);
+
+            CacheConfiguration callCfg = new CacheConfiguration(CACHE_CALL);
+
+            callCfg.setCacheMode(CacheMode.PARTITIONED);
+            callCfg.setQueryEntities(Arrays.asList(callEntity));
+
+            return callCfg;
+        }
+
+        // Unknown name: no cache configuration to provide.
+        return null;
+    }
+
+    /** */
+    public void testUnicastQuerySelectAffinityKeyEqualsConstant() throws Exception {
+        IgniteCache<CallKey, Call> cache = grid(NODE_CLIENT).cache(CACHE_CALL);
+
+        List<List<?>> result = runQueryEnsureUnicast(cache,
+                new SqlFieldsQuery("select id, name, duration from Call where personId=100
order by id"), 1);
+
+        assertEquals(2, result.size());
+
+        checkResultsRow(result, 0, 1, "caller1", 100);
+        checkResultsRow(result, 1, 2, "caller2", 200);
+    }
+
+    /** */
+    public void testUnicastQuerySelectAffinityKeyEqualsParameter() throws Exception {
+        IgniteCache<CallKey, Call> cache = grid(NODE_CLIENT).cache(CACHE_CALL);
+
+        List<List<?>> result = runQueryEnsureUnicast(cache,
+                new SqlFieldsQuery("select id, name, duration from Call where personId=?
order by id").setArgs(100), 1);
+
+        assertEquals(2, result.size());
+
+        checkResultsRow(result, 0, 1, "caller1", 100);
+        checkResultsRow(result, 1, 2, "caller2", 200);
+    }
+
+    /** */
+    public void testUnicastQuerySelectKeyEqualsParameterReused() throws Exception {
+        IgniteCache<Integer, Person> cache = grid(NODE_CLIENT).cache(CACHE_PERSON);
+
+        for (int key : new int[] {0, 250, 500, 750, 1000} ) {
+            List<List<?>> result = runQueryEnsureUnicast(cache,
+                    new SqlFieldsQuery("select name, age from Person where _key=?").setArgs(key),
1);
+
+            assertEquals(1, result.size());
+
+            Person person = cache.get(key);
+
+            checkResultsRow(result, 0, person.name, person.age);
+        }
+    }
+
+    /**
+     * Checks that a query comparing {@code _key} with a composite key parameter produces exactly one
+     * 'query executed' event and returns the call stored under that key.
+     *
+     * @throws Exception If failed.
+     */
+    public void testUnicastQuerySelectKeyEqualsParameter() throws Exception {
+        IgniteCache<CallKey, Call> calls = grid(NODE_CLIENT).cache(CACHE_CALL);
+
+        CallKey key = new CallKey(5, 1);
+
+        SqlFieldsQuery qry = new SqlFieldsQuery("select name, duration from Call where _key=?");
+
+        qry.setArgs(key);
+
+        List<List<?>> rows = runQueryEnsureUnicast(calls, qry, 1);
+
+        assertEquals(1, rows.size());
+
+        Call stored = calls.get(key);
+
+        checkResultsRow(rows, 0, stored.name, stored.duration);
+    }
+
+    /**
+     * Checks that a query using group by, having and order by together with an affinity key condition
+     * is still allowed to be a unicast request.
+     *
+     * @throws Exception If failed.
+     */
+    public void testUnicastQueryGroups() throws Exception {
+        IgniteCache<CallKey, Call> cache = grid(NODE_CLIENT).cache(CACHE_CALL);
+
+        String qry = "select name, count(1) " +
+                "from Call " +
+                "where personId = ? " +
+                "group by name " +
+                "having count(1) = 1 " +
+                "order by name";
+
+        final int personId = 10;
+
+        List<List<?>> result = runQueryEnsureUnicast(cache, new SqlFieldsQuery(qry).setArgs(personId), 1);
+
+        assertEquals(2, result.size());
+
+        checkResultsRow(result, 0, "caller1", 1L);
+        checkResultsRow(result, 1, "caller2", 1L);
+    }
+
+    /**
+     * Checks that a query combining a {@code _key} equality with an additional field condition through
+     * {@code and} produces exactly one 'query executed' event.
+     *
+     * @throws Exception If failed.
+     */
+    public void testUnicastQuerySelectKeyEqualAndFieldParameter() throws Exception {
+        IgniteCache<CallKey, Call> calls = grid(NODE_CLIENT).cache(CACHE_CALL);
+
+        CallKey key = new CallKey(5, 1);
+
+        SqlFieldsQuery qry = new SqlFieldsQuery("select name, duration from Call where _key=? and duration=?");
+
+        qry.setArgs(key, 100);
+
+        List<List<?>> rows = runQueryEnsureUnicast(calls, qry, 1);
+
+        assertEquals(1, rows.size());
+
+        Call stored = calls.get(key);
+
+        checkResultsRow(rows, 0, stored.name, stored.duration);
+    }
+
+    /** */
+    public void testUnicastQuerySelect2KeyEqualsAndFieldParameter() throws Exception {
+        IgniteCache<CallKey, Call> cache = grid(NODE_CLIENT).cache(CACHE_CALL);
+
+        CallKey callKey1 = new CallKey(5, 1);
+        CallKey callKey2 = new CallKey(1000, 1);
+
+        List<List<?>> result = runQueryEnsureUnicast(cache,
+                new SqlFieldsQuery("select name, duration from Call where (_key=? and duration=?)
or (_key=?)")
+                        .setArgs(callKey1, 100, callKey2), 2);
+
+        assertEquals(2, result.size());
+
+        Call call = cache.get(callKey1);
+
+        checkResultsRow(result, 0, call.name, call.duration);
+
+        call = cache.get(callKey2);
+
+        checkResultsRow(result, 1, call.name, call.duration);
+    }
+
+    /**
+     * Checks that a query joining a {@code _key} equality with a plain field condition through
+     * {@code or} generates 'query executed' events for all {@code NODE_COUNT} server nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBroadcastQuerySelectKeyEqualsOrFieldParameter() throws Exception {
+        IgniteCache<CallKey, Call> calls = grid(NODE_CLIENT).cache(CACHE_CALL);
+
+        CallKey key = new CallKey(5, 1);
+
+        SqlFieldsQuery qry = new SqlFieldsQuery("select name, duration from Call where _key=? or duration=?");
+
+        qry.setArgs(key, 100);
+
+        List<List<?>> rows = runQueryEnsureBroadcast(calls, qry);
+
+        // Expected row count is half of the cache size.
+        assertEquals(calls.size() / 2, rows.size());
+    }
+
+    /**
+     * Populates the test caches: one person per partition of the person cache, and two calls
+     * for every person.
+     */
+    private void fillCaches() {
+        IgniteCache<CallKey, Call> calls = grid(NODE_CLIENT).cache(CACHE_CALL);
+        IgniteCache<Integer, Person> persons = grid(NODE_CLIENT).cache(CACHE_PERSON);
+
+        int partCnt = affinity(persons).partitions();
+
+        String[] names = {"John", "Bob", "James", "David", "Chuck"};
+
+        for (int personId = 0; personId < partCnt; personId++) {
+            int nameIdx = personId % names.length;
+
+            persons.put(personId, new Person(names[nameIdx], 20 + nameIdx));
+
+            // Each person gets 2 calls.
+            calls.put(new CallKey(personId, 1), new Call("caller1", 100));
+            calls.put(new CallKey(personId, 2), new Call("caller2", 200));
+        }
+    }
+
+    /**
+     * Asserts that the given row of the result set consists of exactly the expected values.
+     *
+     * @param results Query result set.
+     * @param rowId Index of the row to check.
+     * @param expected Expected column values, in column order.
+     * @throws Exception If failed.
+     */
+    private void checkResultsRow(List<List<?>> results, int rowId, Object ... expected) throws Exception {
+        assertTrue(rowId < results.size());
+
+        List<?> row = results.get(rowId);
+
+        assertEquals(expected.length, row.size());
+
+        for (int col = 0; col < expected.length; ++col)
+            assertEquals(expected[col], row.get(col));
+    }
+
+    /**
+     * Runs the query and checks that exactly {@code nodeCnt} 'query executed' events were generated
+     * for it on the server nodes.
+     *
+     * @param cache Cache to run the query on.
+     * @param qry Query to run.
+     * @param nodeCnt Expected number of 'query executed' events.
+     * @return Result set of the query.
+     * @throws Exception If failed.
+     */
+    private List<List<?>> runQueryEnsureUnicast(IgniteCache<?,?> cache, SqlFieldsQuery qry, int nodeCnt)
+        throws Exception {
+        try (EventCounter evtCounter = new EventCounter(nodeCnt)) {
+            List<List<?>> result = cache.query(qry).getAll();
+
+            // Do broadcast 'marker' query to ensure that we received all events from previous qry.
+            cache.query(new SqlFieldsQuery(
+                    MessageFormat.format(FINAL_QRY, cache.getName()))
+                    .setArgs(FINAL_QRY_PARAM)).getAll();
+
+            // Wait for all events from 'marker' query.
+            evtCounter.await();
+
+            // Return result set of first query.
+            return result;
+        }
+    }
+
+    /**
+     * Runs the query and checks that at least {@code NODE_COUNT} 'query executed' events were generated
+     * on the server nodes within the timeout.
+     *
+     * @param cache Cache to run the query on.
+     * @param qry Query to run.
+     * @return Result set of the query.
+     * @throws Exception If failed.
+     */
+    private List<List<?>> runQueryEnsureBroadcast(IgniteCache<?, ?> cache, SqlFieldsQuery qry) throws Exception {
+        final CountDownLatch execLatch = new CountDownLatch(NODE_COUNT);
+
+        final IgnitePredicate<Event> pred = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                assert evt instanceof CacheQueryExecutedEvent;
+
+                CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt;
+
+                assertNotNull(qe.clause());
+
+                execLatch.countDown();
+
+                return true;
+            }
+        };
+
+        for (int i = 0; i < NODE_COUNT; i++)
+            grid(i).events().localListen(pred, EVT_CACHE_QUERY_EXECUTED);
+
+        try {
+            List<List<?>> result = cache.query(qry).getAll();
+
+            assertTrue(execLatch.await(5000, MILLISECONDS));
+
+            return result;
+        }
+        finally {
+            // Always unsubscribe, so that a failed run does not leave the listener registered.
+            for (int i = 0; i < NODE_COUNT; i++)
+                grid(i).events().stopLocalListen(pred);
+        }
+    }
+
+    /**
+     * Counts 'query executed' events fired on the server nodes while it is open.
+     * Events of the 'marker' query release the latch; every other event on the test caches
+     * decrements the expected events counter.
+     */
+    private class EventCounter implements AutoCloseable {
+        /** Remaining number of expected non-marker events. */
+        final AtomicInteger cnt;
+
+        /** Latch released by the events of the 'marker' query. */
+        final CountDownLatch execLatch;
+
+        /** Listener registered on every server node. */
+        final IgnitePredicate<Event> pred = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                assert evt instanceof CacheQueryExecutedEvent;
+
+                CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt;
+
+                String cacheName = qe.cacheName();
+
+                assert cacheName != null;
+
+                // Ignore queries against caches that do not belong to this test.
+                if (!cacheName.equals(CACHE_PERSON) &&
+                    !cacheName.equals(CACHE_CALL))
+                    return true;
+
+                assertNotNull(qe.clause());
+
+                Object[] args = qe.arguments();
+
+                if ((args != null) && (args.length > 0) && (args[0] instanceof String)) {
+                    String strParam = (String)args[0];
+
+                    // The 'marker' query is recognized by its first argument.
+                    if (FINAL_QRY_PARAM.equals(strParam)) {
+                        execLatch.countDown();
+
+                        return true;
+                    }
+                }
+
+                cnt.decrementAndGet();
+
+                return true;
+            }
+        };
+
+        /**
+         * Subscribes the listener on every server node.
+         *
+         * @param cnt Expected number of non-marker events.
+         */
+        private EventCounter(int cnt) {
+            this.cnt = new AtomicInteger(cnt);
+
+            this.execLatch = new CountDownLatch(NODE_COUNT);
+
+            for (int i = 0; i < NODE_COUNT; i++)
+                grid(i).events().localListen(pred, EVT_CACHE_QUERY_EXECUTED);
+        }
+
+        /**
+         * Waits for the 'marker' query events, then checks that exactly the expected number
+         * of other events has been received.
+         *
+         * @throws Exception If failed.
+         */
+        public void await() throws Exception {
+            assertTrue(execLatch.await(5000, MILLISECONDS));
+
+            assertEquals(0, cnt.get());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws Exception {
+            for (int i = 0; i < NODE_COUNT; i++)
+                grid(i).events().stopLocalListen(pred);
+        }
+    }
+
+    /** Value type of the person cache. */
+    private static class Person {
+        /** Person name. */
+        private String name;
+
+        /** Person age. */
+        private int age;
+
+        /**
+         * @param name Person name.
+         * @param age Person age.
+         */
+        public Person(String name, int age) {
+            this.name = name;
+            this.age = age;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return name.hashCode() ^ age;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o instanceof Person) {
+                Person that = (Person)o;
+
+                return name.equals(that.name) && age == that.age;
+            }
+
+            return false;
+        }
+    }
+
+    /** Composite key type of the call cache. */
+    private static class CallKey {
+        /** Person identifier, marked as the affinity key field. */
+        @AffinityKeyMapped
+        private int personId;
+
+        /** Call identifier. */
+        private int id;
+
+        /**
+         * @param personId Person identifier.
+         * @param id Call identifier.
+         */
+        public CallKey(int personId, int id) {
+            this.personId = personId;
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return personId ^ id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o instanceof CallKey) {
+                CallKey that = (CallKey)o;
+
+                return personId == that.personId && id == that.id;
+            }
+
+            return false;
+        }
+    }
+
+    /** Value type of the call cache. */
+    private static class Call {
+        /** Caller name. */
+        private String name;
+
+        /** Call duration. */
+        private int duration;
+
+        /**
+         * @param name Caller name.
+         * @param duration Call duration.
+         */
+        public Call(String name, int duration) {
+            this.name = name;
+
+            this.duration = duration;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return name.hashCode() ^ duration;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o instanceof Call) {
+                Call that = (Call)o;
+
+                return name.equals(that.name) && duration == that.duration;
+            }
+
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fbf0e353/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 14fb6ce..13dfef7 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -110,6 +110,7 @@ import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfT
 import org.apache.ignite.internal.processors.query.IgniteQueryDedicatedPoolTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlKeyValueFieldsTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlRoutingTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexMultiNodeSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteSqlSegmentedIndexSelfTest;
@@ -287,6 +288,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheDistributedPartitionQueryNodeRestartsSelfTest.class);
         suite.addTestSuite(IgniteCacheDistributedPartitionQueryConfigurationSelfTest.class);
         suite.addTestSuite(IgniteSqlKeyValueFieldsTest.class);
+        suite.addTestSuite(IgniteSqlRoutingTest.class);
 
         return suite;
     }


Mime
View raw message