ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [17/50] [abbrv] ignite git commit: ignite-1232 Distributed SQL joins implementation
Date Thu, 28 Jul 2016 11:06:42 GMT
ignite-1232 Distributed SQL joins implementation


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/68891e89
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/68891e89
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/68891e89

Branch: refs/heads/ignite-3553
Commit: 68891e89dd0e0f19321d6a4d45ae7372279b8b08
Parents: eddd0a9
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Jul 22 17:07:58 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Jul 22 17:08:03 2016 +0300

----------------------------------------------------------------------
 .../examples/datagrid/CacheQueryExample.java    |   78 +-
 .../ignite/codegen/MessageCodeGenerator.java    |    6 +
 .../ignite/cache/query/SqlFieldsQuery.java      |   53 +
 .../org/apache/ignite/cache/query/SqlQuery.java |   25 +
 .../managers/communication/GridIoManager.java   |   32 +-
 .../communication/GridIoMessageFactory.java     |    2 +-
 .../managers/communication/GridIoPolicy.java    |    5 +-
 .../cache/GridCacheAffinityManager.java         |    2 +-
 .../processors/cache/GridCacheContext.java      |   19 +
 .../GridCacheDefaultAffinityKeyMapper.java      |   19 +
 .../processors/cache/IgniteCacheProxy.java      |    8 +
 .../binary/CacheObjectBinaryProcessorImpl.java  |    8 +
 .../dht/GridDhtPartitionsReservation.java       |    3 +-
 .../cache/query/GridCacheQueryManager.java      |   47 +-
 .../cache/query/GridCacheQueryMarshallable.java |   37 +
 .../cache/query/GridCacheSqlQuery.java          |   33 +-
 .../cache/query/GridCacheTwoStepQuery.java      |  123 +-
 .../cacheobject/IgniteCacheObjectProcessor.java |    6 +
 .../IgniteCacheObjectProcessorImpl.java         |    5 +
 .../processors/closure/GridClosurePolicy.java   |   51 -
 .../closure/GridClosureProcessor.java           |   52 +-
 .../processors/query/GridQueryIndexing.java     |   51 +-
 .../processors/query/GridQueryProcessor.java    |  192 +-
 .../query/GridQueryTypeDescriptor.java          |    7 +
 .../messages/GridQueryCancelRequest.java        |    2 +-
 .../twostep/messages/GridQueryFailResponse.java |    2 +-
 .../messages/GridQueryNextPageRequest.java      |    2 +-
 .../messages/GridQueryNextPageResponse.java     |   12 +-
 .../h2/twostep/messages/GridQueryRequest.java   |   28 +-
 .../ignite/internal/util/IgniteUtils.java       |   13 +
 .../ignite/internal/util/lang/GridFunc.java     |   20 +
 .../offheap/unsafe/GridOffHeapSnapTreeMap.java  |   91 +-
 .../jdbc/TcpDiscoveryJdbcIpFinderSelfTest.java  |    2 +-
 .../junits/common/GridCommonAbstractTest.java   |   39 +-
 .../query/h2/opt/GridH2SpatialIndex.java        |   74 +-
 .../processors/query/h2/IgniteH2Indexing.java   |  709 ++++++--
 .../query/h2/opt/GridH2AbstractKeyValueRow.java |   72 +-
 .../query/h2/opt/GridH2CollocationModel.java    |  783 +++++++++
 .../processors/query/h2/opt/GridH2Cursor.java   |   36 +-
 .../query/h2/opt/GridH2DefaultTableEngine.java  |   38 +
 .../query/h2/opt/GridH2IndexBase.java           | 1392 ++++++++++++++-
 .../query/h2/opt/GridH2MetaTable.java           |  383 ++++
 .../query/h2/opt/GridH2QueryContext.java        |  612 +++++++
 .../query/h2/opt/GridH2QueryType.java           |   49 +
 .../query/h2/opt/GridH2RetryException.java      |   32 +
 .../processors/query/h2/opt/GridH2Row.java      |   86 +-
 .../query/h2/opt/GridH2RowDescriptor.java       |   28 +-
 .../query/h2/opt/GridH2RowFactory.java          |  179 ++
 .../processors/query/h2/opt/GridH2Table.java    |  372 ++--
 .../query/h2/opt/GridH2TreeIndex.java           |  142 +-
 .../processors/query/h2/opt/GridH2Utils.java    |  133 --
 .../query/h2/opt/GridH2ValueCacheObject.java    |    3 +-
 .../query/h2/opt/GridLuceneIndex.java           |    7 +-
 .../processors/query/h2/sql/GridSqlAlias.java   |   12 +
 .../processors/query/h2/sql/GridSqlColumn.java  |   22 +-
 .../processors/query/h2/sql/GridSqlElement.java |   11 +
 .../query/h2/sql/GridSqlOperation.java          |    2 +-
 .../query/h2/sql/GridSqlOperationType.java      |    8 +-
 .../query/h2/sql/GridSqlQueryParser.java        |   97 +-
 .../query/h2/sql/GridSqlQuerySplitter.java      |  293 +++-
 .../processors/query/h2/sql/GridSqlSelect.java  |    9 +-
 .../processors/query/h2/sql/GridSqlTable.java   |   70 +
 .../query/h2/twostep/GridMapQueryExecutor.java  |  415 +++--
 .../query/h2/twostep/GridMergeIndex.java        |   75 +-
 .../h2/twostep/GridMergeIndexUnsorted.java      |    6 +-
 .../query/h2/twostep/GridMergeTable.java        |    4 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  338 ++--
 .../query/h2/twostep/GridThreadLocalTable.java  |   68 +-
 .../query/h2/twostep/msg/GridH2Array.java       |    9 +-
 .../query/h2/twostep/msg/GridH2Boolean.java     |   10 +-
 .../query/h2/twostep/msg/GridH2Byte.java        |    9 +-
 .../query/h2/twostep/msg/GridH2Bytes.java       |   11 +-
 .../query/h2/twostep/msg/GridH2CacheObject.java |    9 +-
 .../query/h2/twostep/msg/GridH2Date.java        |    9 +-
 .../query/h2/twostep/msg/GridH2Decimal.java     |   11 +-
 .../query/h2/twostep/msg/GridH2Double.java      |    9 +-
 .../query/h2/twostep/msg/GridH2Float.java       |    9 +-
 .../query/h2/twostep/msg/GridH2Geometry.java    |   11 +-
 .../h2/twostep/msg/GridH2IndexRangeRequest.java |  208 +++
 .../twostep/msg/GridH2IndexRangeResponse.java   |  279 +++
 .../query/h2/twostep/msg/GridH2Integer.java     |   20 +-
 .../query/h2/twostep/msg/GridH2JavaObject.java  |   11 +-
 .../query/h2/twostep/msg/GridH2Long.java        |    9 +-
 .../query/h2/twostep/msg/GridH2Null.java        |   15 +-
 .../h2/twostep/msg/GridH2QueryRequest.java      |  401 +++++
 .../query/h2/twostep/msg/GridH2RowMessage.java  |  116 ++
 .../query/h2/twostep/msg/GridH2RowRange.java    |  181 ++
 .../h2/twostep/msg/GridH2RowRangeBounds.java    |  188 ++
 .../query/h2/twostep/msg/GridH2Short.java       |    9 +-
 .../query/h2/twostep/msg/GridH2String.java      |    9 +-
 .../query/h2/twostep/msg/GridH2Time.java        |    9 +-
 .../query/h2/twostep/msg/GridH2Timestamp.java   |   11 +-
 .../query/h2/twostep/msg/GridH2Uuid.java        |    9 +-
 .../h2/twostep/msg/GridH2ValueMessage.java      |    2 +-
 .../twostep/msg/GridH2ValueMessageFactory.java  |   22 +-
 ...idCacheReduceQueryMultithreadedSelfTest.java |  168 --
 .../cache/IgniteCacheAbstractQuerySelfTest.java |   30 +-
 .../IgniteCacheCrossCacheJoinRandomTest.java    |  442 +++++
 ...acheDistributedJoinCollocatedAndNotTest.java |  365 ++++
 ...acheDistributedJoinCustomAffinityMapper.java |  262 +++
 .../IgniteCacheDistributedJoinNoIndexTest.java  |  299 ++++
 ...ributedJoinPartitionedAndReplicatedTest.java |  487 ++++++
 ...CacheDistributedJoinQueryConditionsTest.java |  624 +++++++
 .../cache/IgniteCacheDistributedJoinTest.java   |  316 ++++
 ...PartitionedAndReplicatedCollocationTest.java |  399 +++++
 ...teCacheJoinPartitionedAndReplicatedTest.java |   78 +-
 ...IgniteCacheJoinQueryWithAffinityKeyTest.java |  646 +++++++
 .../cache/IgniteCacheQueryLoadSelfTest.java     |   12 +-
 .../cache/IgniteCrossCachesJoinsQueryTest.java  | 1641 ++++++++++++++++++
 ...QueryNodeRestartDistributedJoinSelfTest.java |  476 +++++
 ...dCacheAbstractReduceFieldsQuerySelfTest.java |  420 -----
 ...ridCacheReduceFieldsQueryAtomicSelfTest.java |   38 -
 ...GridCacheReduceFieldsQueryLocalSelfTest.java |   37 -
 ...cheReduceFieldsQueryPartitionedSelfTest.java |   59 -
 ...acheReduceFieldsQueryReplicatedSelfTest.java |   37 -
 .../query/IgniteSqlSchemaIndexingTest.java      |    5 +-
 .../query/IgniteSqlSplitterSelfTest.java        |  833 ++++++++-
 .../h2/GridIndexingSpiAbstractSelfTest.java     |  145 +-
 .../query/h2/opt/GridH2TableSelfTest.java       |   10 +-
 .../h2/sql/AbstractH2CompareQueryTest.java      |  163 +-
 .../query/h2/sql/GridQueryParsingTest.java      |   17 +-
 .../H2CompareBigQueryDistributedJoinsTest.java  |   28 +
 .../query/h2/sql/H2CompareBigQueryTest.java     |  119 +-
 .../IgniteCacheQuerySelfTestSuite.java          |   25 +-
 .../IgniteCacheQuerySelfTestSuite2.java         |   10 -
 .../ignite/yardstick/IgniteBenchmarkUtils.java  |   45 +
 .../IgniteSqlQueryDistributedJoinBenchmark.java |  184 ++
 ...lQueryDistributedJoinBroadcastBenchmark.java |   28 +
 parent/pom.xml                                  |   24 +-
 129 files changed, 15201 insertions(+), 2502 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java
