ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] ignite git commit: ignite-1232 Added example, new tests, benchmarks, performance optimizations.
Date Fri, 22 Jul 2016 08:44:16 GMT
ignite-1232 Added example, new tests, benchmarks, performance optimizations.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/129086a2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/129086a2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/129086a2

Branch: refs/heads/ignite-1232
Commit: 129086a2c5a34b9bb83b0b7f542c8d4aed6b9cd3
Parents: 028858f
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Jul 22 11:43:51 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Jul 22 11:43:51 2016 +0300

----------------------------------------------------------------------
 .../examples/datagrid/CacheQueryExample.java    |  78 +++++--
 .../managers/communication/GridIoManager.java   |  13 ++
 .../cache/GridCacheAffinityManager.java         |   2 +-
 .../cache/query/GridCacheTwoStepQuery.java      |  36 +++
 .../processors/query/h2/IgniteH2Indexing.java   |  52 ++++-
 .../query/h2/opt/GridH2IndexBase.java           |  24 +-
 .../query/h2/opt/GridH2QueryContext.java        |  40 ++--
 .../processors/query/h2/opt/GridH2Table.java    |  13 +-
 .../query/h2/opt/GridH2TreeIndex.java           |   2 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  | 222 +++++++++----------
 .../h2/twostep/GridReduceQueryExecutor.java     | 134 +++++------
 .../h2/twostep/msg/GridH2QueryRequest.java      |  12 +-
 .../IgniteCacheCrossCacheJoinRandomTest.java    |   2 +-
 ...IgniteCacheJoinQueryWithAffinityKeyTest.java | 115 ++++++++--
 ...QueryNodeRestartDistributedJoinSelfTest.java | 118 ++++++++--
 .../yardstick/IgniteBenchmarkArguments.java     |  11 -
 .../IgniteSqlQueryDistributedJoinBenchmark.java |  45 +++-
 ...lQueryDistributedJoinBroadcastBenchmark.java |  28 +++
 18 files changed, 619 insertions(+), 328 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java
index 9200489..85d74e0 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java
@@ -46,8 +46,11 @@ import org.apache.ignite.lang.IgniteBiPredicate;
  * limitations (not applied if data is queried from one node only):
  * <ul>
  *     <li>
- *         Joins will work correctly only if joined objects are stored in
+ *         Non-distributed joins will work correctly only if joined objects are stored in
  *         collocated mode. Refer to {@link AffinityKey} javadoc for more details.
