ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [4/4] ignite git commit: ignite-1232
Date Fri, 08 Jul 2016 13:59:01 GMT
ignite-1232


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1a612faa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1a612faa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1a612faa

Branch: refs/heads/ignite-1232
Commit: 1a612faad6dcd7b10cf146393530372b5e890842
Parents: a30946e
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Jul 8 10:01:32 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Jul 8 16:58:49 2016 +0300

----------------------------------------------------------------------
 .../query/h2/opt/GridH2TreeIndex.java           |  33 ++
 .../query/h2/sql/GridSqlQuerySplitter.java      |   2 -
 .../IgniteCacheCrossCacheJoinRandomTest.java    | 371 ++++++++++++
 ...iteCacheDistributedJoinCollocatedAndNot.java | 365 ++++++++++++
 ...ributedJoinPartitionedAndReplicatedTest.java | 171 +++---
 ...CacheDistributedJoinQueryConditionsTest.java | 588 +++++++++++++++++++
 .../IgniteCacheDistributedJoinQueryTest.java    | 572 ------------------
 ...teCacheJoinPartitionedAndReplicatedTest.java |  28 +-
 .../cache/IgniteCacheJoinQueryTest.java         | 563 ------------------
 ...IgniteCacheJoinQueryWithAffinityKeyTest.java | 563 ++++++++++++++++++
 .../cache/IgniteCrossCachesJoinsQueryTest.java  |   7 +-
 .../IgniteCacheQuerySelfTestSuite.java          |  16 +-
 12 files changed, 2056 insertions(+), 1223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1a612faa/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 7b7e0fa..c0bc93c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -65,6 +65,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.h2.command.dml.Select;
 import org.h2.engine.Session;
 import org.h2.index.Cursor;
 import org.h2.index.IndexCondition;
@@ -75,6 +76,7 @@ import org.h2.result.Row;
 import org.h2.result.SearchRow;
 import org.h2.result.SortOrder;
 import org.h2.table.IndexColumn;
+import org.h2.table.Table;
 import org.h2.table.TableFilter;
 import org.h2.util.DoneFuture;
 import org.h2.value.Value;
@@ -757,6 +759,37 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
         if (qctx == null || !qctx.distributedJoins() || !getTable().isPartitioned())
             return null;
 
+        Select select = filter.getSelect();
+
+        ArrayList<TableFilter> filters = select.getTopFilters();
+
+        if (filters.size() > 1) {
+            for (int i = 0; i < filters.size(); i++) {
+                TableFilter filter0 = filters.get(i);
+
+                if (filter0 == filter) {
+                    if (i == 0)
+                        break;
+
+                    TableFilter prevFilter = filters.get(i - 1);
+
+                    if (prevFilter.getJoin() == filter) {
+                        Table tbl = prevFilter.getTable();
+
+                        if (tbl instanceof GridH2Table && !((GridH2Table)tbl).isPartitioned())
+                            return null;
+
+                        break;
+                    }
+                }
+
+                Table tbl0 = filter0.getTable();
+
+                if (tbl0 instanceof GridH2Table && ((GridH2Table)tbl0).isPartitioned())
+                    break;
+            }
+        }
+
         IndexColumn affCol = getTable().getAffinityKeyColumn();
 
         int affColId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a612faa/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 25640a5..0c93838 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -27,10 +27,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.h2.command.Prepared;
-import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.h2.jdbc.JdbcPreparedStatement;
 import org.h2.table.Column;
 import org.h2.table.IndexColumn;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a612faa/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java