index 9200489..85d74e0 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheQueryExample.java
@@ -46,8 +46,11 @@ import org.apache.ignite.lang.IgniteBiPredicate;
  * limitations (not applied if data is queried from one node only):
  * <ul>
  *     <li>
- *         Joins will work correctly only if joined objects are stored in
+ *         Non-distributed joins will work correctly only if joined objects are stored in
  *         collocated mode. Refer to {@link AffinityKey} javadoc for more details.
+ *         <p>
+ *         To use distributed joins it is necessary to set the query 'distributedJoins' flag using
+ *         {@link SqlFieldsQuery#setDistributedJoins(boolean)} or {@link SqlQuery#setDistributedJoins(boolean)}.
  *     </li>
  *     <li>
  *         Note that if you created query on to replicated cache, all data will
@@ -65,6 +68,9 @@ public class CacheQueryExample {
     /** Organizations cache name. */
     private static final String ORG_CACHE = CacheQueryExample.class.getSimpleName() + "Organizations";
 
+    /** Persons collocated with Organizations cache name. */
+    private static final String COLLOCATED_PERSON_CACHE = CacheQueryExample.class.getSimpleName() + "CollocatedPersons";
+
     /** Persons cache name. */
     private static final String PERSON_CACHE = CacheQueryExample.class.getSimpleName() + "Persons";
 
@@ -84,14 +90,20 @@ public class CacheQueryExample {
             orgCacheCfg.setCacheMode(CacheMode.PARTITIONED); // Default.
             orgCacheCfg.setIndexedTypes(Long.class, Organization.class);
 
+            CacheConfiguration<AffinityKey<Long>, Person> colPersonCacheCfg = new CacheConfiguration<>(COLLOCATED_PERSON_CACHE);
+
+            colPersonCacheCfg.setCacheMode(CacheMode.PARTITIONED); // Default.
+            colPersonCacheCfg.setIndexedTypes(AffinityKey.class, Person.class);
+
             CacheConfiguration<AffinityKey<Long>, Person> personCacheCfg = new CacheConfiguration<>(PERSON_CACHE);
 
             personCacheCfg.setCacheMode(CacheMode.PARTITIONED); // Default.
-            personCacheCfg.setIndexedTypes(AffinityKey.class, Person.class);
+            personCacheCfg.setIndexedTypes(Long.class, Person.class);
 
             // Auto-close cache at the end of the example.
             try (
                 IgniteCache<Long, Organization> orgCache = ignite.getOrCreateCache(orgCacheCfg);
+                IgniteCache<AffinityKey<Long>, Person> colPersonCache = ignite.getOrCreateCache(colPersonCacheCfg);
                 IgniteCache<AffinityKey<Long>, Person> personCache = ignite.getOrCreateCache(personCacheCfg)
             ) {
                 // Populate cache.
@@ -103,9 +115,12 @@ public class CacheQueryExample {
                 // Example for SQL-based querying employees based on salary ranges.
                 sqlQuery();
 
-                // Example for SQL-based querying employees for a given organization (includes SQL join).
+                // Example for SQL-based querying employees for a given organization (includes SQL join for collocated objects).
                 sqlQueryWithJoin();
 
+                // Example for SQL-based querying employees for a given organization (includes distributed SQL join).
+                sqlQueryWithDistributedJoin();
+
                 // Example for TEXT-based querying for a given string in peoples resumes.
                 textQuery();
 
@@ -121,6 +136,7 @@ public class CacheQueryExample {
             }
             finally {
                 // Distributed cache could be removed from cluster only by #destroyCache() call.
+                ignite.destroyCache(COLLOCATED_PERSON_CACHE);
                 ignite.destroyCache(PERSON_CACHE);
                 ignite.destroyCache(ORG_CACHE);
             }
@@ -133,7 +149,7 @@ public class CacheQueryExample {
      * Example for scan query based on a predicate.
      */
     private static void scanQuery() {
-        IgniteCache<BinaryObject, BinaryObject> cache = Ignition.ignite().cache(PERSON_CACHE).withKeepBinary();
+        IgniteCache<BinaryObject, BinaryObject> cache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE).withKeepBinary();
 
         ScanQuery<BinaryObject, BinaryObject> scan = new ScanQuery<>(
             new IgniteBiPredicate<BinaryObject, BinaryObject>() {
@@ -170,7 +186,7 @@ public class CacheQueryExample {
      * Example for SQL queries based on all employees working for a specific organization.
      */
     private static void sqlQueryWithJoin() {
-        IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(PERSON_CACHE);
+        IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE);
 
         // SQL clause query which joins on 2 types to select people for a specific organization.
         String joinSql =
@@ -189,6 +205,32 @@ public class CacheQueryExample {
     }
 
     /**
+     * Example for SQL queries based on all employees working for a specific organization (query uses distributed join).
+     */
+    private static void sqlQueryWithDistributedJoin() {
+        IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(PERSON_CACHE);
+
+        // SQL clause query which joins on 2 types to select people for a specific organization.
+        String joinSql =
+            "from Person, \"" + ORG_CACHE + "\".Organization as org " +
+            "where Person.orgId = org.id " +
+            "and lower(org.name) = lower(?)";
+
+        SqlQuery qry = new SqlQuery<AffinityKey<Long>, Person>(Person.class, joinSql).
+            setArgs("ApacheIgnite");
+
+        // Enable distributed joins for query.
+        qry.setDistributedJoins(true);
+
+        // Execute queries to find employees of different organizations.
+        print("Following people are 'ApacheIgnite' employees (distributed join): ", cache.query(qry).getAll());
+
+        qry.setArgs("Other");
+
+        print("Following people are 'Other' employees (distributed join): ", cache.query(qry).getAll());
+    }
+
+    /**
      * Example for TEXT queries using LUCENE-based indexing of people's resumes.
      */
     private static void textQuery() {
@@ -210,7 +252,7 @@ public class CacheQueryExample {
      * Example for SQL queries to calculate average salary for a specific organization.
      */
     private static void sqlQueryWithAggregation() {
-        IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(PERSON_CACHE);
+        IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE);
 
         // Calculate average of salary of all persons in ApacheIgnite.
         // Note that we also join on Organization cache as well.
@@ -249,7 +291,7 @@ public class CacheQueryExample {
      * fields instead of whole key-value pairs.
      */
     private static void sqlFieldsQueryWithJoin() {
-        IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(PERSON_CACHE);
+        IgniteCache<AffinityKey<Long>, Person> cache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE);
 
         // Execute query to get names of all employees.
         String sql =
@@ -263,7 +305,7 @@ public class CacheQueryExample {
         List<List<?>> res = cursor.getAll();
 
         // Print persons' names and organizations' names.
-        print("Names of all employees and organizations they belong to:", res);
+        print("Names of all employees and organizations they belong to: ", res);
     }
 
     /**
@@ -282,9 +324,11 @@ public class CacheQueryExample {
         orgCache.put(org1.id(), org1);
         orgCache.put(org2.id(), org2);
 
-        IgniteCache<AffinityKey<Long>, Person> personCache = Ignition.ignite().cache(PERSON_CACHE);
+        IgniteCache<AffinityKey<Long>, Person> colPersonCache = Ignition.ignite().cache(COLLOCATED_PERSON_CACHE);
+        IgniteCache<Long, Person> personCache = Ignition.ignite().cache(PERSON_CACHE);
 
-        // Clear cache before running the example.
+        // Clear caches before running the example.
+        colPersonCache.clear();
         personCache.clear();
 
         // People.
@@ -295,10 +339,16 @@ public class CacheQueryExample {
 
         // Note that in this example we use custom affinity key for Person objects
         // to ensure that all persons are collocated with their organizations.
-        personCache.put(p1.key(), p1);
-        personCache.put(p2.key(), p2);
-        personCache.put(p3.key(), p3);
-        personCache.put(p4.key(), p4);
+        colPersonCache.put(p1.key(), p1);
+        colPersonCache.put(p2.key(), p2);
+        colPersonCache.put(p3.key(), p3);
+        colPersonCache.put(p4.key(), p4);
+
+        // These Person objects are not collocated with their organizations.
+        personCache.put(p1.id, p1);
+        personCache.put(p2.id, p2);
+        personCache.put(p3.id, p3);
+        personCache.put(p4.id, p4);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index a6ae0da..f0ba5ce 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -223,6 +223,12 @@ public class MessageCodeGenerator {
 //        gen.generateAndWrite(GridH2Uuid.class);
 //        gen.generateAndWrite(GridH2Geometry.class);
 //        gen.generateAndWrite(GridH2CacheObject.class);
+//        gen.generateAndWrite(GridH2IndexRangeRequest.class);
+//        gen.generateAndWrite(GridH2IndexRangeResponse.class);
+//        gen.generateAndWrite(GridH2RowRange.class);
+//        gen.generateAndWrite(GridH2RowRangeBounds.class);
+//        gen.generateAndWrite(GridH2QueryRequest.class);
+//        gen.generateAndWrite(GridH2RowMessage.class);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index b2dd181..48dab6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -56,6 +56,12 @@ public final class SqlFieldsQuery extends Query<List<?>> {
     /** Collocation flag. */
     private boolean collocated;
 
+    /** */
+    private boolean enforceJoinOrder;
+
+    /** */
+    private boolean distributedJoins;
+
     /**
      * Constructs SQL fields query.
      *
@@ -141,6 +147,53 @@ public final class SqlFieldsQuery extends Query<List<?>> {
         return this;
     }
 
+    /**
+     * Checks if join order of tables is enforced.
+     *
+     * @return Flag value.
+     */
+    public boolean isEnforceJoinOrder() {
+        return enforceJoinOrder;
+    }
+
+    /**
+     * Sets flag to enforce join order of tables in the query. If set to {@code true}
+     * query optimizer will not reorder tables in join. By default is {@code false}.
+     * <p>
+     * It is not recommended to enable this property until you are sure that
+     * your indexes and the query itself are correct and tuned as much as possible but
+     * query optimizer still produces wrong join order.
+     *
+     * @param enforceJoinOrder Flag value.
+     * @return {@code this} For chaining.
+     */
+    public SqlFieldsQuery setEnforceJoinOrder(boolean enforceJoinOrder) {
+        this.enforceJoinOrder = enforceJoinOrder;
+
+        return this;
+    }
+
+    /**
+     * Specify if distributed joins are enabled for this query.
+     *
+     * @param distributedJoins Distributed joins enabled.
+     * @return {@code this} For chaining.
+     */
+    public SqlFieldsQuery setDistributedJoins(boolean distributedJoins) {
+        this.distributedJoins = distributedJoins;
+
+        return this;
+    }
+
+    /**
+     * Check if distributed joins are enabled for this query.
+     *
+     * @return {@code true} If distributed joins are enabled.
+     */
+    public boolean isDistributedJoins() {
+        return distributedJoins;
+    }
+
     /** {@inheritDoc} */
     @Override public SqlFieldsQuery setPageSize(int pageSize) {
         return (SqlFieldsQuery)super.setPageSize(pageSize);

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
index be3b390..e05ff13 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
@@ -43,6 +43,9 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
     @GridToStringInclude
     private Object[] args;
 
+    /** */
+    private boolean distributedJoins;
+
     /**
      * Constructs query for the given type name and SQL query.
      *
@@ -142,11 +145,33 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
 
     /**
      * @param type Type.
+     * @return {@code this} For chaining.
      */
     public SqlQuery setType(Class<?> type) {
         return setType(GridQueryProcessor.typeName(type));
     }
 
+    /**
+     * Specify if distributed joins are enabled for this query.
+     *
+     * @param distributedJoins Distributed joins enabled.
+     * @return {@code this} For chaining.
+     */
+    public SqlQuery setDistributedJoins(boolean distributedJoins) {
+        this.distributedJoins = distributedJoins;
+
+        return this;
+    }
+
+    /**
+     * Check if distributed joins are enabled for this query.
+     *
+     * @return {@code true} If distributed joins are enabled.
+     */
+    public boolean isDistributedJoins() {
+        return distributedJoins;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(SqlQuery.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 4bc2eea..99cb7f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -88,6 +88,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MARSH_CACHE_POOL;
@@ -157,6 +158,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /** IGFS pool. */
     private ExecutorService igfsPool;
 
+    /** Index pool. */
+    private ExecutorService idxPool;
+
     /** Discovery listener. */
     private GridLocalEventListener discoLsnr;
 
@@ -268,6 +272,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             0,
             new LinkedBlockingQueue<Runnable>());
 
+        if (IgniteComponentType.INDEXING.inClassPath()) {
+            int cpus = Runtime.getRuntime().availableProcessors();
+
+            idxPool = new IgniteThreadPoolExecutor("idx", ctx.gridName(),
+                cpus, cpus * 2, 3000L, new LinkedBlockingQueue<Runnable>(1000));
+        }
+
         getSpi().setListener(commLsnr = new CommunicationListener<Serializable>() {
             @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
                 try {
@@ -651,6 +662,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 case AFFINITY_POOL:
                 case UTILITY_CACHE_POOL:
                 case MARSH_CACHE_POOL:
+                case IDX_POOL:
                 case IGFS_POOL:
                 {
                     if (msg.isOrdered())
@@ -689,7 +701,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @return Execution pool.
      * @throws IgniteCheckedException If failed.
      */
-    private Executor pool(byte plc) throws IgniteCheckedException {
+    public Executor pool(byte plc) throws IgniteCheckedException {
         switch (plc) {
             case P2P_POOL:
                 return p2pPool;
@@ -717,6 +729,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
                 return igfsPool;
 
+            case IDX_POOL:
+                assert idxPool != null : "Indexing pool is not configured.";
+
+                return idxPool;
+
             default: {
                 assert plc >= 0 : "Negative policy: " + plc;
 
@@ -1357,6 +1374,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /**
      * @param node Destination node.
      * @param topic Topic to send the message to.
+     * @param topicOrd GridTopic enumeration ordinal.
+     * @param msg Message to send.
+     * @param plc Type of processing.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void send(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc)
+        throws IgniteCheckedException {
+        send(node, topic, topicOrd, msg, plc, false, 0, false, null);
+    }
+
+    /**
+     * @param node Destination node.
+     * @param topic Topic to send the message to.
      * @param msg Message to send.
      * @param plc Type of processing.
      * @param timeout Timeout to keep a message on receiving queue.

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5f60215..1eebfd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -752,7 +752,7 @@ public class GridIoMessageFactory implements MessageFactory {
 
             // [-3..119] [124] - this
             // [120..123] - DR
-            // [-4..-22] - SQL
+            // [-4..-22, -30..-35] - SQL
             default:
                 if (ext != null) {
                     for (MessageFactory factory : ext) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index 00590ba..70a7354 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -43,9 +43,12 @@ public class GridIoPolicy {
     /** Marshaller cache execution pool. */
     public static final byte MARSH_CACHE_POOL = 6;
 
-    /** Marshaller cache execution pool. */
+    /** IGFS pool. */
     public static final byte IGFS_POOL = 7;
 
+    /** Pool for handling distributed index range requests. */
+    public static final byte IDX_POOL = 8;
+
     /**
      * Defines the range of reserved pools that are not available for plugins.
      * @param key The key.

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 8633333..e0e8031 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -224,7 +224,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
      * @param key Key.
      * @return Affinity key.
      */
-    private Object affinityKey(Object key) {
+    public Object affinityKey(Object key) {
         if (key instanceof CacheObject && !(key instanceof BinaryObject))
             key = ((CacheObject)key).value(cctx.cacheObjectContext(), false);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 36d9104..fec43d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -245,6 +245,9 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** */
     private boolean deferredDel;
 
+    /** */
+    private boolean customAffMapper;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -365,6 +368,13 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
+     * @return {@code True} if custom {@link AffinityKeyMapper} is configured for cache.
+     */
+    public boolean customAffinityMapper() {
+        return customAffMapper;
+    }
+
+    /**
      * @param dynamicDeploymentId Dynamic deployment ID.
      */
     void dynamicDeploymentId(IgniteUuid dynamicDeploymentId) {
@@ -567,6 +577,13 @@ public class GridCacheContext<K, V> implements Externalizable {
     }
 
     /**
+     * @return {@code True} if cache is partitioned cache.
+     */
+    public boolean isPartitioned() {
+        return cacheCfg.getCacheMode() == CacheMode.PARTITIONED;
+    }
+
+    /**
      * @return {@code True} in case replication is enabled.
      */
     public boolean isDrEnabled() {
@@ -1132,6 +1149,8 @@ public class GridCacheContext<K, V> implements Externalizable {
      */
     public void cacheObjectContext(CacheObjectContext cacheObjCtx) {
         this.cacheObjCtx = cacheObjCtx;
+
+        customAffMapper = cacheCfg.getAffinityMapper().getClass() != cacheObjCtx.defaultAffMapper().getClass();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java
index 5422bbd..4a2f039 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDefaultAffinityKeyMapper.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Default key affinity mapper. If key class has annotation {@link AffinityKeyMapped},
@@ -120,6 +121,24 @@ public class GridCacheDefaultAffinityKeyMapper implements AffinityKeyMapper {
     }
 
     /**
+     * @param cls Key class.
+     * @return Name of the first field or, if there is none, the first method found for the class, or {@code null}.
+     */
+    @Nullable public String affinityKeyPropertyName(Class<?> cls) {
+        Field field = reflectCache.firstField(cls);
+
+        if (field != null)
+            return field.getName();
+
+        Method mtd = reflectCache.firstMethod(cls);
+
+        if (mtd != null)
+            return mtd.getName();
+
+        return null;
+    }
+
+    /**
      * @param ignite Ignite.
      */
     @IgniteInstanceResource

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 0d7bc6a..0005530 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -670,6 +670,10 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
                     opCtxCall != null && opCtxCall.isKeepBinary());
 
             if (qry instanceof SqlQuery) {
+                if (isReplicatedDataNode() && ((SqlQuery)qry).isDistributedJoins())
+                    throw new CacheException("Queries using distributed JOINs have to be run on partitioned cache, " +
+                        "not on replicated.");
+
                 final SqlQuery p = (SqlQuery)qry;
 
                 if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
@@ -684,6 +688,10 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
             }
 
             if (qry instanceof SqlFieldsQuery) {
+                if (isReplicatedDataNode() && ((SqlFieldsQuery)qry).isDistributedJoins())
+                    throw new CacheException("Queries using distributed JOINs have to be run on partitioned cache, " +
+                        "not on replicated.");
+
                 SqlFieldsQuery p = (SqlFieldsQuery)qry;
 
                 if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 8400594..f4d1d6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -516,6 +516,14 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     }
 
     /** {@inheritDoc} */
+    @Override public String affinityField(String keyType) {
+        if (binaryCtx == null)
+            return null;
+
+        return binaryCtx.affinityKeyFieldName(typeId(keyType));
+    }
+
+    /** {@inheritDoc} */
     @Override public BinaryObjectBuilder builder(String clsName) {
         return new BinaryObjectBuilderImpl(binaryCtx, clsName);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
index d12247e..2f51c5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
@@ -204,7 +204,8 @@ public class GridDhtPartitionsReservation implements GridReservable {
 
             if (reservations.compareAndSet(r, r - 1)) {
                 // If it was the last reservation and topology version changed -> attempt to evict partitions.
-                if (r == 1 && !topVer.equals(cctx.topology().topologyVersion()))
+                if (r == 1 && !cctx.kernalContext().isStopping() &&
+                    !topVer.equals(cctx.topology().topologyVersion()))
                     tryEvict(parts.get());
 
                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 4367ee9..60e81e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -75,7 +75,6 @@ import org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicat
 import org.apache.ignite.internal.processors.datastructures.SetItemKey;
 import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
-import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
 import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
 import org.apache.ignite.internal.processors.query.GridQueryIndexType;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
@@ -270,25 +269,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     }
 
     /**
-     * Gets number of objects of given type in index.
-     *
-     * @param valType Value type.
-     * @return Number of objects or -1 if type was not indexed at all.
-     * @throws IgniteCheckedException If failed.
-     */
-    public long size(Class<?> valType) throws IgniteCheckedException {
-        if (!enterBusy())
-            throw new IllegalStateException("Failed to get size (grid is stopping).");
-
-        try {
-            return qryProc.size(space, valType);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
      * Rebuilds all search indexes of given value type.
      *
      * @param typeName Value type name.
@@ -586,26 +566,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         try {
             switch (qry.type()) {
                 case SQL:
-                    if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
-                        cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
-                            cctx.localNode(),
-                            "SQL query executed.",
-                            EVT_CACHE_QUERY_EXECUTED,
-                            CacheQueryType.SQL.name(),
-                            cctx.namex(),
-                            qry.queryClassName(),
-                            qry.clause(),
-                            null,
-                            null,
-                            args,
-                            subjId,
-                            taskName));
-                    }
-
-                    iter = qryProc.query(space, qry.clause(), F.asList(args),
-                        qry.queryClassName(), filter(qry));
-
-                    break;
+                    throw new IllegalStateException("Should never be called.");
 
                 case SCAN:
                     if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -764,11 +725,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             else {
                 assert qry.type() == SQL_FIELDS;
 
-                GridQueryFieldsResult qryRes = qryProc.queryFields(space, qry.clause(), F.asList(args), filter(qry));
-
-                res.metaData(qryRes.metaData());
-
-                res.onDone(qryRes.iterator());
+                throw new IllegalStateException("Should never be called.");
             }
         }
         catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMarshallable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMarshallable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMarshallable.java
new file mode 100644
index 0000000..d87936a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryMarshallable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.marshaller.Marshaller;
+
+/**
+ * Message which needs to be marshalled and unmarshalled before sending or processing it.
+ */
+public interface GridCacheQueryMarshallable {
+    /**
+     * @param m Marshaller.
+     */
+    public void marshall(Marshaller m);
+
+    /**
+     * @param m Marshaller.
+     * @param ctx Context.
+     */
+    public void unmarshall(Marshaller m, GridKernalContext ctx);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
index 0733827..6b81ed1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query;
 import java.nio.ByteBuffer;
 import java.util.LinkedHashMap;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -35,7 +36,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  * Query.
  */
-public class GridCacheSqlQuery implements Message {
+public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -130,30 +131,34 @@ public class GridCacheSqlQuery implements Message {
         return params;
     }
 
-    /**
-     * @param m Marshaller.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void marshallParams(Marshaller m) throws IgniteCheckedException {
+    /** {@inheritDoc} */
+    @Override public void marshall(Marshaller m) {
         if (paramsBytes != null)
             return;
 
         assert params != null;
 
-        paramsBytes = m.marshal(params);
+        try {
+            paramsBytes = m.marshal(params);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
     }
 
-    /**
-     * @param m Marshaller.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void unmarshallParams(Marshaller m, GridKernalContext ctx) throws IgniteCheckedException {
+    /** {@inheritDoc} */
+    @Override public void unmarshall(Marshaller m, GridKernalContext ctx) {
         if (params != null)
             return;
 
         assert paramsBytes != null;
 
-        params = m.unmarshal(paramsBytes, U.resolveClassLoader(ctx.config()));
+        try {
+            params = m.unmarshal(paramsBytes, U.resolveClassLoader(ctx.config()));
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
     }
 
     /** {@inheritDoc} */
@@ -271,4 +276,4 @@ public class GridCacheSqlQuery implements Message {
 
         return cp;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index da59c18..8dcba2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.query;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -45,24 +46,54 @@ public class GridCacheTwoStepQuery {
     private boolean explain;
 
     /** */
-    private Set<String> spaces;
+    private Collection<String> spaces;
 
     /** */
-    private final boolean skipMergeTbl;
+    private Set<String> schemas;
+
+    /** */
+    private Set<String> tbls;
+
+    /** */
+    private boolean distributedJoins;
+
+    /** */
+    private boolean skipMergeTbl;
+
+    /** */
+    private List<Integer> caches;
+
+    /** */
+    private List<Integer> extraCaches;
 
     /**
-     * @param spaces All spaces accessed in query.
-     * @param rdc Reduce query.
-     * @param skipMergeTbl {@code True} if reduce query can skip merge table creation and
-     *      get data directly from merge index.
+     * @param schemas Schema names in query.
+     * @param tbls Tables in query.
      */
-    public GridCacheTwoStepQuery(Set<String> spaces, GridCacheSqlQuery rdc, boolean skipMergeTbl) {
-        assert rdc != null;
+    public GridCacheTwoStepQuery(Set<String> schemas, Set<String> tbls) {
+        this.schemas = schemas;
+        this.tbls = tbls;
+    }
 
-        this.spaces = spaces;
-        this.rdc = rdc;
-        this.skipMergeTbl = skipMergeTbl;
+    /**
+     * Specify if distributed joins are enabled for this query.
+     *
+     * @param distributedJoins Distributed joins enabled.
+     */
+    public void distributedJoins(boolean distributedJoins) {
+        this.distributedJoins = distributedJoins;
+    }
+
+    /**
+     * Check if distributed joins are enabled for this query.
+     *
+     * @return {@code true} if distributed joins are enabled.
+     */
+    public boolean distributedJoins() {
+        return distributedJoins;
     }
+
+
     /**
      * @return {@code True} if reduce query can skip merge table creation and get data directly from merge index.
      */
@@ -71,6 +102,13 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
+     * @param skipMergeTbl Skip merge table.
+     */
+    public void skipMergeTable(boolean skipMergeTbl) {
+        this.skipMergeTbl = skipMergeTbl;
+    }
+
+    /**
      * @return If this is explain query.
      */
     public boolean explain() {
@@ -116,6 +154,13 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
+     * @param rdc Reduce query.
+     */
+    public void reduceQuery(GridCacheSqlQuery rdc) {
+        this.rdc = rdc;
+    }
+
+    /**
      * @return Map queries.
      */
     public List<GridCacheSqlQuery> mapQueries() {
@@ -123,34 +168,84 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
+     * @return Caches.
+     */
+    public List<Integer> caches() {
+        return caches;
+    }
+
+    /**
+     * @param caches Caches.
+     */
+    public void caches(List<Integer> caches) {
+        this.caches = caches;
+    }
+
+    /**
+     * @return Extra caches.
+     */
+    public List<Integer> extraCaches() {
+        return extraCaches;
+    }
+
+    /**
+     * @param extraCaches Extra caches.
+     */
+    public void extraCaches(List<Integer> extraCaches) {
+        this.extraCaches = extraCaches;
+    }
+
+    /**
      * @return Spaces.
      */
-    public Set<String> spaces() {
+    public Collection<String> spaces() {
         return spaces;
     }
 
     /**
      * @param spaces Spaces.
      */
-    public void spaces(Set<String> spaces) {
+    public void spaces(Collection<String> spaces) {
         this.spaces = spaces;
     }
 
     /**
+     * @return Schemas.
+     */
+    public Set<String> schemas() {
+        return schemas;
+    }
+
+    /**
      * @param args New arguments to copy with.
      * @return Copy.
      */
     public GridCacheTwoStepQuery copy(Object[] args) {
         assert !explain;
 
-        GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(spaces, rdc.copy(args), skipMergeTbl);
+        GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(schemas, tbls);
+
+        cp.caches = caches;
+        cp.extraCaches = extraCaches;
+        cp.spaces = spaces;
+        cp.rdc = rdc.copy(args);
+        cp.skipMergeTbl = skipMergeTbl;
         cp.pageSize = pageSize;
+        cp.distributedJoins = distributedJoins;
+
         for (int i = 0; i < mapQrys.size(); i++)
             cp.mapQrys.add(mapQrys.get(i).copy(args));
 
         return cp;
     }
 
+    /**
+     * @return Tables.
+     */
+    public Set<String> tables() {
+        return tbls;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheTwoStepQuery.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index b8ac301..c488b3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -187,4 +187,10 @@ public interface IgniteCacheObjectProcessor extends GridProcessor {
      * @return Ignite binary interface.
      */
     public IgniteBinary binary();
+
+    /**
+     * @param keyType Key type name.
+     * @return Affinity field name or {@code null}.
+     */
+    public String affinityField(String keyType);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 3203548..f62ce36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -83,6 +83,11 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
     }
 
     /** {@inheritDoc} */
+    @Override public String affinityField(String keyType) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteBinary binary() {
         return noOpBinary;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java
deleted file mode 100644
index c17cedd..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosurePolicy.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.closure;
-
-import org.jetbrains.annotations.Nullable;
-
-/**
- * This enumeration defines different types of closure
- * processing by the closure processor.
- */
-public enum GridClosurePolicy {
-    /** Public execution pool. */
-    PUBLIC_POOL,
-
-    /** P2P execution pool. */
-    P2P_POOL,
-
-    /** System execution pool. */
-    SYSTEM_POOL,
-
-    /** IGFS pool. */
-    IGFS_POOL;
-
-    /** Enum values. */
-    private static final GridClosurePolicy[] VALS = values();
-
-    /**
-     * Efficiently gets enumerated value from its ordinal.
-     *
-     * @param ord Ordinal value.
-     * @return Enumerated value.
-     */
-    @Nullable public static GridClosurePolicy fromOrdinal(int ord) {
-        return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index f9b74c4..a4559c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -77,6 +77,9 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.compute.ComputeJobResultPolicy.FAILOVER;
 import static org.apache.ignite.compute.ComputeJobResultPolicy.REDUCE;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
 import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
 
@@ -87,15 +90,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     /** Ignite version in which binarylizable versions of closures were introduced. */
     public static final IgniteProductVersion BINARYLIZABLE_CLOSURES_SINCE = IgniteProductVersion.fromString("1.6.0");
 
-    /** */
-    private final Executor sysPool;
-
-    /** */
-    private final Executor pubPool;
-
-    /** */
-    private final Executor igfsPool;
-
     /** Lock to control execution after stop. */
     private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
 
@@ -107,10 +101,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      */
     public GridClosureProcessor(GridKernalContext ctx) {
         super(ctx);
-
-        sysPool = ctx.getSystemExecutorService();
-        pubPool = ctx.getExecutorService();
-        igfsPool = ctx.getIgfsExecutorService();
     }
 
     /** {@inheritDoc} */
@@ -727,20 +717,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param plc Whether to get system or public pool.
      * @return Requested worker pool.
      */
-    private Executor pool(GridClosurePolicy plc) {
-        switch (plc) {
-            case PUBLIC_POOL:
-                return pubPool;
-
-            case SYSTEM_POOL:
-                return sysPool;
-
-            case IGFS_POOL:
-                return igfsPool;
-
-            default:
-                throw new IllegalArgumentException("Invalid closure execution policy: " + plc);
-        }
+    private Executor pool(byte plc) throws IgniteCheckedException {
+        return ctx.io().pool(plc);
     }
 
     /**
@@ -749,7 +727,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param plc Policy to choose executor pool.
      * @return Pool name.
      */
-    private String poolName(GridClosurePolicy plc) {
+    private String poolName(byte plc) {
         switch (plc) {
             case PUBLIC_POOL:
                 return "public";
@@ -761,7 +739,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
                 return "igfs";
 
             default:
-                throw new IllegalArgumentException("Invalid closure execution policy: " + plc);
+                return "unknown";
         }
     }
 
@@ -772,7 +750,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, boolean sys) throws IgniteCheckedException {
-        return runLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL);
+        return runLocal(c, sys ? SYSTEM_POOL : PUBLIC_POOL);
     }
 
     /**
@@ -781,7 +759,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @return Future.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, GridClosurePolicy plc) throws IgniteCheckedException {
+    public IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, byte plc) throws IgniteCheckedException {
         if (c == null)
             return new GridFinishedFuture();
 
@@ -857,7 +835,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @return Future.
      */
     public IgniteInternalFuture<?> runLocalSafe(Runnable c, boolean sys) {
-        return runLocalSafe(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL);
+        return runLocalSafe(c, sys ? SYSTEM_POOL : PUBLIC_POOL);
     }
 
     /**
@@ -868,7 +846,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param plc Policy to choose executor pool.
      * @return Future.
      */
-    public IgniteInternalFuture<?> runLocalSafe(Runnable c, GridClosurePolicy plc) {
+    public IgniteInternalFuture<?> runLocalSafe(Runnable c, byte plc) {
         try {
             return runLocal(c, plc);
         }
@@ -912,7 +890,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, boolean sys) throws IgniteCheckedException {
-        return callLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL);
+        return callLocal(c, sys ? SYSTEM_POOL : PUBLIC_POOL);
     }
 
     /**
@@ -922,7 +900,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @return Future.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, GridClosurePolicy plc) throws IgniteCheckedException {
+    private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, byte plc) throws IgniteCheckedException {
         if (c == null)
             return new GridFinishedFuture<>();
 
@@ -996,7 +974,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @return Future.
      */
     public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, boolean sys) {
-        return callLocalSafe(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL);
+        return callLocalSafe(c, sys ? SYSTEM_POOL : PUBLIC_POOL);
     }
 
     /**
@@ -1007,7 +985,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @param plc Policy to choose executor pool.
      * @return Future.
      */
-    public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, GridClosurePolicy plc) {
+    public <R> IgniteInternalFuture<R> callLocalSafe(Callable<R> c, byte plc) {
         try {
             return callLocal(c, plc);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 7697a12..643cb8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -29,7 +29,6 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -58,17 +57,6 @@ public interface GridQueryIndexing {
     public void stop() throws IgniteCheckedException;
 
     /**
-     * Runs two step query.
-     *
-     * @param cctx Cache context.
-     * @param qry Query.
-     * @param keepCacheObjects If {@code true}, cache objects representation will be preserved.
-     * @return Cursor.
-     */
-    public Iterable<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry,
-        boolean keepCacheObjects);
-
-    /**
      * Parses SQL query into two step query and executes it.
      *
      * @param cctx Cache context.
@@ -92,12 +80,13 @@ public interface GridQueryIndexing {
      * @param spaceName Space name.
      * @param qry Query.
      * @param params Query parameters.
-     * @param filters Space name and key filters.
+     * @param filter Space name and key filter.
+     * @param enforceJoinOrder Enforce join order of tables in the query.
      * @return Query result.
      * @throws IgniteCheckedException If failed.
      */
-    public GridQueryFieldsResult queryFields(@Nullable String spaceName, String qry,
-        Collection<Object> params, IndexingQueryFilter filters) throws IgniteCheckedException;
+    public GridQueryFieldsResult queryLocalSqlFields(@Nullable String spaceName, String qry,
+        Collection<Object> params, IndexingQueryFilter filter, boolean enforceJoinOrder) throws IgniteCheckedException;
 
     /**
      * Executes regular query.
@@ -106,12 +95,12 @@ public interface GridQueryIndexing {
      * @param qry Query.
      * @param params Query parameters.
      * @param type Query return type.
-     * @param filters Space name and key filters.
+     * @param filter Space name and key filter.
      * @return Queried rows.
      * @throws IgniteCheckedException If failed.
      */
-    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(@Nullable String spaceName, String qry,
-        Collection<Object> params, GridQueryTypeDescriptor type, IndexingQueryFilter filters) throws IgniteCheckedException;
+    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName, String qry,
+        Collection<Object> params, GridQueryTypeDescriptor type, IndexingQueryFilter filter) throws IgniteCheckedException;
 
     /**
      * Executes text query.
@@ -119,35 +108,24 @@ public interface GridQueryIndexing {
      * @param spaceName Space name.
      * @param qry Text query.
      * @param type Query return type.
-     * @param filters Space name and key filter.
+     * @param filter Space name and key filter.
      * @return Queried rows.
      * @throws IgniteCheckedException If failed.
      */
-    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryText(@Nullable String spaceName, String qry,
-        GridQueryTypeDescriptor type, IndexingQueryFilter filters) throws IgniteCheckedException;
-
-    /**
-     * Gets size of index for given type or -1 if it is a unknown type.
-     *
-     * @param spaceName Space name.
-     * @param desc Type descriptor.
-     * @param filters Filters.
-     * @return Objects number.
-     * @throws IgniteCheckedException If failed.
-     */
-    public long size(@Nullable String spaceName, GridQueryTypeDescriptor desc, IndexingQueryFilter filters)
-        throws IgniteCheckedException;
+    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(@Nullable String spaceName, String qry,
+        GridQueryTypeDescriptor type, IndexingQueryFilter filter) throws IgniteCheckedException;
 
     /**
      * Registers cache.
      *
+     * @param cctx Cache context.
      * @param ccfg Cache configuration.
      * @throws IgniteCheckedException If failed.
      */
-    public void registerCache(CacheConfiguration<?,?> ccfg) throws IgniteCheckedException;
+    public void registerCache(GridCacheContext<?,?> cctx, CacheConfiguration<?,?> ccfg) throws IgniteCheckedException;
 
     /**
-     * Deregisters cache.
+     * Unregisters cache.
      *
      * @param ccfg Cache configuration.
      * @throws IgniteCheckedException If failed to drop cache schema.
@@ -228,12 +206,11 @@ public interface GridQueryIndexing {
     /**
      * Returns backup filter.
      *
-     * @param caches List of caches.
      * @param topVer Topology version.
      * @param parts Partitions.
      * @return Backup filter.
      */
-    public IndexingQueryFilter backupFilter(List<String> caches, AffinityTopologyVersion topVer, int[] parts);
+    public IndexingQueryFilter backupFilter(AffinityTopologyVersion topVer, int[] parts);
 
     /**
      * Client disconnected callback.

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index a42eb98..04c6cb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -28,6 +28,7 @@ import org.apache.ignite.cache.CacheTypeMetadata;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.QueryIndexType;
+import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
@@ -42,11 +43,11 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
-import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -191,11 +192,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param ccfg Cache configuration.
+     * @param cctx Cache context.
      * @throws IgniteCheckedException If failed.
      */
-    public void initializeCache(CacheConfiguration<?, ?> ccfg) throws IgniteCheckedException {
-        idx.registerCache(ccfg);
+    private void initializeCache(GridCacheContext<?, ?> cctx) throws IgniteCheckedException {
+        CacheConfiguration<?,?> ccfg = cctx.config();
+
+        idx.registerCache(cctx, cctx.config());
 
         try {
             List<Class<?>> mustDeserializeClss = null;
@@ -275,10 +278,28 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
                         if (valCls != null)
                             altTypeId = new TypeId(ccfg.getName(), valCls);
+
+                        if (!cctx.customAffinityMapper() && qryEntity.getKeyType() != null) {
+                            // Need to setup affinity key for distributed joins.
+                            String affField = ctx.cacheObjects().affinityField(qryEntity.getKeyType());
+
+                            if (affField != null)
+                                desc.affinityKey(affField);
+                        }
                     }
                     else {
                         processClassMeta(qryEntity, desc, coCtx);
 
+                        AffinityKeyMapper keyMapper = cctx.config().getAffinityMapper();
+
+                        if (keyMapper instanceof GridCacheDefaultAffinityKeyMapper) {
+                            String affField =
+                                ((GridCacheDefaultAffinityKeyMapper)keyMapper).affinityKeyPropertyName(desc.keyCls);
+
+                            if (affField != null)
+                                desc.affinityKey(affField);
+                        }
+
                         typeId = new TypeId(ccfg.getName(), valCls);
                         altTypeId = new TypeId(ccfg.getName(), ctx.cacheObjects().typeId(qryEntity.getValueType()));
                     }
@@ -290,7 +311,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                         types.put(altTypeId, desc);
 
                     desc.registered(idx.registerType(ccfg.getName(), desc));
-
                 }
             }
 
@@ -460,7 +480,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             return;
 
         try {
-            initializeCache(cctx.config());
+            initializeCache(cctx);
         }
         finally {
             busyLock.leaveBusy();
@@ -501,33 +521,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Returns number of objects of given type for given space of spi.
-     *
-     * @param space Space.
-     * @param valType Value type.
-     * @return Objects number or -1 if this type is unknown for given SPI and space.
-     * @throws IgniteCheckedException If failed.
-     */
-    public long size(@Nullable String space, Class<?> valType) throws IgniteCheckedException {
-        checkEnabled();
-
-        if (!busyLock.enterBusy())
-            throw new IllegalStateException("Failed to get space size (grid is stopping).");
-
-        try {
-            TypeDescriptor desc = types.get(new TypeId(space, valType));
-
-            if (desc == null || !desc.registered())
-                return -1;
-
-            return idx.size(space, desc, null);
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /**
      * Rebuilds all search indexes of given value type for given space of spi.
      *
      * @param space Space.
@@ -657,7 +650,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             return;
 
         if (!busyLock.enterBusy())
-            throw new IllegalStateException("Failed to write to index (grid is stopping).");
+            return;
 
         try {
             if (coctx == null)
@@ -723,74 +716,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param space Space.
-     * @param clause Clause.
-     * @param params Parameters collection.
-     * @param resType Result type.
-     * @param filters Filters.
-     * @return Key/value rows.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("unchecked")
-    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(final String space, final String clause,
-        final Collection<Object> params, final String resType, final IndexingQueryFilter filters)
-        throws IgniteCheckedException {
-        checkEnabled();
-
-        if (!busyLock.enterBusy())
-            throw new IllegalStateException("Failed to execute query (grid is stopping).");
-
-        try {
-            final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(space).context();
-
-            return executeQuery(cctx, new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() {
-                @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException {
-                    TypeDescriptor type = typesByName.get(new TypeName(space, resType));
-
-                    if (type == null || !type.registered())
-                        throw new CacheException("Failed to find SQL table for type: " + resType);
-
-                    return idx.query(space, clause, params, type, filters);
-                }
-            }, false);
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /**
-     * @param space Space name.
-     * @param qry Query.
-     * @return Cursor.
-     */
-    public Iterable<List<?>> queryTwoStep(String space, final GridCacheTwoStepQuery qry) {
-        checkxEnabled();
-
-        if (!busyLock.enterBusy())
-            throw new IllegalStateException("Failed to execute query (grid is stopping).");
-
-        try {
-            final GridCacheContext<Object, Object> cctx = ctx.cache().internalCache(space).context();
-
-            return executeQuery(cctx, new IgniteOutClosureX<Iterable<List<?>>>() {
-                @Override public Iterable<List<?>> applyx() throws IgniteCheckedException {
-                    return idx.queryTwoStep(
-                        cctx,
-                        qry,
-                        cctx.keepBinary());
-                }
-            }, false);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /**
      * @param cctx Cache context.
      * @param qry Query.
      * @return Cursor.
@@ -845,6 +770,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     /**
      * @param cctx Cache context.
      * @param qry Query.
+     * @param keepBinary Keep binary flag.
      * @return Cursor.
      */
     public <K, V> Iterator<Cache.Entry<K, V>> queryLocal(
@@ -873,12 +799,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                         if (typeDesc == null || !typeDesc.registered())
                             throw new CacheException("Failed to find SQL table for type: " + type);
 
-                        final GridCloseableIterator<IgniteBiTuple<K, V>> i = idx.query(
+                        final GridCloseableIterator<IgniteBiTuple<K, V>> i = idx.queryLocalSql(
                             space,
                             sqlQry,
                             F.asList(params),
                             typeDesc,
-                            idx.backupFilter(null, null, null));
+                            idx.backupFilter(null, null));
 
                         sendQueryExecutedEvent(
                             sqlQry,
@@ -963,8 +889,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     String sql = qry.getSql();
                     Object[] args = qry.getArgs();
 
-                    final GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args),
-                        idx.backupFilter(null, null, null));
+                    final GridQueryFieldsResult res = idx.queryLocalSqlFields(space, sql, F.asList(args),
+                        idx.backupFilter(null, null), qry.isEnforceJoinOrder());
 
                     sendQueryExecutedEvent(sql, args);
 
@@ -1118,7 +1044,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     if (type == null || !type.registered())
                         throw new CacheException("Failed to find SQL table for type: " + resType);
 
-                    return idx.queryText(
+                    return idx.queryLocalText(
                         space,
                         clause,
                         type,
@@ -1132,35 +1058,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param space Space name.
-     * @param clause Clause.
-     * @param params Parameters collection.
-     * @param filters Key and value filters.
-     * @return Field rows.
-     * @throws IgniteCheckedException If failed.
-     */
-    public GridQueryFieldsResult queryFields(@Nullable final String space, final String clause,
-        final Collection<Object> params, final IndexingQueryFilter filters) throws IgniteCheckedException {
-        checkEnabled();
-
-        if (!busyLock.enterBusy())
-            throw new IllegalStateException("Failed to execute query (grid is stopping).");
-
-        try {
-            final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(space).context();
-
-            return executeQuery(cctx, new IgniteOutClosureX<GridQueryFieldsResult>() {
-                @Override public GridQueryFieldsResult applyx() throws IgniteCheckedException {
-                    return idx.queryFields(space, clause, params, filters);
-                }
-            }, false);
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /**
      * Will be called when entry for key will be swapped.
      *
      * @param spaceName Space name.
@@ -1523,7 +1420,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                 aliases,
                 coCtx);
 
-
             d.addProperty(prop, false);
         }
 
@@ -1770,7 +1666,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             res = clo.apply();
 
             if (res instanceof CacheQueryFuture) {
-                CacheQueryFuture fut = (CacheQueryFuture) res;
+                CacheQueryFuture fut = (CacheQueryFuture)res;
 
                 err = fut.error();
             }
@@ -1782,6 +1678,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
             throw (IgniteCheckedException)err;
         }
+        catch (CacheException e) {
+            err = e;
+
+            throw e;
+        }
         catch (Exception e) {
             err = e;
 
@@ -2096,6 +1997,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         /** */
         private boolean valTextIdx;
 
+        /** */
+        private String affKey;
+
         /** SPI can decide not to register this type. */
         private boolean registered;
 
@@ -2265,6 +2169,18 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         }
 
         /** {@inheritDoc} */
+        @Override public String affinityKey() {
+            return affKey;
+        }
+
+        /**
+         * @param affKey Affinity key field.
+         */
+        void affinityKey(String affKey) {
+            this.affKey = affKey;
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(TypeDescriptor.class, this);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
index 45919ef..b636841 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryTypeDescriptor.java
@@ -82,4 +82,11 @@ public interface GridQueryTypeDescriptor {
      * @return If string representation of value should be full-text indexed.
      */
     public boolean valueTextIndex();
+
+    /**
+     * Returns affinity key field name or {@code null} for default.
+     *
+     * @return Affinity key.
+     */
+    public String affinityKey();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
index ecc0abd..6706ab9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
@@ -117,4 +117,4 @@ public class GridQueryCancelRequest implements Message {
     @Override public byte fieldsCount() {
         return 1;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
index 499438d..bd9f7d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
@@ -143,4 +143,4 @@ public class GridQueryFailResponse implements Message {
     @Override public byte fieldsCount() {
         return 2;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
index 84cb57e..1feff5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
@@ -170,4 +170,4 @@ public class GridQueryNextPageRequest implements Message {
     @Override public byte fieldsCount() {
         return 3;
     }
-}
\ No newline at end of file
+}


Mime
View raw message