+ *         <p>
+ *         To use distributed joins it is necessary to set query 'distributedJoin' flag using
+ *         {@link SqlFieldsQuery#setDistributedJoins(boolean)} or {@link SqlQuery#setDistributedJoins(boolean)}.
  *     </li>
  *     <li>
  *         Note that if you created query on to replicated cache, all data will
@@ -65,6 +68,9 @@ public class CacheQueryExample {
     /** Organizations cache name. */
     private static final String ORG_CACHE = CacheQueryExample.class.getSimpleName() + "Organizations";
 
+    /** Persons collocated with Organizations cache name. */
+    private static final String COLLOCATED_PERSON_CACHE = CacheQueryExample.class.getSimpleName() + "CollocatedPersons";
+
     /** Persons cache name. */
     private static final String PERSON_CACHE = CacheQueryExample.class.getSimpleName() + "Persons";
 
@@ -84,14 +90,20 @@ public class CacheQueryExample {
             orgCacheCfg.setCacheMode(CacheMode.PARTITIONED); // Default.
             orgCacheCfg.setIndexedTypes(Long.class, Organization.class);
 
+            CacheConfiguration<AffinityKey<Long>, Person> colPersonCacheCfg = new CacheConfiguration<>(COLLOCATED_PERSON_CACHE);
+
+            colPersonCacheCfg.setCacheMode(CacheMode.PARTITIONED); // Default.
+            colPersonCacheCfg.setIndexedTypes(AffinityKey.class, Person.class);
+
             CacheConfiguration<AffinityKey<Long>, Person> personCacheCfg = new CacheConfiguration<>(PERSON_CACHE);
 
             personCacheCfg.setCacheMode(CacheMode.PARTITIONED); // Default.
-            personCacheCfg.setIndexedTypes(AffinityKey.class, Person.class);
+            personCacheCfg.setIndexedTypes(Long.class, Person.class);
 
             // Auto-close cache at the end of the example.
             try (
                 IgniteCache<Long, Organization> orgCache = ignite.getOrCreateCache(orgCacheCfg);
+                IgniteCache<AffinityKey<Long>, Person> colPersonCache = ignite.getOrCreateCache(colPersonCacheCfg);
                 IgniteCache<AffinityKey<Long>, Person> personCache = ignite.getOrCreateCache(personCacheCfg)
             ) {
                 // Populate cache.
@@ -103,9 +115,12 @@ public class CacheQueryExample {
                 // Example for SQL-based querying employees based on salary ranges.
                 sqlQuery();
 
-                // Example for SQL-based querying employees for a given organization (includes SQL join).
+                // Example for SQL-based querying employees for a given organization (includes SQL join for collocated objects).
                 sqlQueryWithJoin();
 
+                // Example for SQL-based querying employees for a given organization (includes distributed SQL join).
+                sqlQueryWithDistributedJoin();
+
                 // Example for TEXT-based querying for a given string in peoples resumes.
                 textQuery();
 
@@ -121,6 +136,7 @@ public class CacheQueryExample {
             }
             finally {
                 // Distributed cache could be removed from cluster only by #destroyCache() call.
+                ignite.destroyCache(COLLOCATED_PERSON_CACHE);
                 ignite.destroyCache(PERSON_CACHE);
                 ignite.destroyCache(ORG_CACHE);
             }
@@ -133,7 +149,7 @@ public class CacheQueryExample {
      * Example for scan query based on a predicate.
      */
     private static void scanQuery() {
-        IgniteCache<BinaryObject, BinaryObject> cache = Ignition.ignite().cache(PERSON_CACHE).withKeepBinary();
+        IgniteCache<BinaryObject, BinaryObject> cache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE).withKeepBinary();
 
         ScanQuery<BinaryObject, BinaryObject> scan = new ScanQuery<>(
             new IgniteBiPredicate<BinaryObject, BinaryObject>() {
@@ -170,7 +186,7 @@ public class CacheQueryExample {
      * Example for SQL queries based on all employees working for a specific organization.
      */
     private static void sqlQueryWithJoin() {
-        IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(PERSON_CACHE);
+        IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE);
 
         // SQL clause query which joins on 2 types to select people for a specific organization.
         String joinSql =
@@ -189,6 +205,32 @@ public class CacheQueryExample {
     }
 
     /**
+     * Example for SQL queries based on all employees working for a specific organization (query uses distributed join).
+     */
+    private static void sqlQueryWithDistributedJoin() {
+        IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(PERSON_CACHE);
+
+        // SQL clause query which joins on 2 types to select people for a specific organization.
+        String joinSql =
+            "from Person, \"" + ORG_CACHE + "\".Organization as org " +
+            "where Person.orgId = org.id " +
+            "and lower(org.name) = lower(?)";
+
+        SqlQuery qry = new SqlQuery<AffinityKey<Long>, Person>(Person.class, joinSql).
+            setArgs("ApacheIgnite");
+
+        // Enable distributed joins for query.
+        qry.setDistributedJoins(true);
+
+        // Execute queries for find employees for different organizations.
+        print("Following people are 'ApacheIgnite' employees (distributed join): ", cache.query(qry).getAll());
+
+        qry.setArgs("Other");
+
+        print("Following people are 'Other' employees (distributed join): ", cache.query(qry).getAll());
+    }
+
+    /**
      * Example for TEXT queries using LUCENE-based indexing of people's resumes.
      */
     private static void textQuery() {
@@ -210,7 +252,7 @@ public class CacheQueryExample {
      * Example for SQL queries to calculate average salary for a specific organization.
      */
     private static void sqlQueryWithAggregation() {
-        IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(PERSON_CACHE);
+        IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE);
 
         // Calculate average of salary of all persons in ApacheIgnite.
         // Note that we also join on Organization cache as well.
@@ -249,7 +291,7 @@ public class CacheQueryExample {
      * fields instead of whole key-value pairs.
      */
     private static void sqlFieldsQueryWithJoin() {
-        IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(PERSON_CACHE);
+        IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE);
 
         // Execute query to get names of all employees.
         String sql =
@@ -263,7 +305,7 @@ public class CacheQueryExample {
         List<List<?>> res = cursor.getAll();
 
         // Print persons' names and organizations' names.
-        print("Names of all employees and organizations they belong to:", res);
+        print("Names of all employees and organizations they belong to: ", res);
     }
 
     /**
@@ -282,9 +324,11 @@ public class CacheQueryExample {
         orgCache.put(org1.id(), org1);
         orgCache.put(org2.id(), org2);
 
-        IgniteCache<AffinityKey<Long>, Person> personCache = Ignition.ignite().cache(PERSON_CACHE);
+        IgniteCache<AffinityKey<Long>, Person> colPersonCache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE);
+        IgniteCache<Long, Person> personCache = Ignition.ignite().cache(PERSON_CACHE);
 
-        // Clear cache before running the example.
+        // Clear caches before running the example.
+        colPersonCache.clear();
         personCache.clear();
 
         // People.
@@ -295,10 +339,16 @@ public class CacheQueryExample {
 
         // Note that in this example we use custom affinity key for Person objects
         // to ensure that all persons are collocated with their organizations.
-        personCache.put(p1.key(), p1);
-        personCache.put(p2.key(), p2);
-        personCache.put(p3.key(), p3);
-        personCache.put(p4.key(), p4);
+        colPersonCache.put(p1.key(), p1);
+        colPersonCache.put(p2.key(), p2);
+        colPersonCache.put(p3.key(), p3);
+        colPersonCache.put(p4.key(), p4);
+
+        // These Person objects are not collocated with their organizations.
+        personCache.put(p1.id, p1);
+        personCache.put(p2.id, p2);
+        personCache.put(p3.id, p3);
+        personCache.put(p4.id, p4);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 8b0465f..99cb7f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1374,6 +1374,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /**
      * @param node Destination node.
      * @param topic Topic to send the message to.
+     * @param topicOrd GridTopic enumeration ordinal.
+     * @param msg Message to send.
+     * @param plc Type of processing.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
+        throws IgniteCheckedException {
+        send(node, topic, topicOrd, msg, plc, false, 0, false, null);
+    }
+
+    /**
+     * @param node Destination node.
+     * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
      * @param timeout Timeout to keep a message on receiving queue.

http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 5e843dc..5e32957 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -218,7 +218,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
      * @param key Key.
      * @return Affinity key.
      */
-    private Object affinityKey(Object key) {
+    public Object affinityKey(Object key) {
         if (key instanceof CacheObject && !(key instanceof BinaryObject))
             key = ((CacheObject)key).value(cctx.cacheObjectContext(), false);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index eacf59c..8dcba2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -60,6 +60,12 @@ public class GridCacheTwoStepQuery {
     /** */
     private boolean skipMergeTbl;
 
+    /** */
+    private List<Integer> caches;
+
+    /** */
+    private List<Integer> extraCaches;
+
     /**
      * @param schemas Schema names in query.
      * @param tbls Tables in query.
@@ -162,6 +168,34 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
+     * @return Caches.
+     */
+    public List<Integer> caches() {
+        return caches;
+    }
+
+    /**
+     * @param caches Caches.
+     */
+    public void caches(List<Integer> caches) {
+        this.caches = caches;
+    }
+
+    /**
+     * @return Caches.
+     */
+    public List<Integer> extraCaches() {
+        return extraCaches;
+    }
+
+    /**
+     * @param extraCaches Caches.
+     */
+    public void extraCaches(List<Integer> extraCaches) {
+        this.extraCaches = extraCaches;
+    }
+
+    /**
      * @return Spaces.
      */
     public Collection<String> spaces() {
@@ -191,6 +225,8 @@ public class GridCacheTwoStepQuery {
 
         GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(schemas, tbls);
 
+        cp.caches = caches;
+        cp.extraCaches = extraCaches;
         cp.spaces = spaces;
         cp.rdc = rdc.copy(args);
         cp.skipMergeTbl = skipMergeTbl;

http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 2153c18..535881e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -64,6 +64,7 @@ import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
@@ -107,6 +108,7 @@ import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -787,7 +789,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             return new GridQueryFieldsResultAdapter(meta, new FieldsIterator(rs));
         }
         finally {
-            GridH2QueryContext.clear(false);
+            GridH2QueryContext.clearThreadLocal();
         }
     }
 
@@ -993,7 +995,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             return new KeyValIterator(rs);
         }
         finally {
-            GridH2QueryContext.clear(false);
+            GridH2QueryContext.clearThreadLocal();
         }
     }
 
@@ -1138,7 +1140,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 }
             }
             finally {
-                GridH2QueryContext.clear(false);
+                GridH2QueryContext.clearThreadLocal();
             }
 
             try {
@@ -1147,15 +1149,41 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), grpByCollocated,
                     distributedJoins);
 
+                List<Integer> caches;
+                List<Integer> extraCaches = null;
+
                 // Setup spaces from schemas.
                 if (!twoStepQry.schemas().isEmpty()) {
                     Collection<String> spaces = new ArrayList<>(twoStepQry.schemas().size());
+                    caches = new ArrayList<>(twoStepQry.schemas().size() + 1);
+                    caches.add(cctx.cacheId());
+
+                    for (String schema : twoStepQry.schemas()) {
+                        String space0 = space(schema);
+
+                        spaces.add(space0);
 
-                    for (String schema : twoStepQry.schemas())
-                        spaces.add(space(schema));
+                        if (!F.eq(space0, space)) {
+                            int cacheId = CU.cacheId(space0);
+
+                            caches.add(cacheId);
+
+                            if (extraCaches == null)
+                                extraCaches = new ArrayList<>();
+
+                            extraCaches.add(cacheId);
+                        }
+                    }
 
                     twoStepQry.spaces(spaces);
                 }
+                else {
+                    caches = Collections.singletonList(cctx.cacheId());
+                    extraCaches = null;
+                }
+
+                twoStepQry.caches(caches);
+                twoStepQry.extraCaches(extraCaches);
 
                 meta = meta(stmt.getMetaData());
             }
@@ -1649,20 +1677,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /**
      * @param topic Topic.
+     * @param topicOrd Topic ordinal for {@link GridTopic}.
      * @param nodes Nodes.
      * @param msg Message.
      * @param specialize Optional closure to specialize message for each node.
-     * @param locNodeHandler Handler for local node.
+     * @param locNodeHnd Handler for local node.
      * @param plc Policy identifying the executor service which will process message.
      * @param runLocParallel Run local handler in parallel thread.
      * @return {@code true} If all messages sent successfully.
      */
     public boolean send(
         Object topic,
+        int topicOrd,
         Collection<ClusterNode> nodes,
         Message msg,
         @Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize,
-        @Nullable final IgniteInClosure2X<ClusterNode, Message> locNodeHandler,
+        @Nullable final IgniteInClosure2X<ClusterNode, Message> locNodeHnd,
         byte plc,
         boolean runLocParallel
     ) {
@@ -1688,7 +1718,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                         ((GridCacheQueryMarshallable)msg).marshall(marshaller);
                 }
 
-                ctx.io().send(node, topic, msg, plc);
+                ctx.io().send(node, topic, topicOrd, msg, plc);
             }
             catch (IgniteCheckedException e) {
                 ok = false;
@@ -1711,7 +1741,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     // We prefer runLocal to runLocalSafe, because the latter can produce deadlock here.
                     ctx.closure().runLocal(new GridPlainRunnable() {
                         @Override public void run() {
-                            locNodeHandler.apply(finalLocNode, finalMsg);
+                            locNodeHnd.apply(finalLocNode, finalMsg);
                         }
                     }, plc).listen(logger);
                 }
@@ -1722,7 +1752,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 }
             }
             else
-                locNodeHandler.apply(locNode, msg);
+                locNodeHnd.apply(locNode, msg);
         }
 
         return ok;
@@ -1731,7 +1761,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * @return Serializer.
      */
-    protected JavaObjectSerializer h2Serializer() {
+    private JavaObjectSerializer h2Serializer() {
         return new JavaObjectSerializer() {
                 @Override public byte[] serialize(Object obj) throws Exception {
                     return marshaller.marshal(obj);

http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 01065c3..c29239f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -391,8 +391,14 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param msg Message.
      */
     private void send(Collection<ClusterNode> nodes, Message msg) {
-        if (!getTable().rowDescriptor().indexing().send(msgTopic, nodes, msg, null, locNodeHnd,
-            GridIoPolicy.IDX_POOL, false))
+        if (!getTable().rowDescriptor().indexing().send(msgTopic,
+            -1,
+            nodes,
+            msg,
+            null,
+            locNodeHnd,
+            GridIoPolicy.IDX_POOL,
+            false))
             throw new GridH2RetryException("Failed to send message to nodes: " + nodes + ".");
     }
 
@@ -981,18 +987,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
             if (pkFirst == null || pkLast == null || !equal(pkFirst, pkLast))
                 return null;
 
-            Object pkAffKeyFirst;
-            Object pkAffKeyLast;
-
-            GridKernalContext ctx = kernalContext();
-
-            try {
-                pkAffKeyFirst = ctx.affinity().affinityKey(cctx.name(), pkFirst.getObject());
-                pkAffKeyLast = ctx.affinity().affinityKey(cctx.name(), pkLast.getObject());
-            }
-            catch (IgniteCheckedException e) {
-                throw new CacheException(e);
-            }
+            Object pkAffKeyFirst = cctx.affinity().affinityKey(pkFirst.getObject());
+            Object pkAffKeyLast = cctx.affinity().affinityKey(pkLast.getObject());
 
             if (pkAffKeyFirst == null || pkAffKeyLast == null)
                 throw new CacheException("Cache key without affinity key.");

http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index 14e34d9..19ea2b2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -198,7 +198,7 @@ public class GridH2QueryContext {
      * @param cctx Cache context.
      * @return Owning node ID.
      */
-    public UUID nodeForPartition(int p, GridCacheContext<?,?> cctx) {
+    public UUID nodeForPartition(int p, GridCacheContext<?, ?> cctx) {
         UUID[] nodeIds = partsNodes;
 
         if (nodeIds == null) {
@@ -206,7 +206,7 @@ public class GridH2QueryContext {
 
             nodeIds = new UUID[cctx.affinity().partitions()];
 
-            for (Map.Entry<UUID,int[]> e : partsMap.entrySet()) {
+            for (Map.Entry<UUID, int[]> e : partsMap.entrySet()) {
                 UUID nodeId = e.getKey();
                 int[] nodeParts = e.getValue();
 
@@ -356,7 +356,7 @@ public class GridH2QueryContext {
          assert qctx.get() == null;
 
          // We need MAP query context to be available to other threads to run distributed joins.
-         if (x.key.type == MAP && qctxs.putIfAbsent(x.key, x) != null)
+         if (x.key.type == MAP && x.distributedJoins() && qctxs.putIfAbsent(x.key, x) != null)
              throw new IllegalStateException("Query context is already set.");
 
          qctx.set(x);
@@ -364,18 +364,13 @@ public class GridH2QueryContext {
 
     /**
      * Drops current thread local context.
-     *
-     * @param onlyThreadLoc Drop only thread local context but keep global.
      */
-    public static void clear(boolean onlyThreadLoc) {
+    public static void clearThreadLocal() {
         GridH2QueryContext x = qctx.get();
 
         assert x != null;
 
         qctx.remove();
-
-        if (!onlyThreadLoc && x.key.type == MAP)
-            doClear(x.key, false);
     }
 
     /**
@@ -383,30 +378,41 @@ public class GridH2QueryContext {
      * @param nodeId The node who initiated the query.
      * @param qryId The query ID.
      * @param type Query type.
+     * @return {@code True} if context was found.
      */
-    public static void clear(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
-        doClear(new Key(locNodeId, nodeId, qryId, type), false);
+    public static boolean clear(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
+        return doClear(new Key(locNodeId, nodeId, qryId, type), false);
     }
 
     /**
      * @param key Context key.
      * @param nodeStop Node is stopping.
+     * @return {@code True} if context was found.
      */
-    private static void doClear(Key key, boolean nodeStop) {
+    private static boolean doClear(Key key, boolean nodeStop) {
         assert key.type == MAP : key.type;
 
         GridH2QueryContext x = qctxs.remove(key);
 
         if (x == null)
-            return;
-
-        x.cleared = true;
+            return false;
 
         assert x.key.equals(key);
 
-        x.clearSnapshots();
+        x.clearContext(nodeStop);
+
+        return true;
+    }
+
+    /**
+     * @param nodeStop Node is stopping.
+     */
+    public void clearContext(boolean nodeStop) {
+        cleared = true;
+
+        clearSnapshots();
 
-        List<GridReservable> r = x.reservations;
+        List<GridReservable> r = reservations;
 
         if (!nodeStop && !F.isEmpty(r)) {
             for (int i = 0; i < r.size(); i++)

http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index e2356f1..8d080ae 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -459,9 +459,16 @@ public class GridH2Table extends TableBase {
         if (!snapshotEnabled)
             return;
 
+        releaseSnapshots0(idxs);
+    }
+
+    /**
+     * @param idxs Indexes.
+     */
+    private void releaseSnapshots0(ArrayList<Index> idxs) {
         // Release snapshots on all except first which is scan.
         for (int i = 1, len = idxs.size(); i < len; i++)
-            index(i).releaseSnapshot();
+            ((GridH2IndexBase)idxs.get(i)).releaseSnapshot();
     }
 
     /**
@@ -641,6 +648,8 @@ public class GridH2Table extends TableBase {
 
         Lock l = lock(true, Long.MAX_VALUE);
 
+        ArrayList<Index> idxs0 = new ArrayList<>(idxs);
+
         try {
             snapshotIndexes(null); // Allow read access while we are rebuilding indexes.
 
@@ -657,7 +666,7 @@ public class GridH2Table extends TableBase {
             throw new IgniteInterruptedException(e);
         }
         finally {
-            releaseSnapshots();
+            releaseSnapshots0(idxs0);
 
             unlock(l);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index c8c0446..33aaf7b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -48,7 +48,7 @@ import org.jetbrains.annotations.Nullable;
 @SuppressWarnings("ComparatorNotSerializable")
 public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridSearchRowPointer> {
     /** */
-    protected final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree;
+    private final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree;
 
     /** */
     private final boolean snapshotEnabled;

http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 3238134..bb5e419 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation;
@@ -70,13 +69,12 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.jdbc.JdbcResultSet;
 import org.h2.result.ResultInterface;
 import org.h2.value.Value;
-import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
@@ -120,7 +118,7 @@ public class GridMapQueryExecutor {
     private IgniteH2Indexing h2;
 
     /** */
-    private ConcurrentMap<UUID, ConcurrentMap<Long, QueryResults>> qryRess = new ConcurrentHashMap8<>();
+    private ConcurrentMap<UUID, NodeResults> qryRess = new ConcurrentHashMap8<>();
 
     /** */
     private final GridSpinBusyLock busyLock;
@@ -129,10 +127,6 @@ public class GridMapQueryExecutor {
     private final ConcurrentMap<T2<String, AffinityTopologyVersion>, GridReservable> reservations =
         new ConcurrentHashMap8<>();
 
-    /** */
-    private final GridBoundedConcurrentLinkedHashMap<QueryKey, Boolean> qryHist =
-        new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
-
     /**
      * @param busyLock Busy lock.
      */
@@ -159,12 +153,12 @@ public class GridMapQueryExecutor {
 
                 GridH2QueryContext.clearAfterDeadNode(locNodeId, nodeId);
 
-                ConcurrentMap<Long,QueryResults> nodeRess = qryRess.remove(nodeId);
+                NodeResults nodeRess = qryRess.remove(nodeId);
 
                 if (nodeRess == null)
                     return;
 
-                for (QueryResults ress : nodeRess.values())
+                for (QueryResults ress : nodeRess.results().values())
                     ress.cancel();
             }
         }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
@@ -228,16 +222,17 @@ public class GridMapQueryExecutor {
     private void onCancel(ClusterNode node, GridQueryCancelRequest msg) {
         long qryReqId = msg.queryRequestId();
 
-        Boolean old = qryHist.putIfAbsent(new QueryKey(node.id(), qryReqId), Boolean.FALSE);
+        NodeResults nodeRess = resultsForNode(node.id());
 
-        if (old == null || !old)
-            return;
+        boolean clear = GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
 
-        ConcurrentMap<Long, QueryResults> nodeRess = resultsForNode(node.id());
+        if (!clear) {
+            nodeRess.onCancel(qryReqId);
 
-        GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
+            GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
+        }
 
-        QueryResults results = nodeRess.remove(qryReqId);
+        QueryResults results = nodeRess.results().remove(qryReqId);
 
         if (results == null)
             return;
@@ -249,13 +244,13 @@ public class GridMapQueryExecutor {
      * @param nodeId Node ID.
      * @return Results for node.
      */
-    private ConcurrentMap<Long, QueryResults> resultsForNode(UUID nodeId) {
-        ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(nodeId);
+    private NodeResults resultsForNode(UUID nodeId) {
+        NodeResults nodeRess = qryRess.get(nodeId);
 
         if (nodeRess == null) {
-            nodeRess = new ConcurrentHashMap8<>();
+            nodeRess = new NodeResults();
 
-            ConcurrentMap<Long, QueryResults> old = qryRess.putIfAbsent(nodeId, nodeRess);
+            NodeResults old = qryRess.putIfAbsent(nodeId, nodeRess);
 
             if (old != null)
                 nodeRess = old;
@@ -265,19 +260,6 @@ public class GridMapQueryExecutor {
     }
 
     /**
-     * @param cacheName Cache name.
-     * @return Cache context or {@code null} if none.
-     */
-    @Nullable private GridCacheContext<?,?> cacheContext(String cacheName) {
-        GridCacheAdapter<?,?> cache = ctx.cache().internalCache(cacheName);
-
-        if (cache == null)
-            return null;
-
-        return cache.context();
-    }
-
-    /**
      * @param cctx Cache context.
      * @param p Partition ID.
      * @return Partition.
@@ -287,7 +269,7 @@ public class GridMapQueryExecutor {
     }
 
     /**
-     * @param cacheNames Cache names.
+     * @param cacheIds Cache IDs.
      * @param topVer Topology version.
      * @param explicitParts Explicit partitions list.
      * @param reserved Reserved list.
@@ -295,7 +277,7 @@ public class GridMapQueryExecutor {
      * @throws IgniteCheckedException If failed.
      */
     private boolean reservePartitions(
-        Collection<String> cacheNames,
+        List<Integer> cacheIds,
         AffinityTopologyVersion topVer,
         final int[] explicitParts,
         List<GridReservable> reserved
@@ -304,8 +286,8 @@ public class GridMapQueryExecutor {
 
         Collection<Integer> partIds = wrap(explicitParts);
 
-        for (String cacheName : cacheNames) {
-            GridCacheContext<?, ?> cctx = cacheContext(cacheName);
+        for (int i = 0; i < cacheIds.size(); i++) {
+            GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(cacheIds.get(i));
 
             if (cctx == null) // Cache was not found, probably was not deployed yet.
                 return false;
@@ -427,12 +409,23 @@ public class GridMapQueryExecutor {
      * @param req Query request.
      */
     private void onQueryRequest(ClusterNode node, GridQueryRequest req) {
-        List<String> caches = (List<String>)F.concat(true, req.space(), req.extraSpaces());
+        List<Integer> cacheIds;
+
+        if (req.extraSpaces() != null) {
+            cacheIds = new ArrayList<>(req.extraSpaces().size() + 1);
+
+            cacheIds.add(CU.cacheId(req.space()));
+
+            for (String extraSpace : req.extraSpaces())
+                cacheIds.add(CU.cacheId(extraSpace));
+        }
+        else
+            cacheIds = Collections.singletonList(CU.cacheId(req.space()));
 
         onQueryRequest0(node,
             req.requestId(),
             req.queries(),
-            caches,
+            cacheIds,
             req.topologyVersion(),
             null,
             req.partitions(),
@@ -465,7 +458,7 @@ public class GridMapQueryExecutor {
      * @param node Node authored request.
      * @param reqId Request ID.
      * @param qrys Queries to execute.
-     * @param caches Caches which will be affected by these queries.
+     * @param cacheIds Caches which will be affected by these queries.
      * @param topVer Topology version.
      * @param partsMap Partitions map for unstable topology.
      * @param parts Explicit partitions for current node.
@@ -477,7 +470,7 @@ public class GridMapQueryExecutor {
         ClusterNode node,
         long reqId,
         Collection<GridCacheSqlQuery> qrys,
-        List<String> caches,
+        List<Integer> cacheIds,
         AffinityTopologyVersion topVer,
         Map<UUID, int[]> partsMap,
         int[] parts,
@@ -485,7 +478,13 @@ public class GridMapQueryExecutor {
         int pageSize,
         boolean distributedJoins
     ) {
-        ConcurrentMap<Long, QueryResults> nodeRess = resultsForNode(node.id());
+        // Prepare to run queries.
+        GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext(cacheIds.get(0));
+
+        if (mainCctx == null)
+            throw new CacheException("Failed to find cache.");
+
+        NodeResults nodeRess = resultsForNode(node.id());
 
         QueryResults qr = null;
 
@@ -494,24 +493,16 @@ public class GridMapQueryExecutor {
         try {
             if (topVer != null) {
                 // Reserve primary for topology version or explicit partitions.
-                if (!reservePartitions(caches, topVer, parts, reserved)) {
+                if (!reservePartitions(cacheIds, topVer, parts, reserved)) {
                     sendRetry(node, reqId);
 
                     return;
                 }
             }
 
-            String mainCache = caches.get(0);
-
-            // Prepare to run queries.
-            GridCacheContext<?,?> mainCctx = cacheContext(mainCache);
-
-            if (mainCctx == null)
-                throw new CacheException("Failed to find cache: " + mainCache);
-
             qr = new QueryResults(reqId, qrys.size(), mainCctx);
 
-            if (nodeRess.put(reqId, qr) != null)
+            if (nodeRess.results().put(reqId, qr) != null)
                 throw new IllegalStateException();
 
             // Prepare query context.
@@ -542,7 +533,7 @@ public class GridMapQueryExecutor {
                 }
             }
 
-            Connection conn = h2.connectionForSpace(mainCache);
+            Connection conn = h2.connectionForSpace(mainCctx.name());
 
             // Here we enforce join order to have the same behavior on all the nodes.
             h2.setupConnection(conn, distributedJoins, true);
@@ -553,14 +544,10 @@ public class GridMapQueryExecutor {
             reserved = null;
 
             try {
-                Boolean old = qryHist.putIfAbsent(new QueryKey(node.id(), reqId), Boolean.TRUE);
-
-                if (old != null) {
-                    assert !old;
-
-                    GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, MAP);
+                if (nodeRess.cancelled(reqId)) {
+                    GridH2QueryContext.clear(ctx.localNodeId(), node.id(), reqId, qctx.type());
 
-                    nodeRess.remove(reqId);
+                    nodeRess.results().remove(reqId);
 
                     return;
                 }
@@ -568,11 +555,13 @@ public class GridMapQueryExecutor {
                 // Run queries.
                 int i = 0;
 
+                boolean evt = ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);
+
                 for (GridCacheSqlQuery qry : qrys) {
-                    ResultSet rs = h2.executeSqlQueryWithTimer(mainCache, conn, qry.query(),
+                    ResultSet rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
                         F.asList(qry.parameters()), true);
 
-                    if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+                    if (evt) {
                         ctx.event().record(new CacheQueryExecutedEvent<>(
                             node,
                             "SQL query executed.",
@@ -605,7 +594,10 @@ public class GridMapQueryExecutor {
                 }
             }
             finally {
-                GridH2QueryContext.clear(distributedJoins);
+                GridH2QueryContext.clearThreadLocal();
+
+                if (!distributedJoins)
+                    qctx.clearContext(false);
 
                 if (!F.isEmpty(snapshotedTbls)) {
                     for (GridH2Table dataTbl : snapshotedTbls)
@@ -615,7 +607,7 @@ public class GridMapQueryExecutor {
         }
         catch (Throwable e) {
             if (qr != null) {
-                nodeRess.remove(reqId, qr);
+                nodeRess.results().remove(reqId, qr);
 
                 qr.cancel();
             }
@@ -669,9 +661,9 @@ public class GridMapQueryExecutor {
      * @param req Request.
      */
     private void onNextPageRequest(ClusterNode node, GridQueryNextPageRequest req) {
-        ConcurrentMap<Long, QueryResults> nodeRess = qryRess.get(node.id());
+        NodeResults nodeRess = qryRess.get(node.id());
 
-        QueryResults qr = nodeRess == null ? null : nodeRess.get(req.queryRequestId());
+        QueryResults qr = nodeRess == null ? null : nodeRess.results().get(req.queryRequestId());
 
         if (qr == null || qr.canceled)
             sendError(node, req.queryRequestId(), new CacheException("No query result found for request: " + req));
@@ -680,12 +672,13 @@ public class GridMapQueryExecutor {
     }
 
     /**
+     * @param nodeRess Results.
      * @param node Node.
      * @param qr Query results.
      * @param qry Query.
      * @param pageSize Page size.
      */
-    private void sendNextPage(ConcurrentMap<Long, QueryResults> nodeRess, ClusterNode node, QueryResults qr, int qry,
+    private void sendNextPage(NodeResults nodeRess, ClusterNode node, QueryResults qr, int qry,
         int pageSize) {
         QueryResult res = qr.result(qry);
 
@@ -701,14 +694,14 @@ public class GridMapQueryExecutor {
             res.close();
 
             if (qr.isAllClosed())
-                nodeRess.remove(qr.qryReqId, qr);
+                nodeRess.results().remove(qr.qryReqId, qr);
         }
 
         try {
             boolean loc = node.isLocal();
 
             GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, qry, page,
-                page == 0 ? res.rowCount : -1 ,
+                page == 0 ? res.rowCnt : -1 ,
                 res.cols,
                 loc ? null : toMessages(rows, new ArrayList<Message>(res.cols)),
                 loc ? rows : null);
@@ -761,6 +754,44 @@ public class GridMapQueryExecutor {
         }
     }
 
+
+    /**
+     *
+     */
+    private static class NodeResults {
+        /** */
+        private final ConcurrentMap<Long, QueryResults> res = new ConcurrentHashMap8<>();
+
+        /** */
+        private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist =
+            new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
+
+        /**
+         * @return All results.
+         */
+        ConcurrentMap<Long, QueryResults> results() {
+            return res;
+        }
+
+        /**
+         * @param qryId Query ID.
+         * @return {@code False} if query was already cancelled.
+         */
+        boolean cancelled(long qryId) {
+            return qryHist.get(qryId) != null;
+        }
+
+        /**
+         * @param qryId Query ID.
+         * @return {@code True} if cancelled.
+         */
+        boolean onCancel(long qryId) {
+            Boolean old = qryHist.putIfAbsent(qryId, Boolean.FALSE);
+
+            return old == null;
+        }
+    }
+
     /**
      *
      */
@@ -772,7 +803,7 @@ public class GridMapQueryExecutor {
         private final AtomicReferenceArray<QueryResult> results;
 
         /** */
-        private final GridCacheContext<?,?> cctx;
+        private final GridCacheContext<?, ?> cctx;
 
         /** */
         private volatile boolean canceled;
@@ -866,7 +897,7 @@ public class GridMapQueryExecutor {
         private int page;
 
         /** */
-        private final int rowCount;
+        private final int rowCnt;
 
         /** */
         private volatile boolean closed;
@@ -890,7 +921,7 @@ public class GridMapQueryExecutor {
                 throw new IllegalStateException(e); // Must not happen.
             }
 
-            rowCount = res.getRowCount();
+            rowCnt = res.getRowCount();
             cols = res.getVisibleColumnCount();
         }
 
@@ -982,51 +1013,4 @@ public class GridMapQueryExecutor {
             throw new IllegalStateException();
         }
     }
-
-    /**
-     *
-     */
-    private static class QueryKey {
-        /** */
-        private final UUID nodeId;
-
-        /** */
-        private final long qryId;
-
-        /**
-         * @param nodeId Node ID.
-         * @param qryId Query ID.
-         */
-        public QueryKey(UUID nodeId, long qryId) {
-            this.nodeId = nodeId;
-            this.qryId = qryId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            QueryKey key = (QueryKey)o;
-
-            return qryId == key.qryId && nodeId.equals(key.nodeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = nodeId.hashCode();
-
-            res = 31 * res + (int) (qryId ^ (qryId >>> 32));
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        public String toString() {
-            return S.toString(QueryKey.class, this);
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 5c2ff29..04449ac 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -111,9 +111,6 @@ public class GridReduceQueryExecutor {
     private static final IgniteProductVersion DISTRIBUTED_JOIN_SINCE = IgniteProductVersion.fromString("1.7.0");
 
     /** */
-    private boolean oldNodesInTop = true;
-
-    /** */
     private GridKernalContext ctx;
 
     /** */
@@ -362,13 +359,13 @@ public class GridReduceQueryExecutor {
      * @param extraSpaces Extra spaces.
      * @return {@code true} If preloading is active.
      */
-    private boolean isPreloadingActive(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
+    private boolean isPreloadingActive(final GridCacheContext<?, ?> cctx, List<Integer> extraSpaces) {
         if (hasMovingPartitions(cctx))
             return true;
 
         if (extraSpaces != null) {
-            for (String extraSpace : extraSpaces) {
-                if (hasMovingPartitions(cacheContext(extraSpace)))
+            for (int i = 0; i < extraSpaces.size(); i++) {
+                if (hasMovingPartitions(cacheContext(extraSpaces.get(i))))
                     return true;
             }
         }
@@ -380,7 +377,7 @@ public class GridReduceQueryExecutor {
      * @param cctx Cache context.
      * @return {@code true} If cache context
      */
-    private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) {
+    private boolean hasMovingPartitions(GridCacheContext<?, ?> cctx) {
         GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false);
 
         for (GridDhtPartitionMap2 map : fullMap.values()) {
@@ -392,11 +389,11 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param name Cache name.
+     * @param cacheId Cache ID.
      * @return Cache context.
      */
-    private GridCacheContext<?,?> cacheContext(String name) {
-        return ctx.cache().internalCache(name).context();
+    private GridCacheContext<?,?> cacheContext(Integer cacheId) {
+        return ctx.cache().context().cacheContext(cacheId);
     }
 
     /**
@@ -407,19 +404,19 @@ public class GridReduceQueryExecutor {
      */
     private Collection<ClusterNode> stableDataNodes(
         AffinityTopologyVersion topVer,
-        final GridCacheContext<?,?> cctx,
-        List<String> extraSpaces
+        final GridCacheContext<?, ?> cctx,
+        List<Integer> extraSpaces
     ) {
-        String space = cctx.name();
-
         Set<ClusterNode> nodes = new HashSet<>(cctx.affinity().assignment(topVer).primaryPartitionNodes());
 
         if (F.isEmpty(nodes))
-            throw new CacheException("Failed to find data nodes for cache: " + space);
+            throw new CacheException("Failed to find data nodes for cache: " + cctx.name());
 
         if (!F.isEmpty(extraSpaces)) {
-            for (String extraSpace : extraSpaces) {
-                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+            for (int i = 0; i < extraSpaces.size(); i++) {
+                GridCacheContext<?,?> extraCctx = cacheContext(extraSpaces.get(i));
+
+                String extraSpace = extraCctx.name();
 
                 if (extraCctx.isLocal())
                     continue; // No consistency guaranties for local caches.
@@ -469,33 +466,6 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @return {@code true} If there are old nodes in topology.
-     */
-    private boolean oldNodesInTopology() {
-        if (!oldNodesInTop)
-            return false;
-
-        NavigableMap<IgniteProductVersion, Collection<ClusterNode>> m = ctx.discovery().topologyVersionMap();
-
-        if (!F.isEmpty(m)) {
-            for (Map.Entry<IgniteProductVersion,Collection<ClusterNode>> entry : m.entrySet()) {
-                if (entry.getKey().compareTo(DISTRIBUTED_JOIN_SINCE) >= 0)
-                    break;
-
-                for (ClusterNode node : entry.getValue()) {
-                    if (!node.isClient() && !node.isDaemon())
-                        return true;
-                }
-            }
-        }
-
-        // If we did not find old nodes, we assume that old node will not join further.
-        oldNodesInTop = false;
-
-        return false;
-    }
-
-    /**
      * @param cctx Cache context.
      * @param qry Query.
      * @param keepPortable Keep portable.
@@ -503,7 +473,7 @@ public class GridReduceQueryExecutor {
      * @return Cursor.
      */
     public Iterator<List<?>> query(
-        GridCacheContext<?,?> cctx,
+        GridCacheContext<?, ?> cctx,
         GridCacheTwoStepQuery qry,
         boolean keepPortable,
         boolean enforceJoinOrder
@@ -528,7 +498,7 @@ public class GridReduceQueryExecutor {
 
             AffinityTopologyVersion topVer = h2.readyTopologyVersion();
 
-            List<String> extraSpaces = extraSpaces(space, qry.spaces());
+            List<Integer> extraSpaces = qry.extraCaches();
 
             Collection<ClusterNode> nodes;
 
@@ -611,7 +581,9 @@ public class GridReduceQueryExecutor {
 
                 boolean retry = false;
 
-                final boolean oldStyle = oldNodesInTopology();
+                IgniteProductVersion minNodeVer = cctx.shared().exchange().minimumNodeVersion(topVer);
+
+                final boolean oldStyle = minNodeVer.compareToIgnoreTimestamp(DISTRIBUTED_JOIN_SINCE) < 0;
                 final boolean distributedJoins = qry.distributedJoins();
 
                 if (oldStyle && distributedJoins)
@@ -619,12 +591,18 @@ public class GridReduceQueryExecutor {
 
                 if (send(nodes,
                     oldStyle ?
-                        new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null) :
+                        new GridQueryRequest(qryReqId,
+                            r.pageSize,
+                            space,
+                            mapQrys,
+                            topVer,
+                            extraSpaces(space, qry.spaces()),
+                            null) :
                         new GridH2QueryRequest()
                             .requestId(qryReqId)
                             .topologyVersion(topVer)
                             .pageSize(r.pageSize)
-                            .caches(join(space, extraSpaces))
+                            .caches(qry.caches())
                             .tables(distributedJoins ? qry.tables() : null)
                             .partitions(convert(partsMap))
                             .queries(mapQrys)
@@ -707,7 +685,7 @@ public class GridReduceQueryExecutor {
                             resIter = new Iter(res);
                         }
                         finally {
-                            GridH2QueryContext.clear(false);
+                            GridH2QueryContext.clearThreadLocal();
                         }
                     }
                 }
@@ -851,8 +829,8 @@ public class GridReduceQueryExecutor {
      * @param extraSpaces Extra spaces.
      * @return Collection of all data nodes owning all the caches or {@code null} for retry.
      */
-    private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?,?> cctx,
-        List<String> extraSpaces) {
+    private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?, ?> cctx,
+        List<Integer> extraSpaces) {
         assert cctx.isReplicated() : cctx.name() + " must be replicated";
 
         Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx);
@@ -861,15 +839,15 @@ public class GridReduceQueryExecutor {
             return null; // Retry.
 
         if (!F.isEmpty(extraSpaces)) {
-            for (String extraSpace : extraSpaces) {
-                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+            for (int i = 0; i < extraSpaces.size(); i++) {
+                GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
 
                 if (extraCctx.isLocal())
                     continue;
 
                 if (!extraCctx.isReplicated())
                     throw new CacheException("Queries running on replicated cache should not contain JOINs " +
-                        "with tables in partitioned caches [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]");
+                        "with tables in partitioned caches [rCache=" + cctx.name() + ", pCache=" + extraCctx.name() + "]");
 
                 Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx);
 
@@ -938,14 +916,14 @@ public class GridReduceQueryExecutor {
      */
     @SuppressWarnings("unchecked")
     private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(final GridCacheContext<?,?> cctx,
-        List<String> extraSpaces) {
+        List<Integer> extraSpaces) {
         assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned";
 
         final int partsCnt = cctx.affinity().partitions();
 
         if (extraSpaces != null) { // Check correct number of partitions for partitioned caches.
-            for (String extraSpace : extraSpaces) {
-                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+            for (int i = 0; i < extraSpaces.size(); i++) {
+                GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
 
                 if (extraCctx.isReplicated() || extraCctx.isLocal())
                     continue;
@@ -954,7 +932,7 @@ public class GridReduceQueryExecutor {
 
                 if (parts != partsCnt)
                     throw new CacheException("Number of partitions must be the same for correct collocation [cache1=" +
-                        cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraSpace + ", parts2=" + parts + "]");
+                        cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraCctx.name() + ", parts2=" + parts + "]");
             }
         }
 
@@ -977,8 +955,8 @@ public class GridReduceQueryExecutor {
         if (extraSpaces != null) {
             // Find owner intersections for each participating partitioned cache partition.
             // We need this for logical collocation between different partitioned caches with the same affinity.
-            for (String extraSpace : extraSpaces) {
-                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+            for (int i = 0; i < extraSpaces.size(); i++) {
+                GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
 
                 if (extraCctx.isReplicated() || extraCctx.isLocal())
                     continue;
@@ -987,10 +965,10 @@ public class GridReduceQueryExecutor {
                     List<ClusterNode> owners = extraCctx.topology().owners(p);
 
                     if (F.isEmpty(owners)) {
-                        if (!F.isEmpty(dataNodes(extraSpace, NONE)))
+                        if (!F.isEmpty(dataNodes(extraCctx.name(), NONE)))
                             return null; // Retry.
 
-                        throw new CacheException("Failed to find data nodes [cache=" + extraSpace + ", part=" + p + "]");
+                        throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() + ", part=" + p + "]");
                     }
 
                     if (partLocs[p] == null)
@@ -1005,8 +983,8 @@ public class GridReduceQueryExecutor {
             }
 
             // Filter nodes where not all the replicated caches loaded.
-            for (String extraSpace : extraSpaces) {
-                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+            for (int i = 0; i < extraSpaces.size(); i++) {
+                GridCacheContext<?,?> extraCctx = cacheContext(extraSpaces.get(i));
 
                 if (!extraCctx.isReplicated())
                     continue;
@@ -1137,7 +1115,14 @@ public class GridReduceQueryExecutor {
         if (log.isDebugEnabled())
             log.debug("Sending: [msg=" + msg + ", nodes=" + nodes + ", specialize=" + specialize + "]");
 
-        return h2.send(GridTopic.TOPIC_QUERY, nodes, msg, specialize, locNodeHnd, QUERY_POOL, runLocParallel);
+        return h2.send(GridTopic.TOPIC_QUERY,
+            GridTopic.TOPIC_QUERY.ordinal(),
+            nodes,
+            msg,
+            specialize,
+            locNodeHnd,
+            QUERY_POOL,
+            runLocParallel);
     }
 
     /**
@@ -1161,23 +1146,6 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param first First element.
-     * @param rest Other elements.
-     * @return New joined list.
-     */
-    private static <Z> List<Z> join(Z first, List<Z> rest) {
-        if (F.isEmpty(rest))
-            return Collections.singletonList(first);
-
-        List<Z> res = new ArrayList<>(rest.size() + 1);
-
-        res.add(first);
-        res.addAll(rest);
-
-        return res;
-    }
-
-    /**
      * @param ints Ints.
      * @return Array.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 3b200c5..dc82b2c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -55,8 +55,8 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
     /** */
     @GridToStringInclude
-    @GridDirectCollection(String.class)
-    private List<String> caches;
+    @GridDirectCollection(Integer.class)
+    private List<Integer> caches;
 
     /** Topology version. */
     private AffinityTopologyVersion topVer;
@@ -120,7 +120,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
      * @param caches Caches.
      * @return {@code this}.
      */
-    public GridH2QueryRequest caches(List<String> caches) {
+    public GridH2QueryRequest caches(List<Integer> caches) {
         this.caches = caches;
 
         return this;
@@ -129,7 +129,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     /**
      * @return Caches.
      */
-    public List<String> caches() {
+    public List<Integer> caches() {
         return caches;
     }
 
@@ -250,7 +250,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
         switch (writer.state()) {
             case 0:
-                if (!writer.writeCollection("caches", caches, MessageCollectionItemType.STRING))
+                if (!writer.writeCollection("caches", caches, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
@@ -311,7 +311,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
         switch (reader.state()) {
             case 0:
-                caches = reader.readCollection("caches", MessageCollectionItemType.STRING);
+                caches = reader.readCollection("caches", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java
index 2fa9f4c..f646ce2 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java
@@ -206,7 +206,7 @@ public class IgniteCacheCrossCacheJoinRandomTest extends AbstractH2CompareQueryT
     /**
      * @throws Exception If failed.
      */
-    public void testJoin3Caches() throws Exception {
+    public void _testJoin3Caches() throws Exception {
         testJoin(3, MODES_1);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java
index d27fe1b..44bca5e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinQueryWithAffinityKeyTest.java
@@ -63,6 +63,9 @@ public class IgniteCacheJoinQueryWithAffinityKeyTest extends GridCommonAbstractT
     /** */
     private boolean client;
 
+    /** */
+    private boolean escape;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -113,6 +116,15 @@ public class IgniteCacheJoinQueryWithAffinityKeyTest extends GridCommonAbstractT
     /**
      * @throws Exception If failed.
      */
+    public void testJoinQueryEscapeAll() throws Exception {
+        escape = true;
+
+        testJoinQuery();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testJoinQueryWithAffinityKey() throws Exception {
         testJoinQuery(PARTITIONED, 0, true, true);
 
@@ -124,6 +136,15 @@ public class IgniteCacheJoinQueryWithAffinityKeyTest extends GridCommonAbstractT
     /**
      * @throws Exception If failed.
      */
+    public void testJoinQueryWithAffinityKeyEscapeAll() throws Exception {
+        escape = true;
+
+        testJoinQueryWithAffinityKey();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testJoinQueryWithAffinityKeyNotQueryField() throws Exception {
         testJoinQuery(PARTITIONED, 0, true, false);
 
@@ -133,12 +154,21 @@ public class IgniteCacheJoinQueryWithAffinityKeyTest extends GridCommonAbstractT
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQueryWithAffinityKeyNotQueryFieldEscapeAll() throws Exception {
+        escape = true;
+
+        testJoinQueryWithAffinityKeyNotQueryField();
+    }
+
+    /**
      * @param cacheMode Cache mode.
      * @param backups Number of backups.
      * @param affKey If {@code true} uses key with affinity key field.
      * @param includeAffKey If {@code true} includes affinity key field in query fields.
      */
-    public void testJoinQuery(CacheMode cacheMode, int backups, final boolean affKey, boolean includeAffKey) {
+    private void testJoinQuery(CacheMode cacheMode, int backups, final boolean affKey, boolean includeAffKey) {
         CacheConfiguration ccfg = cacheConfiguration(cacheMode, backups, affKey, includeAffKey);
 
         log.info("Test cache [mode=" + cacheMode + ", backups=" + backups + ']');
@@ -179,9 +209,18 @@ public class IgniteCacheJoinQueryWithAffinityKeyTest extends GridCommonAbstractT
      * @param cnts Organizations per person counts.
      */
     private void checkOrganizationPersonsJoin(IgniteCache cache, Map<Integer, Integer> cnts) {
-        SqlFieldsQuery qry = new SqlFieldsQuery("select o.name, p.name " +
-            "from Organization o, Person p " +
-            "where p.orgId = o._key and o._key=?");
+        SqlFieldsQuery qry;
+
+        if (escape) {
+            qry = new SqlFieldsQuery("select o.\"name\", p.\"name\" " +
+                "from \"Organization\" o, \"Person\" p " +
+                "where p.\"orgId\" = o._key and o._key=?");
+        }
+        else {
+            qry = new SqlFieldsQuery("select o.name, p.name " +
+                "from Organization o, Person p " +
+                "where p.orgId = o._key and o._key=?");
+        }
 
         qry.setDistributedJoins(true);
 
@@ -197,8 +236,16 @@ public class IgniteCacheJoinQueryWithAffinityKeyTest extends GridCommonAbstractT
             total += res.size();
         }
 
-        SqlFieldsQuery qry2 = new SqlFieldsQuery("select count(*) " +
-            "from Organization o, Person p where p.orgId = o._key");
+        SqlFieldsQuery qry2;
+
+        if (escape) {
+            qry2 = new SqlFieldsQuery("select count(*) " +
+                "from \"Organization\" o, \"Person\" p where p.\"orgId\" = o._key");
+        }
+        else {
+            qry2 = new SqlFieldsQuery("select count(*) " +
+                "from Organization o, Person p where p.orgId = o._key");
+        }
 
         qry2.setDistributedJoins(true);
 
@@ -214,15 +261,33 @@ public class IgniteCacheJoinQueryWithAffinityKeyTest extends GridCommonAbstractT
      * @param affKey If {@code true} uses key with affinity key field.
      */
     private void checkPersonAccountsJoin(IgniteCache cache, Map<Object, Integer> cnts, boolean affKey) {
-        SqlFieldsQuery qry1 = new SqlFieldsQuery("select p.name " +
-            "from Person p, " + (affKey ? "AccountKeyWithAffinity" : "Account") + " a " +
-            "where p._key = a.personKey and p._key=?");
+        String sql1;
+
+        if (escape) {
+            sql1 = "select p.\"name\" from \"Person\" p, \"" + (affKey ? "AccountKeyWithAffinity" : "Account") + "\" a " +
+                "where p._key = a.\"personKey\" and p._key=?";
+        }
+        else {
+            sql1 = "select p.name from Person p, " + (affKey ? "AccountKeyWithAffinity" : "Account") + " a " +
+                "where p._key = a.personKey and p._key=?";
+        }
+
+        SqlFieldsQuery qry1 = new SqlFieldsQuery(sql1);
 
         qry1.setDistributedJoins(true);
 
-        SqlFieldsQuery qry2 = new SqlFieldsQuery("select p.name " +
-            "from Person p, " + (affKey ? "AccountKeyWithAffinity" : "Account") + " a " +
-            "where p.id = a.personId and p.id=?");
+        String sql2;
+
+        if (escape) {
+            sql2 = "select p.\"name\" from \"Person\" p, \"" + (affKey ? "AccountKeyWithAffinity" : "Account") + "\" a " +
+                "where p.\"id\" = a.\"personId\" and p.\"id\"=?";
+        }
+        else {
+            sql2 = "select p.name from Person p, " + (affKey ? "AccountKeyWithAffinity" : "Account") + " a " +
+                "where p.id = a.personId and p.id=?";
+        }
+
+        SqlFieldsQuery qry2 = new SqlFieldsQuery(sql2);
 
         qry2.setDistributedJoins(true);
 
@@ -252,13 +317,25 @@ public class IgniteCacheJoinQueryWithAffinityKeyTest extends GridCommonAbstractT
 
         SqlFieldsQuery[] qrys = new SqlFieldsQuery[2];
 
-        qrys[0] = new SqlFieldsQuery("select count(*) " +
-            "from Person p, " + (affKey ? "AccountKeyWithAffinity" : "Account") + " a " +
-            "where p.id = a.personId");
 
-        qrys[1] = new SqlFieldsQuery("select count(*) " +
-            "from Person p, " + (affKey ? "AccountKeyWithAffinity" : "Account") + " a " +
-            "where p._key = a.personKey");
+        if (escape) {
+            qrys[0] = new SqlFieldsQuery("select count(*) " +
+                "from \"Person\" p, \"" + (affKey ? "AccountKeyWithAffinity" : "Account") + "\" a " +
+                "where p.\"id\" = a.\"personId\"");
+
+            qrys[1] = new SqlFieldsQuery("select count(*) " +
+                "from \"Person\" p, \"" + (affKey ? "AccountKeyWithAffinity" : "Account") + "\" a " +
+                "where p._key = a.\"personKey\"");
+        }
+        else {
+            qrys[0] = new SqlFieldsQuery("select count(*) " +
+                "from Person p, " + (affKey ? "AccountKeyWithAffinity" : "Account") + " a " +
+                "where p.id = a.personId");
+
+            qrys[1] = new SqlFieldsQuery("select count(*) " +
+                "from Person p, " + (affKey ? "AccountKeyWithAffinity" : "Account") + " a " +
+                "where p._key = a.personKey");
+        }
 
         for (SqlFieldsQuery qry : qrys) {
             qry.setDistributedJoins(true);
@@ -318,6 +395,8 @@ public class IgniteCacheJoinQueryWithAffinityKeyTest extends GridCommonAbstractT
 
         ccfg.setQueryEntities(F.asList(account, person, org));
 
+        ccfg.setSqlEscapeAll(escape);
+
         return ccfg;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
index de00a2b..0e6806f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
@@ -58,12 +58,24 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
         "group by co._key order by cnt desc, co._key";
 
     /** */
+    private static final String QRY_0_BROADCAST = "select co._key, count(*) cnt\n" +
+        "from \"co\".Company co, \"pr\".Product pr, \"pu\".Purchase pu, \"pe\".Person pe \n" +
+        "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" +
+        "group by co._key order by cnt desc, co._key";
+
+    /** */
     private static final String QRY_1 = "select pr._key, co._key\n" +
         "from \"pr\".Product pr, \"co\".Company co\n" +
         "where pr.companyId = co._key\n" +
         "order by co._key, pr._key ";
 
     /** */
+    private static final String QRY_1_BROADCAST = "select pr._key, co._key\n" +
+        "from \"co\".Company co, \"pr\".Product pr \n" +
+        "where pr.companyId = co._key\n" +
+        "order by co._key, pr._key ";
+
+    /** */
     private static final int GRID_CNT = 6;
 
     /** */
@@ -145,6 +157,15 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
         return c;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(GRID_CNT);
+
+        fillCaches();
+    }
+
     /**
      *
      */
@@ -175,36 +196,66 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
             pu.put(i, new Purchase(persId, prodId));
         }
     }
-
     /**
      * @throws Exception If failed.
      */
     public void testRestarts() throws Exception {
+        restarts(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartsBroadcast() throws Exception {
+        restarts(true);
+    }
+
+    /**
+     * @param broadcastQry If {@code true} tests broadcast query.
+     * @throws Exception If failed.
+     */
+    private void restarts(final boolean broadcastQry) throws Exception {
         int duration = 90 * 1000;
         int qryThreadNum = 4;
         int restartThreadsNum = 2; // 4 + 2 = 6 nodes
         final int nodeLifeTime = 4000;
-        final int logFreq = 1;
-
-        startGridsMultiThreaded(GRID_CNT);
+        final int logFreq = 100;
 
         final AtomicIntegerArray locks = new AtomicIntegerArray(GRID_CNT);
 
-        fillCaches();
+        SqlFieldsQuery qry0 ;
+
+        if (broadcastQry)
+            qry0 = new SqlFieldsQuery(QRY_0_BROADCAST).setDistributedJoins(true).setEnforceJoinOrder(true);
+        else
+            qry0 = new SqlFieldsQuery(QRY_0).setDistributedJoins(true);
 
-        X.println("Plan: " + grid(0).cache("pu").query(new SqlFieldsQuery("explain " + QRY_0)
-            .setDistributedJoins(true)).getAll());
+        String plan = queryPlan(grid(0).cache("pu"), qry0);
 
-        final List<List<?>> pRes = grid(0).cache("pu").query(new SqlFieldsQuery(QRY_0)
-            .setDistributedJoins(true)).getAll();
+        X.println("Plan1: " + plan);
+
+        assertEquals(broadcastQry, plan.contains("batched:broadcast"));
+
+        final List<List<?>> pRes = grid(0).cache("pu").query(qry0).getAll();
 
         Thread.sleep(3000);
 
-        assertEquals(pRes, grid(0).cache("pu").query(new SqlFieldsQuery(QRY_0)
-            .setDistributedJoins(true)).getAll());
+        assertEquals(pRes, grid(0).cache("pu").query(qry0).getAll());
+
+        final SqlFieldsQuery qry1;
+
+        if (broadcastQry)
+            qry1 = new SqlFieldsQuery(QRY_1_BROADCAST).setDistributedJoins(true).setEnforceJoinOrder(true);
+        else
+            qry1 = new SqlFieldsQuery(QRY_1).setDistributedJoins(true);
+
+        plan = queryPlan(grid(0).cache("co"), qry1);
 
-        final List<List<?>> rRes = grid(0).cache("co").query(new SqlFieldsQuery(QRY_1)
-            .setDistributedJoins(true)).getAll();
+        X.println("Plan2: " + plan);
+
+        assertEquals(broadcastQry, plan.contains("batched:broadcast"));
+
+        final List<List<?>> rRes = grid(0).cache("co").query(qry1).getAll();
 
         assertFalse(pRes.isEmpty());
         assertFalse(rRes.isEmpty());
@@ -225,9 +276,14 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
                     while (!locks.compareAndSet(g, 0, 1));
 
                     if (rnd.nextBoolean()) {
-                        IgniteCache<?,?> cache = grid(g).cache("pu");
+                        IgniteCache<?, ?> cache = grid(g).cache("pu");
+
+                        SqlFieldsQuery qry;
 
-                        SqlFieldsQuery qry = new SqlFieldsQuery(QRY_0).setDistributedJoins(true);
+                        if (broadcastQry)
+                            qry = new SqlFieldsQuery(QRY_0_BROADCAST).setDistributedJoins(true).setEnforceJoinOrder(true);
+                        else
+                            qry = new SqlFieldsQuery(QRY_0).setDistributedJoins(true);
 
                         boolean smallPageSize = rnd.nextBoolean();
 
@@ -261,10 +317,16 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
                         }
                     }
                     else {
-                        IgniteCache<?,?> cache = grid(g).cache("co");
+                        IgniteCache<?, ?> cache = grid(g).cache("co");
+
+                        SqlFieldsQuery qry;
+
+                        if (broadcastQry)
+                            qry = new SqlFieldsQuery(QRY_1_BROADCAST).setDistributedJoins(true).setEnforceJoinOrder(true);
+                        else
+                            qry = new SqlFieldsQuery(QRY_1).setDistributedJoins(true);
 
-                        assertEquals(rRes, cache.query(new SqlFieldsQuery(QRY_1)
-                            .setDistributedJoins(true)).getAll());
+                        assertEquals(rRes, cache.query(qry1).getAll());
                     }
 
                     locks.set(g, 0);
@@ -340,9 +402,13 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
      *
      */
     private static class Person implements Serializable {
+        /** */
         @QuerySqlField(index = true)
         int id;
 
+        /**
+         * @param id ID.
+         */
         Person(int id) {
             this.id = id;
         }
@@ -352,12 +418,18 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
      *
      */
     private static class Purchase implements Serializable {
+        /** */
         @QuerySqlField(index = true)
         int personId;
 
+        /** */
         @QuerySqlField(index = true)
         int productId;
 
+        /**
+         * @param personId Person ID.
+         * @param productId Product ID.
+         */
         Purchase(int personId, int productId) {
             this.personId = personId;
             this.productId = productId;
@@ -368,9 +440,13 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
      *
      */
     private static class Company implements Serializable {
+        /** */
         @QuerySqlField(index = true)
         int id;
 
+        /**
+         * @param id ID.
+         */
         Company(int id) {
             this.id = id;
         }
@@ -380,12 +456,18 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
      *
      */
     private static class Product implements Serializable {
+        /** */
         @QuerySqlField(index = true)
         int id;
 
+        /** */
         @QuerySqlField(index = true)
         int companyId;
 
+        /**
+         * @param id ID.
+         * @param companyId Company ID.
+         */
         Product(int id, int companyId) {
             this.id = id;
             this.companyId = companyId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/129086a2/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
index 79bdd75..1854938 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
@@ -156,10 +156,6 @@ public class IgniteBenchmarkArguments {
     @Parameter(names = {"-ltops", "--allowedLoadTestOperations"}, variableArity = true, description = "List of enabled load test operations")
     private List<String> allowedLoadTestOps = new ArrayList<>();
 
-    /** */
-    @Parameter(names = {"-bcj", "--broadcastJoin"}, description = "Use broadcast distributed join")
-    private boolean broadcastJoin;
-
     /**
      * @return List of enabled load test operations.
      */
@@ -385,13 +381,6 @@ public class IgniteBenchmarkArguments {
     }
 
     /**
-     * @return {@code True} if should use broadcast for distributed join.
-     */
-    public boolean broadcastJoin() {
-        return broadcastJoin;
-    }
-
-    /**
      * @return Description.
      */
     public String description() {


Mime
View raw message