new file mode 100644
index 0000000..f2a9549
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Stack;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.h2.sql.AbstractH2CompareQueryTest;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheCrossCacheJoinRandomTest extends AbstractH2CompareQueryTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static final int OBJECTS = 200;
+
+    /** */
+    private static final int CACHES = 5;
+
+    /** */
+    private Random rnd;
+
+    /** {@inheritDoc} */
+    @Override protected void setIndexedTypes(CacheConfiguration<?, ?> cc, CacheMode mode) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void initCacheAndDbData() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkAllDataEquals() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi());
+
+        spi.setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        long seed = System.currentTimeMillis();
+
+        rnd = new Random(seed);
+
+        log.info("Random seed: " + seed);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        client = true;
+
+        startGrid(SRVS);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Statement initializeH2Schema() throws SQLException {
+        Statement st = super.initializeH2Schema();
+
+        for (int i = 0; i < CACHES; i++) {
+            st.execute("CREATE SCHEMA \"cache" + i + "\"");
+
+            st.execute("create table \"cache" + i + "\".TESTOBJECT" +
+                "  (_key int not null," +
+                "  _val other not null," +
+                "  parentId int)");
+        }
+
+        return st;
+    }
+
+    /**
+     * @param name Cache name.
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration configuration(String name, CacheMode cacheMode, int backups) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setCacheMode(cacheMode);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        QueryEntity entity = new QueryEntity();
+        entity.setKeyType(Integer.class.getName());
+        entity.setValueType(TestObject.class.getName());
+        entity.addQueryField("parentId", Integer.class.getName(), null);
+        entity.setIndexes(F.asList(new QueryIndex("parentId")));
+
+        ccfg.setQueryEntities(F.asList(entity));
+
+        return ccfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin() throws Exception {
+        List<Map<Integer, Integer>> cachesData = new ArrayList<>(CACHES);
+
+        for (int i = 0; i < CACHES; i++) {
+            Map<Integer, Integer> data = createData(OBJECTS / 2);
+
+            insertH2(data, i);
+
+            cachesData.add(data);
+        }
+
+        List<T2<CacheMode, Integer>> allModes = F.asList(
+            new T2<>(REPLICATED, 0),
+            new T2<>(PARTITIONED, 1),
+            new T2<>(PARTITIONED, 2),
+            new T2<>(PARTITIONED, 3));
+
+        checkJoin(cachesData, allModes, new Stack<T2<CacheMode, Integer>>(), CACHES);
+    }
+
+    /**
+     * @param cachesData Caches data.
+     * @param allModes Modes to test.
+     * @param modes Select modes.
+     * @param caches Caches number.
+     * @throws Exception If failed.
+     */
+    private void checkJoin(List<Map<Integer, Integer>> cachesData,
+        List<T2<CacheMode, Integer>> allModes,
+        Stack<T2<CacheMode, Integer>> modes,
+        int caches) throws Exception {
+        if (modes.size() == caches) {
+            List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+            for (int i = 0; i < modes.size(); i++) {
+                T2<CacheMode, Integer> mode = modes.get(i);
+
+                CacheConfiguration ccfg = configuration("cache" + i, mode.get1(), mode.get2());
+
+                ccfgs.add(ccfg);
+            }
+
+            log.info("Check configurations: " + modes);
+
+            checkJoin(ccfgs, cachesData);
+        }
+        else {
+            for (T2<CacheMode, Integer> mode : allModes) {
+                modes.push(mode);
+
+                checkJoin(cachesData, allModes, modes, caches);
+
+                modes.pop();
+            }
+        }
+    }
+
+    /**
+     * @param ccfgs Configurations.
+     * @param cachesData Caches data.
+     * @throws Exception If failed.
+     */
+    private void checkJoin(List<CacheConfiguration> ccfgs, List<Map<Integer, Integer>> cachesData) throws Exception {
+        Ignite client = ignite(SRVS);
+
+        try {
+            IgniteCache cache = null;
+
+            for (int i = 0; i < CACHES; i++) {
+                CacheConfiguration ccfg = ccfgs.get(i);
+
+                IgniteCache cache0 = client.createCache(ccfg);
+
+                if (cache == null && ccfg.getCacheMode() == PARTITIONED)
+                    cache = cache0;
+
+                insertCache(cachesData.get(i), cache0);
+            }
+
+            boolean distributedJoin = true;
+
+            if (cache == null) {
+                cache = client.cache(ccfgs.get(0).getName());
+
+                distributedJoin = false;
+            }
+
+            Object[] args = {};
+
+            compareQueryRes0(cache, createQuery(CACHES, null), distributedJoin, true, args, Ordering.RANDOM);
+
+            Map<Integer, Integer> data = cachesData.get(CACHES - 1);
+
+            for (Integer objId : data.keySet())
+                compareQueryRes0(cache, createQuery(CACHES, objId), distributedJoin, true, args, Ordering.RANDOM);
+        }
+        finally {
+            for (CacheConfiguration ccfg : ccfgs)
+                client.destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param caches Number of caches to join.
+     * @param objId Object ID.
+     * @return SQL.
+     */
+    private String createQuery(int caches, @Nullable Integer objId) {
+        StringBuilder qry = new StringBuilder("select ");
+
+        for (int i = 0; i < caches; i++) {
+            if (i != 0)
+                qry.append(", ");
+
+            qry.append("o" + i + "._key");
+        }
+
+        qry.append(" from \"cache0\".TestObject o0 ");
+
+        for (int i = 1; i < caches; i++) {
+            String cacheName = "cache" + i;
+
+            String cur = "o" + i;
+            String prev = "o" + (i - 1);
+
+
+            qry.append("join \"" + cacheName + "\".TestObject " + cur);
+
+            if (i == caches - 1 && objId != null)
+                qry.append(" on (" + prev + ".parentId=" + cur + "._key and " + prev + "._key=" + objId + ") ");
+            else
+                qry.append(" on (" + prev + ".parentId=" + cur + "._key) ");
+        }
+
+        return qry.toString();
+    }
+
+    /**
+     * @param data Data.
+     * @param cache Cache.
+     */
+    private void insertCache(Map<Integer, Integer> data, IgniteCache<Object, Object> cache) {
+        for (Map.Entry<Integer, Integer> e : data.entrySet())
+            cache.put(e.getKey(), new TestObject(e.getValue()));
+    }
+
+    /**
+     * @param data Data.
+     * @param cache Cache index.
+     * @throws Exception If failed.
+     */
+    private void insertH2(Map<Integer, Integer> data, int cache) throws Exception {
+        for (Map.Entry<Integer, Integer> e : data.entrySet()) {
+            try(PreparedStatement st = conn.prepareStatement("insert into \"cache" + cache + "\".TESTOBJECT " +
+                "(_key, _val, parentId) values(?, ?, ?)")) {
+                st.setObject(1, e.getKey());
+                st.setObject(2, new TestObject(e.getValue()));
+                st.setObject(3, e.getValue());
+
+                st.executeUpdate();
+            }
+        }
+    }
+
+    /**
+     * @param cnt Objects count.
+     * @return Generated data.
+     */
+    private Map<Integer, Integer> createData(int cnt) {
+        assert cnt <= OBJECTS : cnt;
+
+        Map<Integer, Integer> res = new HashMap<>();
+
+        while (res.size() < cnt)
+            res.put(rnd.nextInt(OBJECTS), rnd.nextInt(OBJECTS));
+
+        return res;
+    }
+
+    /**
+     *
+     */
+    static class TestObject implements Serializable {
+        /** */
+        int parentId;
+
+        /**
+         * @param parentId Parent object ID.
+         */
+        public TestObject(int parentId) {
+            this.parentId = parentId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestObject.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a612faa/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNot.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNot.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNot.java
new file mode 100644
index 0000000..ba41ddd
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNot.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheDistributedJoinCollocatedAndNot extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String PERSON_CACHE = "person";
+
+    /** */
+    private static final String ORG_CACHE = "org";
+
+    /** */
+    private static final String ACCOUNT_CACHE = "acc";
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi());
+
+        spi.setIpFinder(IP_FINDER);
+
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        {
+            CacheConfiguration ccfg = configuration(PERSON_CACHE);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(PersonKey.class.getName());
+            entity.setValueType(Person.class.getName());
+            entity.addQueryField("id", Integer.class.getName(), null);
+            entity.addQueryField("affKey", Integer.class.getName(), null);
+            entity.addQueryField("name", String.class.getName(), null);
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        {
+            CacheConfiguration ccfg = configuration(ORG_CACHE);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Organization.class.getName());
+            entity.addQueryField("name", String.class.getName(), null);
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        {
+            CacheConfiguration ccfg = configuration(ACCOUNT_CACHE);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Account.class.getName());
+            entity.addQueryField("personId", Integer.class.getName(), null);
+            entity.addQueryField("name", String.class.getName(), null);
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(2);
+
+        client = true;
+
+        startGrid(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @param name Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration configuration(String name) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setBackups(1);
+
+        return ccfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin() throws Exception {
+        Ignite client = grid(2);
+
+        IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
+        IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
+        IgniteCache<Object, Object> accCache = client.cache(ACCOUNT_CACHE);
+
+        Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+        AtomicInteger orgKey = new AtomicInteger();
+        AtomicInteger accKey = new AtomicInteger();
+
+        ClusterNode node0 = ignite(0).cluster().localNode();
+        ClusterNode node1 = ignite(1).cluster().localNode();
+
+        /**
+         * One organization, one person, two accounts.
+         */
+
+        int orgId1 = keyForNode(aff, orgKey, node0);
+
+        orgCache.put(orgId1, new Organization("obj-" + orgId1));
+
+        personCache.put(new PersonKey(1, orgId1), new Person(1, "o1-p1"));
+        personCache.put(new PersonKey(2, orgId1), new Person(2, "o1-p2"));
+
+        accCache.put(keyForNode(aff, accKey, node0), new Account(1, "a0"));
+        accCache.put(keyForNode(aff, accKey, node1), new Account(1, "a1"));
+
+        // Join on affinity keys equals condition should not be distributed.
+        String qry = "select o.name, p._key, p.name " +
+            "from \"org\".Organization o, \"person\".Person p " +
+            "where p.affKey = o._key";
+
+        assertFalse(plan(qry, orgCache, false).contains("batched"));
+
+        checkQuery(qry, orgCache, false, 2);
+
+        qry = "select o.name, p._key, p.name " +
+            "from \"org\".Organization o, \"person\".Person p " +
+            "where p.affKey != o._key";
+
+        assertTrue(plan(qry, orgCache, false).contains("batched"));
+
+        checkQuery(qry, orgCache, false, 0);
+
+        checkQuery("select o.name, p._key, p.name, a.name " +
+            "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " +
+            "where p.affKey = o._key and p.id = a.personId", orgCache, true, 2);
+    }
+
+    /**
+     * @param sql SQL.
+     * @param cache Cache.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @return Query plan.
+     */
+    private String plan(String sql,
+        IgniteCache<?, ?> cache,
+        boolean enforceJoinOrder) {
+        return (String)cache.query(new SqlFieldsQuery("explain " + sql)
+            .setDistributedJoins(true)
+            .setEnforceJoinOrder(enforceJoinOrder))
+            .getAll().get(0).get(0);
+    }
+
+    /**
+     * @param sql SQL.
+     * @param cache Cache.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param expSize Expected results size.
+     */
+    private void checkQuery(String sql,
+        IgniteCache<Object, Object> cache,
+        boolean enforceJoinOrder,
+        int expSize) {
+        String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql)
+            .setDistributedJoins(true)
+            .setEnforceJoinOrder(enforceJoinOrder))
+            .getAll().get(0).get(0);
+
+        log.info("Plan: " + plan);
+
+        SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+        qry.setDistributedJoins(true);
+        qry.setEnforceJoinOrder(enforceJoinOrder);
+
+        QueryCursor<List<?>> cur = cache.query(qry);
+
+        List<List<?>> res = cur.getAll();
+
+        if (expSize != res.size())
+            log.info("Results: " + res);
+
+        assertEquals(expSize, res.size());
+    }
+    /**
+     *
+     */
+    public static class PersonKey {
+        /** */
+        private int id;
+
+        /** */
+        @AffinityKeyMapped
+        private int affKey;
+
+        /**
+         * @param id Key.
+         * @param affKey Affinity key.
+         */
+        public PersonKey(int id, int affKey) {
+            this.id = id;
+            this.affKey = affKey;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            PersonKey other = (PersonKey)o;
+
+            return id == other.id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Account implements Serializable {
+        /** */
+        int personId;
+
+        /** */
+        String name;
+
+        /**
+         * @param personId Person ID.
+         * @param name Name.
+         */
+        public Account(int personId, String name) {
+            this.personId = personId;
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Account.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Person implements Serializable {
+        /** */
+        int id;
+
+        /** */
+        String name;
+
+        /**
+         * @param id Person ID.
+         * @param name Name.
+         */
+        public Person(int id, String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Organization implements Serializable {
+        /** */
+        String name;
+
+        /**
+         * @param name Name.
+         */
+        public Organization(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Organization.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a612faa/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
index 4c71a2e..88f0d21 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -69,6 +70,28 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
 
         spi.setIpFinder(IP_FINDER);
 
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @param name Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration configuration(String name) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setBackups(1);
+
+        return ccfg;
+    }
+
+    private List<CacheConfiguration> caches(boolean idx) {
         List<CacheConfiguration> ccfgs = new ArrayList<>();
 
         {
@@ -83,6 +106,9 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
             entity.addQueryField("orgId", Integer.class.getName(), null);
             entity.addQueryField("name", String.class.getName(), null);
 
+            if (idx)
+                entity.setIndexes(F.asList(new QueryIndex("orgId"), new QueryIndex("name")));
+
             ccfg.setQueryEntities(F.asList(entity));
 
             ccfgs.add(ccfg);
@@ -96,6 +122,9 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
             entity.setValueType(Organization.class.getName());
             entity.addQueryField("name", String.class.getName(), null);
 
+            if (idx)
+                entity.setIndexes(F.asList(new QueryIndex("name")));
+
             ccfg.setQueryEntities(F.asList(entity));
 
             ccfgs.add(ccfg);
@@ -111,32 +140,18 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
             entity.addQueryField("personId", Integer.class.getName(), null);
             entity.addQueryField("name", String.class.getName(), null);
 
+            if (idx) {
+                entity.setIndexes(F.asList(new QueryIndex("orgId"),
+                    new QueryIndex("personId"),
+                    new QueryIndex("name")));
+            }
+
             ccfg.setQueryEntities(F.asList(entity));
 
             ccfgs.add(ccfg);
         }
 
-        cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
-
-        cfg.setClientMode(client);
-
-        return cfg;
-    }
-
-    /**
-     * @param name Cache name.
-     * @return Cache configuration.
-     */
-    private CacheConfiguration configuration(String name) {
-        CacheConfiguration ccfg = new CacheConfiguration();
-
-        ccfg.setName(name);
-        ccfg.setWriteSynchronizationMode(FULL_SYNC);
-        ccfg.setAtomicWriteOrderMode(PRIMARY);
-        ccfg.setAtomicityMode(ATOMIC);
-        ccfg.setBackups(1);
-
-        return ccfg;
+        return ccfgs;
     }
 
     /** {@inheritDoc} */
@@ -161,78 +176,98 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid
      * @throws Exception If failed.
      */
     public void testJoin() throws Exception {
+        join(true);
+
+        join(false);
+    }
+
+    /**
+     * @param idx Use index flag.
+     * @throws Exception If failed.
+     */
+    private void join(boolean idx) throws Exception {
         Ignite client = grid(2);
 
-        IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
-        IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
-        IgniteCache<Object, Object> accCache = client.cache(ACCOUNT_CACHE);
+        for (CacheConfiguration ccfg : caches(idx))
+            client.createCache(ccfg);
 
-        Affinity<Object> aff = client.affinity(PERSON_CACHE);
+        try {
+            IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
+            IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
+            IgniteCache<Object, Object> accCache = client.cache(ACCOUNT_CACHE);
 
-        AtomicInteger pKey = new AtomicInteger(100_000);
-        AtomicInteger orgKey = new AtomicInteger();
-        AtomicInteger accKey = new AtomicInteger();
+            Affinity<Object> aff = client.affinity(PERSON_CACHE);
 
-        ClusterNode node0 = ignite(0).cluster().localNode();
-        ClusterNode node1 = ignite(1).cluster().localNode();
+            AtomicInteger pKey = new AtomicInteger(100_000);
+            AtomicInteger orgKey = new AtomicInteger();
+            AtomicInteger accKey = new AtomicInteger();
 
-        /**
-         * One organization, one person, two accounts.
-         */
+            ClusterNode node0 = ignite(0).cluster().localNode();
+            ClusterNode node1 = ignite(1).cluster().localNode();
 
-        int orgId1 = keyForNode(aff, orgKey, node0);
+            /**
+             * One organization, one person, two accounts.
+             */
 
-        orgCache.put(orgId1, new Organization("obj-" + orgId1));
+            int orgId1 = keyForNode(aff, orgKey, node0);
 
-        int pid = keyForNode(aff, pKey, node0);
-        personCache.put(pid, new Person(orgId1, "o1-p1"));
+            orgCache.put(orgId1, new Organization("obj-" + orgId1));
 
-        accCache.put(keyForNode(aff, accKey, node0), new Account(pid, orgId1, "a0"));
-        accCache.put(keyForNode(aff, accKey, node1), new Account(pid, orgId1, "a1"));
+            int pid = keyForNode(aff, pKey, node0);
+            personCache.put(pid, new Person(orgId1, "o1-p1"));
 
-//        checkQuery("select p._key, p.name, a.name " +
-//            "from \"person\".Person p, \"acc\".Account a " +
-//            "where p._key = a.personId", orgCache, true, 2);
+            accCache.put(keyForNode(aff, accKey, node0), new Account(pid, orgId1, "a0"));
+            accCache.put(keyForNode(aff, accKey, node1), new Account(pid, orgId1, "a1"));
 
-        checkQuery("select o.name, p._key, p.name, a.name " +
-            "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " +
-            "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2);
+            checkQuery("select p._key, p.name, a.name " +
+                "from \"person\".Person p, \"acc\".Account a " +
+                "where p._key = a.personId", orgCache, true, 2);
 
-        checkQuery("select o.name, p._key, p.name, a.name " +
-            "from \"org\".Organization o, \"acc\".Account a, \"person\".Person p " +
-            "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2);
+            checkQuery("select o.name, p._key, p.name, a.name " +
+                "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " +
+                "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2);
 
-        checkQuery("select o.name, p._key, p.name, a.name " +
-            "from \"person\".Person p, \"org\".Organization o, \"acc\".Account a " +
-            "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2);
+            checkQuery("select o.name, p._key, p.name, a.name " +
+                "from \"org\".Organization o, \"acc\".Account a, \"person\".Person p " +
+                "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2);
 
-        String[] cacheNames = {"\"org\".Organization o", "\"person\".Person p", "\"acc\".Account a"};
+            checkQuery("select o.name, p._key, p.name, a.name " +
+                "from \"person\".Person p, \"org\".Organization o, \"acc\".Account a " +
+                "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2);
 
-        for (int c1 = 0; c1 < cacheNames.length; c1++) {
-            for (int c2 = 0; c2 < cacheNames.length; c2++) {
-                if (c2 == c1)
-                    continue;
+            String[] cacheNames = {"\"org\".Organization o", "\"person\".Person p", "\"acc\".Account a"};
 
-                for (int c3 = 0; c3 < cacheNames.length; c3++) {
-                    if (c3 == c1 || c3 == c2)
+            for (int c1 = 0; c1 < cacheNames.length; c1++) {
+                for (int c2 = 0; c2 < cacheNames.length; c2++) {
+                    if (c2 == c1)
                         continue;
 
-                    String cache1 = cacheNames[c1];
-                    String cache2 = cacheNames[c2];
-                    String cache3 = cacheNames[c3];
+                    for (int c3 = 0; c3 < cacheNames.length; c3++) {
+                        if (c3 == c1 || c3 == c2)
+                            continue;
+
+                        String cache1 = cacheNames[c1];
+                        String cache2 = cacheNames[c2];
+                        String cache3 = cacheNames[c3];
 
-                    StringBuilder qry = new StringBuilder("select o.name, p._key, p.name, a.name from ").
-                        append(cache1).append(", ").
-                        append(cache2).append(", ").
-                        append(cache3).append(" ").
-                        append("where p.orgId = o._key and p._key = a.personId");
+                        StringBuilder qry = new StringBuilder("select o.name, p._key, p.name, a.name from ").
+                            append(cache1).append(", ").
+                            append(cache2).append(", ").
+                            append(cache3).append(" ").
+                            append("where p.orgId = o._key and p._key = a.personId");
 
-                    checkQuery(qry.toString(), orgCache, true, 2);
+                        checkQuery(qry.toString(), orgCache, true, 2);
 
-                    checkQuery(qry.toString(), orgCache, false, 2);
+                        checkQuery(qry.toString(), orgCache, false, 2);
+                    }
                 }
             }
         }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+            client.destroyCache(ACCOUNT_CACHE);
+        }
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a612faa/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java
new file mode 100644
index 0000000..98630e9
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryConditionsTest.java
@@ -0,0 +1,588 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheDistributedJoinQueryConditionsTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String PERSON_CACHE = "person";
+
+    /** */
+    private static final String ORG_CACHE = "org";
+
+    /** */
+    private boolean client;
+
+    /** */
+    private int total;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = ((TcpDiscoverySpi) cfg.getDiscoverySpi());
+
+        spi.setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(2);
+
+        client = true;
+
+        startGrid(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery1() throws Exception {
+        joinQuery1(true);
+
+        joinQuery1(false);
+    }
+
+    /**
+     * @param idx Use index flag.
+     * @throws Exception If failed.
+     */
+    private void joinQuery1(boolean idx) throws Exception {
+        Ignite client = grid(2);
+
+        try {
+            CacheConfiguration ccfg1 =
+                cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(idx, idx)));
+            CacheConfiguration ccfg2 =
+                cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(idx)));
+
+            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
+            IgniteCache<Object, Object> orgCache = client.createCache(ccfg2);
+
+            List<Integer> orgIds = putData1();
+
+//            checkQuery("select * from " +
+//                "(select _key, name from \"org\".Organization) o " +
+//                "inner join " +
+//                "(select orgId from Person) p " +
+//                "on p.orgId = o._key", pCache, total);
+//
+//            checkQuery("select _key, name from \"org\".Organization o " +
+//                "inner join (select orgId from Person) p on p.orgId = o._key", pCache, total);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key", pCache, total);
+
+            checkQuery("select * from (select o._key, o.name, p._key pKey, p.name pName " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key)", pCache, total);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o inner join Person p " +
+                "on p.orgId = o._key", pCache, total);
+
+            checkQuery("select * from (select o._key o_key, o.name o_name, p._key p_key, p.name p_name " +
+                "from \"org\".Organization o inner join Person p " +
+                "on p.orgId = o._key)", pCache, total);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and o._key=" + orgIds.get(3), pCache, 3);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and o._key IN (" + orgIds.get(2) + "," + orgIds.get(3) + ")", pCache, 5);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and o._key IN (" + orgIds.get(2) + "," + orgIds.get(3) + ")", pCache, 5);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and o._key > " + orgIds.get(2), pCache, total - 3);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and o._key > " + orgIds.get(1) + " and o._key < " + orgIds.get(4), pCache, 5);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.name = o.name", pCache, total);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.name = o.name and o._key=" + orgIds.get(0), pCache, 0);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.name = o.name and o._key=" + orgIds.get(3), pCache, 3);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.name = o.name and o._key IN (" + orgIds.get(2) + "," + orgIds.get(3) + ")", pCache, 5);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.name = o.name and o.name='obj-" + orgIds.get(3) + "'", pCache, 3);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery2() throws Exception {
+        Ignite client = grid(2);
+
+        try {
+            CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, true)));
+            CacheConfiguration ccfg2 = cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
+
+            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
+            IgniteCache<Object, Object> orgCache = client.createCache(ccfg2);
+
+            ClusterNode node0 = ignite(0).cluster().localNode();
+            ClusterNode node1 = ignite(1).cluster().localNode();
+
+            Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+            AtomicInteger orgKey = new AtomicInteger();
+            AtomicInteger pKey = new AtomicInteger();
+
+            List<Integer> pIds = new ArrayList<>();
+
+            for (int i = 0; i < 3; i++) {
+                Integer orgId = keyForNode(aff, orgKey, node0);
+
+                orgCache.put(orgId, new Organization("org-" + orgId));
+
+                Integer pId = keyForNode(aff, pKey, node1);
+
+                pCache.put(pId, new Person(orgId, "p-" + orgId));
+
+                pIds.add(pId);
+            }
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and p._key >= 0", pCache, 3);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and p._key=" + pIds.get(0), pCache, 1);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId = o._key and p._key in (" + pIds.get(0) + ", " + pIds.get(1) + ")", pCache, 2);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery3() throws Exception {
+        Ignite client = grid(2);
+
+        try {
+            CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false)));
+            CacheConfiguration ccfg2 = cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
+
+            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
+            IgniteCache<Object, Object> orgCache = client.createCache(ccfg2);
+
+            ClusterNode node0 = ignite(0).cluster().localNode();
+            ClusterNode node1 = ignite(1).cluster().localNode();
+
+            Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+            AtomicInteger orgKey = new AtomicInteger();
+            AtomicInteger pKey = new AtomicInteger();
+
+            List<Integer> pIds = new ArrayList<>();
+
+            for (int i = 0; i < 3; i++) {
+                Integer orgId = keyForNode(aff, orgKey, node0);
+
+                orgCache.put(orgId, new Organization("org-" + orgId));
+
+                Integer pId = keyForNode(aff, pKey, node1);
+
+                pCache.put(pId, new Person(orgId + 100_000, "p-" + orgId));
+
+                pIds.add(pId);
+            }
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId != o._key", pCache, 9);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId != o._key and p._key=" + pIds.get(0), pCache, 3);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId != o._key and p._key in (" + pIds.get(0) + ", " + pIds.get(1) + ")", pCache, 6);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId != o._key and p._key >=" + pIds.get(0) + "and p._key <= " + pIds.get(2), pCache, 9);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery4() throws Exception {
+        Ignite client = grid(2);
+
+        try {
+            CacheConfiguration ccfg1 =
+                cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false)));
+
+            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
+
+            ClusterNode node0 = ignite(0).cluster().localNode();
+            ClusterNode node1 = ignite(1).cluster().localNode();
+
+            Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+            AtomicInteger pKey = new AtomicInteger();
+
+            Integer pId0 = keyForNode(aff, pKey, node0);
+
+            pCache.put(pId0, new Person(0, "p0"));
+
+            for (int i = 0; i < 3; i++) {
+                Integer pId = keyForNode(aff, pKey, node1);
+
+                pCache.put(pId, new Person(0, "p"));
+            }
+
+            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
+                "from Person p1, Person p2 " +
+                "where p2._key > p1._key", pCache, 6);
+
+            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
+                "from Person p1, Person p2 " +
+                "where p2._key > p1._key and p1._key=" + pId0, pCache, 3);
+
+            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
+                "from Person p1, Person p2 " +
+                "where p2._key > p1._key and p1.name='p0'", pCache, 3);
+
+            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
+                "from Person p1, Person p2 " +
+                "where p1.name != p2.name", pCache, 6);
+
+            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
+                "from Person p1, Person p2 " +
+                "where p1.name > p2.name", pCache, 3);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery5() throws Exception {
+        Ignite client = grid(2);
+
+        try {
+            CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false)));
+            CacheConfiguration ccfg2 = cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
+
+            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
+            IgniteCache<Object, Object> orgCache = client.createCache(ccfg2);
+
+            ClusterNode node0 = ignite(0).cluster().localNode();
+            ClusterNode node1 = ignite(1).cluster().localNode();
+
+            Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+            AtomicInteger orgKey = new AtomicInteger();
+            AtomicInteger pKey = new AtomicInteger();
+
+            Integer orgId = keyForNode(aff, orgKey, node0);
+
+            orgCache.put(orgId, new Organization("org-" + orgId));
+
+            Integer pId = keyForNode(aff, pKey, node1);
+
+            pCache.put(pId, new Person(orgId, "p-" + orgId));
+
+            checkQuery("select o._key from \"org\".Organization o, Person p where p.orgId = o._key", pCache, 1);
+
+//            checkQuery("select o.name from \"org\".Organization o where o._key in " +
+//                "(select o._key from \"org\".Organization o, Person p where p.orgId = o._key)", pCache, 1);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+        }
+    }
+
+    /**
+     * @param sql SQL.
+     * @param cache Cache.
+     * @param expSize Expected results size.
+     * @param args Arguments.
+     */
+    private void checkQuery(String sql, IgniteCache<Object, Object> cache, int expSize, Object... args) {
+        log.info("Execute query: " + sql);
+
+        checkQuery(sql, cache, false, expSize, args);
+
+        checkQuery(sql, cache, true, expSize, args);
+    }
+
+    /**
+     * @param sql SQL.
+     * @param cache Cache.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param expSize Expected results size.
+     * @param args Arguments.
+     */
+    private void checkQuery(String sql,
+        IgniteCache<Object, Object> cache,
+        boolean enforceJoinOrder,
+        int expSize,
+        Object... args) {
+        String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql)
+            .setDistributedJoins(true)
+            .setEnforceJoinOrder(enforceJoinOrder))
+            .getAll().get(0).get(0);
+
+        log.info("Plan: " + plan);
+
+        SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+        qry.setDistributedJoins(true);
+        qry.setEnforceJoinOrder(enforceJoinOrder);
+        qry.setArgs(args);
+
+        QueryCursor<List<?>> cur = cache.query(qry);
+
+        List<List<?>> res = cur.getAll();
+
+        if (expSize != res.size())
+            log.info("Results: " + res);
+
+        assertEquals(expSize, res.size());
+    }
+
+    /**
+     * @param idxName Name index flag.
+     * @param idxOrgId Org ID index flag.
+     * @return Entity.
+     */
+    private QueryEntity personEntity(boolean idxName, boolean idxOrgId) {
+        QueryEntity entity = new QueryEntity();
+
+        entity.setKeyType(Integer.class.getName());
+        entity.setValueType(Person.class.getName());
+
+        entity.addQueryField("orgId", Integer.class.getName(), null);
+        entity.addQueryField("name", String.class.getName(), null);
+
+        List<QueryIndex> idxs = new ArrayList<>();
+
+        if (idxName) {
+            QueryIndex idx = new QueryIndex("name");
+
+            idxs.add(idx);
+        }
+
+        if (idxOrgId) {
+            QueryIndex idx = new QueryIndex("orgId");
+
+            idxs.add(idx);
+        }
+
+        entity.setIndexes(idxs);
+
+        return entity;
+    }
+
+    /**
+     * @param idxName Name index flag.
+     * @return Entity.
+     */
+    private QueryEntity organizationEntity(boolean idxName) {
+        QueryEntity entity = new QueryEntity();
+
+        entity.setKeyType(Integer.class.getName());
+        entity.setValueType(Organization.class.getName());
+
+        entity.addQueryField("name", String.class.getName(), null);
+
+        if (idxName) {
+            QueryIndex idx = new QueryIndex("name");
+
+            entity.setIndexes(F.asList(idx));
+        }
+
+        return entity;
+    }
+
+    /**
+     * @return Organization ids.
+     */
+    private List<Integer> putData1() {
+        total = 0;
+
+        Ignite client = grid(2);
+
+        Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+        IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
+        IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
+
+        AtomicInteger pKey = new AtomicInteger();
+        AtomicInteger orgKey = new AtomicInteger();
+
+        ClusterNode node0 = ignite(0).cluster().localNode();
+        ClusterNode node1 = ignite(1).cluster().localNode();
+
+        List<Integer> data = new ArrayList<>();
+
+        for (int i = 0; i < 5; i++) {
+            int orgId = keyForNode(aff, orgKey, node0);
+
+            orgCache.put(orgId, new Organization("obj-" + orgId));
+
+            for (int j = 0; j < i; j++) {
+                personCache.put(keyForNode(aff, pKey, node1), new Person(orgId, "obj-" + orgId));
+
+                total++;
+            }
+
+            data.add(orgId);
+        }
+
+        return data;
+    }
+
+    /**
+     * @param name Cache name.
+     * @return Configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String name) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setBackups(0);
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    private static class Person implements Serializable {
+        /** */
+        int orgId;
+
+        /** */
+        String name;
+
+        /**
+         * @param orgId Organization ID.
+         * @param name Name.
+         */
+        public Person(int orgId, String name) {
+            this.orgId = orgId;
+            this.name = name;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class Organization implements Serializable {
+        /** */
+        String name;
+
+        /**
+         * @param name Name.
+         */
+        public Organization(String name) {
+            this.name = name;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a612faa/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java
deleted file mode 100644
index e4ee2fd..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java
+++ /dev/null
@@ -1,572 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.QueryEntity;
-import org.apache.ignite.cache.QueryIndex;
-import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
-import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-
-/**
- *
- */
-public class IgniteCacheDistributedJoinQueryTest extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private static final String PERSON_CACHE = "person";
-
-    /** */
-    private static final String ORG_CACHE = "org";
-
-    /** */
-    private boolean client;
-
-    /** */
-    private int total;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi spi = ((TcpDiscoverySpi) cfg.getDiscoverySpi());
-
-        spi.setIpFinder(IP_FINDER);
-
-        cfg.setClientMode(client);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        startGridsMultiThreaded(2);
-
-        client = true;
-
-        startGrid(2);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-
-        super.afterTestsStopped();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testJoinQuery1() throws Exception {
-        Ignite client = grid(2);
-
-        try {
-            CacheConfiguration ccfg1 =
-                cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false)));
-            CacheConfiguration ccfg2 =
-                cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
-
-            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
-            IgniteCache<Object, Object> orgCache = client.createCache(ccfg2);
-
-            List<Integer> orgIds = putData1();
-
-            checkQuery("select _key, name from \"org\".Organization o " +
-                "inner join (select orgId from Person) p on p.orgId = o._key", pCache, total);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.orgId = o._key", pCache, total);
-
-            checkQuery("select * from (select o._key, o.name, p._key pKey, p.name pName " +
-                "from \"org\".Organization o, Person p " +
-                "where p.orgId = o._key)", pCache, total);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o inner join Person p " +
-                "on p.orgId = o._key", pCache, total);
-
-            checkQuery("select * from (select o._key o_key, o.name o_name, p._key p_key, p.name p_name " +
-                "from \"org\".Organization o inner join Person p " +
-                "on p.orgId = o._key)", pCache, total);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.orgId = o._key and o._key=" + orgIds.get(3), pCache, 3);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.orgId = o._key and o._key IN (" + orgIds.get(2) + "," + orgIds.get(3) + ")", pCache, 5);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.orgId = o._key and o._key IN (" + orgIds.get(2) + "," + orgIds.get(3) + ")", pCache, 5);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.orgId = o._key and o._key > " + orgIds.get(2), pCache, total - 3);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.orgId = o._key and o._key > " + orgIds.get(1) + " and o._key < " + orgIds.get(4), pCache, 5);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.name = o.name", pCache, total);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.name = o.name and o._key=" + orgIds.get(0), pCache, 0);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.name = o.name and o._key=" + orgIds.get(3), pCache, 3);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.name = o.name and o._key IN (" + orgIds.get(2) + "," + orgIds.get(3) + ")", pCache, 5);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.name = o.name and o.name='obj-" + orgIds.get(3) + "'", pCache, 3);
-        }
-        finally {
-            client.destroyCache(PERSON_CACHE);
-            client.destroyCache(ORG_CACHE);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testJoinQuery2() throws Exception {
-        Ignite client = grid(2);
-
-        try {
-            CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, true)));
-            CacheConfiguration ccfg2 = cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
-
-            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
-            IgniteCache<Object, Object> orgCache = client.createCache(ccfg2);
-
-            ClusterNode node0 = ignite(0).cluster().localNode();
-            ClusterNode node1 = ignite(1).cluster().localNode();
-
-            Affinity<Object> aff = client.affinity(PERSON_CACHE);
-
-            AtomicInteger orgKey = new AtomicInteger();
-            AtomicInteger pKey = new AtomicInteger();
-
-            List<Integer> pIds = new ArrayList<>();
-
-            for (int i = 0; i < 3; i++) {
-                Integer orgId = keyForNode(aff, orgKey, node0);
-
-                orgCache.put(orgId, new Organization("org-" + orgId));
-
-                Integer pId = keyForNode(aff, pKey, node1);
-
-                pCache.put(pId, new Person(orgId, "p-" + orgId));
-
-                pIds.add(pId);
-            }
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.orgId = o._key and p._key >= 0", pCache, 3);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.orgId = o._key and p._key=" + pIds.get(0), pCache, 1);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.orgId = o._key and p._key in (" + pIds.get(0) + ", " + pIds.get(1) + ")", pCache, 2);
-        }
-        finally {
-            client.destroyCache(PERSON_CACHE);
-            client.destroyCache(ORG_CACHE);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testJoinQuery3() throws Exception {
-        Ignite client = grid(2);
-
-        try {
-            CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false)));
-            CacheConfiguration ccfg2 = cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
-
-            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
-            IgniteCache<Object, Object> orgCache = client.createCache(ccfg2);
-
-            ClusterNode node0 = ignite(0).cluster().localNode();
-            ClusterNode node1 = ignite(1).cluster().localNode();
-
-            Affinity<Object> aff = client.affinity(PERSON_CACHE);
-
-            AtomicInteger orgKey = new AtomicInteger();
-            AtomicInteger pKey = new AtomicInteger();
-
-            List<Integer> pIds = new ArrayList<>();
-
-            for (int i = 0; i < 3; i++) {
-                Integer orgId = keyForNode(aff, orgKey, node0);
-
-                orgCache.put(orgId, new Organization("org-" + orgId));
-
-                Integer pId = keyForNode(aff, pKey, node1);
-
-                pCache.put(pId, new Person(orgId + 100_000, "p-" + orgId));
-
-                pIds.add(pId);
-            }
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.orgId != o._key", pCache, 9);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.orgId != o._key and p._key=" + pIds.get(0), pCache, 3);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.orgId != o._key and p._key in (" + pIds.get(0) + ", " + pIds.get(1) + ")", pCache, 6);
-
-            checkQuery("select o._key, o.name, p._key, p.name " +
-                "from \"org\".Organization o, Person p " +
-                "where p.orgId != o._key and p._key >=" + pIds.get(0) + "and p._key <= " + pIds.get(2), pCache, 9);
-        }
-        finally {
-            client.destroyCache(PERSON_CACHE);
-            client.destroyCache(ORG_CACHE);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testJoinQuery4() throws Exception {
-        Ignite client = grid(2);
-
-        try {
-            CacheConfiguration ccfg1 =
-                cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false)));
-
-            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
-
-            ClusterNode node0 = ignite(0).cluster().localNode();
-            ClusterNode node1 = ignite(1).cluster().localNode();
-
-            Affinity<Object> aff = client.affinity(PERSON_CACHE);
-
-            AtomicInteger pKey = new AtomicInteger();
-
-            Integer pId0 = keyForNode(aff, pKey, node0);
-
-            pCache.put(pId0, new Person(0, "p0"));
-
-            for (int i = 0; i < 3; i++) {
-                Integer pId = keyForNode(aff, pKey, node1);
-
-                pCache.put(pId, new Person(0, "p"));
-            }
-
-            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
-                "from Person p1, Person p2 " +
-                "where p2._key > p1._key", pCache, 6);
-
-            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
-                "from Person p1, Person p2 " +
-                "where p2._key > p1._key and p1._key=" + pId0, pCache, 3);
-
-            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
-                "from Person p1, Person p2 " +
-                "where p2._key > p1._key and p1.name='p0'", pCache, 3);
-
-            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
-                "from Person p1, Person p2 " +
-                "where p1.name != p2.name", pCache, 6);
-
-            checkQuery("select p1._key, p1.name, p2._key, p2.name " +
-                "from Person p1, Person p2 " +
-                "where p1.name > p2.name", pCache, 3);
-        }
-        finally {
-            client.destroyCache(PERSON_CACHE);
-            client.destroyCache(ORG_CACHE);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testJoinQuery5() throws Exception {
-        Ignite client = grid(2);
-
-        try {
-            CacheConfiguration ccfg1 = cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false)));
-            CacheConfiguration ccfg2 = cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
-
-            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
-            IgniteCache<Object, Object> orgCache = client.createCache(ccfg2);
-
-            ClusterNode node0 = ignite(0).cluster().localNode();
-            ClusterNode node1 = ignite(1).cluster().localNode();
-
-            Affinity<Object> aff = client.affinity(PERSON_CACHE);
-
-            AtomicInteger orgKey = new AtomicInteger();
-            AtomicInteger pKey = new AtomicInteger();
-
-            Integer orgId = keyForNode(aff, orgKey, node0);
-
-            orgCache.put(orgId, new Organization("org-" + orgId));
-
-            Integer pId = keyForNode(aff, pKey, node1);
-
-            pCache.put(pId, new Person(orgId, "p-" + orgId));
-
-            checkQuery("select o._key from \"org\".Organization o, Person p where p.orgId = o._key", pCache, 1);
-
-            checkQuery("select o.name from \"org\".Organization o where o._key in " +
-                "(select o._key from \"org\".Organization o, Person p where p.orgId = o._key)", pCache, 1);
-        }
-        finally {
-            client.destroyCache(PERSON_CACHE);
-            client.destroyCache(ORG_CACHE);
-        }
-    }
-
-    /**
-     * @param sql SQL.
-     * @param cache Cache.
-     * @param expSize Expected results size.
-     * @param args Arguments.
-     */
-    private void checkQuery(String sql, IgniteCache<Object, Object> cache, int expSize, Object... args) {
-        log.info("Execute query: " + sql);
-
-        checkQuery(sql, cache, false, expSize, args);
-
-        checkQuery(sql, cache, true, expSize, args);
-    }
-
-    /**
-     * @param sql SQL.
-     * @param cache Cache.
-     * @param enforceJoinOrder Enforce join order flag.
-     * @param expSize Expected results size.
-     * @param args Arguments.
-     */
-    private void checkQuery(String sql,
-        IgniteCache<Object, Object> cache,
-        boolean enforceJoinOrder,
-        int expSize,
-        Object... args) {
-        String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql)
-            .setDistributedJoins(true)
-            .setEnforceJoinOrder(enforceJoinOrder))
-            .getAll().get(0).get(0);
-
-        log.info("Plan: " + plan);
-
-        SqlFieldsQuery qry = new SqlFieldsQuery(sql);
-
-        qry.setDistributedJoins(true);
-        qry.setEnforceJoinOrder(enforceJoinOrder);
-        qry.setArgs(args);
-
-        QueryCursor<List<?>> cur = cache.query(qry);
-
-        List<List<?>> res = cur.getAll();
-
-        if (expSize != res.size())
-            log.info("Results: " + res);
-
-        assertEquals(expSize, res.size());
-    }
-
-    /**
-     * @param idxName Name index flag.
-     * @param idxOrgId Org ID index flag.
-     * @return Entity.
-     */
-    private QueryEntity personEntity(boolean idxName, boolean idxOrgId) {
-        QueryEntity entity = new QueryEntity();
-
-        entity.setKeyType(Integer.class.getName());
-        entity.setValueType(Person.class.getName());
-
-        entity.addQueryField("orgId", Integer.class.getName(), null);
-        entity.addQueryField("name", String.class.getName(), null);
-
-        List<QueryIndex> idxs = new ArrayList<>();
-
-        if (idxName) {
-            QueryIndex idx = new QueryIndex("name");
-
-            idxs.add(idx);
-        }
-
-        if (idxOrgId) {
-            QueryIndex idx = new QueryIndex("orgId");
-
-            idxs.add(idx);
-        }
-
-        entity.setIndexes(idxs);
-
-        return entity;
-    }
-
-    /**
-     * @param idxName Name index flag.
-     * @return Entity.
-     */
-    private QueryEntity organizationEntity(boolean idxName) {
-        QueryEntity entity = new QueryEntity();
-
-        entity.setKeyType(Integer.class.getName());
-        entity.setValueType(Organization.class.getName());
-
-        entity.addQueryField("name", String.class.getName(), null);
-
-        if (idxName) {
-            QueryIndex idx = new QueryIndex("name");
-
-            entity.setIndexes(F.asList(idx));
-        }
-
-        return entity;
-    }
-
-    /**
-     * @return Organization ids.
-     */
-    private List<Integer> putData1() {
-        total = 0;
-
-        Ignite client = grid(2);
-
-        Affinity<Object> aff = client.affinity(PERSON_CACHE);
-
-        IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
-        IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
-
-        AtomicInteger pKey = new AtomicInteger();
-        AtomicInteger orgKey = new AtomicInteger();
-
-        ClusterNode node0 = ignite(0).cluster().localNode();
-        ClusterNode node1 = ignite(1).cluster().localNode();
-
-        List<Integer> data = new ArrayList<>();
-
-        for (int i = 0; i < 5; i++) {
-            int orgId = keyForNode(aff, orgKey, node0);
-
-            orgCache.put(orgId, new Organization("obj-" + orgId));
-
-            for (int j = 0; j < i; j++) {
-                personCache.put(keyForNode(aff, pKey, node1), new Person(orgId, "obj-" + orgId));
-
-                total++;
-            }
-
-            data.add(orgId);
-        }
-
-        return data;
-    }
-
-    /**
-     * @param name Cache name.
-     * @return Configuration.
-     */
-    private CacheConfiguration cacheConfiguration(String name) {
-        CacheConfiguration ccfg = new CacheConfiguration();
-
-        ccfg.setName(name);
-        ccfg.setWriteSynchronizationMode(FULL_SYNC);
-        ccfg.setAtomicWriteOrderMode(PRIMARY);
-        ccfg.setAtomicityMode(ATOMIC);
-        ccfg.setBackups(0);
-
-        return ccfg;
-    }
-
-    /**
-     *
-     */
-    private static class Person implements Serializable {
-        /** */
-        int orgId;
-
-        /** */
-        String name;
-
-        /**
-         * @param orgId Organization ID.
-         * @param name Name.
-         */
-        public Person(int orgId, String name) {
-            this.orgId = orgId;
-            this.name = name;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class Organization implements Serializable {
-        /** */
-        String name;
-
-        /**
-         * @param name Name.
-         */
-        public Organization(String name) {
-            this.name = name;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a612faa/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
index bd773ce..d72e626 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
@@ -179,14 +179,6 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr
             "on (p.orgId = o.id)", orgCache, 2);
 
         checkQuery("select o.name, p._key, p.name " +
-            "from \"person\".Person p left join \"org\".Organization o " +
-            "on (p.orgId = o.id)", orgCache, 2);
-
-        checkQuery("select o.name, p._key, p.name " +
-            "from \"org\".Organization o left join \"person\".Person p " +
-            "on (p.orgId = o.id)", orgCache, 2);
-
-        checkQuery("select o.name, p._key, p.name " +
             "from \"person\".Person p join \"orgRepl\".Organization o " +
             "on (p.orgId = o.id)", orgCacheRepl, 2);
 
@@ -194,6 +186,20 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr
             "from \"orgRepl\".Organization o join \"person\".Person p " +
             "on (p.orgId = o.id)", orgCacheRepl, 2);
 
+        checkQuery("select p.name from \"person\".Person p ", ignite(0).cache(PERSON_CACHE), 2);
+        checkQuery("select p.name from \"person\".Person p ", ignite(1).cache(PERSON_CACHE), 2);
+
+        for (int i = 0; i < 10; i++)
+            checkQuery("select p.name from \"person\".Person p ", personCache, 2);
+
+        checkQuery("select o.name, p._key, p.name " +
+            "from \"person\".Person p left join \"org\".Organization o " +
+            "on (p.orgId = o.id)", orgCache, 2);
+
+        checkQuery("select o.name, p._key, p.name " +
+            "from \"org\".Organization o left join \"person\".Person p " +
+            "on (p.orgId = o.id)", orgCache, 2);
+
         checkQuery("select o.name, p._key, p.name " +
             "from \"person\".Person p left join \"orgRepl\".Organization o " +
             "on (p.orgId = o.id)", orgCacheRepl, 2);
@@ -201,12 +207,6 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr
         checkQuery("select o.name, p._key, p.name " +
             "from \"orgRepl\".Organization o left join \"person\".Person p " +
             "on (p.orgId = o.id)", orgCacheRepl, 2);
-
-        checkQuery("select p.name from \"person\".Person p ", ignite(0).cache(PERSON_CACHE), 2);
-        checkQuery("select p.name from \"person\".Person p ", ignite(1).cache(PERSON_CACHE), 2);
-
-        for (int i = 0; i < 10; i++)
-            checkQuery("select p.name from \"person\".Person p ", personCache, 2);
     }
 
     /**


Mime
View raw message