ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [05/11] ignite git commit: ignite-1232 Distributed SQL joins implementation
Date Fri, 22 Jul 2016 14:08:44 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java
new file mode 100644
index 0000000..bc8aa75
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message carrying a range of rows: the range ID, the row list and a flags byte (partial-range marker).
+ */
+public class GridH2RowRange implements Message {
+    /** Bit in {@link #flags} marking the range as partial; a constant, hence {@code final}. */
+    private static final int FLAG_PARTIAL = 1;
+
+    /** Range ID. */
+    private int rangeId;
+
+    /** Rows of this range; stays {@code null} until set or read. */
+    @GridDirectCollection(Message.class)
+    @GridToStringInclude
+    private List<GridH2RowMessage> rows;
+
+    /** Bit flags, see {@link #FLAG_PARTIAL}. */
+    private byte flags;
+
+    /**
+     * @param rangeId Range ID.
+     */
+    public void rangeId(int rangeId) {
+        this.rangeId = rangeId;
+    }
+
+    /**
+     * @return Range ID.
+     */
+    public int rangeId() {
+        return rangeId;
+    }
+
+    /**
+     * @param rows Rows.
+     */
+    public void rows(List<GridH2RowMessage> rows) {
+        this.rows = rows;
+    }
+
+    /**
+     * @return Rows.
+     */
+    public List<GridH2RowMessage> rows() {
+        return rows;
+    }
+
+    /**
+     * Sets that this is a partial range (raises the {@link #FLAG_PARTIAL} bit; there is no way to clear it).
+     */
+    public void setPartial() {
+        flags |= FLAG_PARTIAL;
+    }
+
+    /**
+     * @return {@code true} If this is a partial range, i.e. the {@link #FLAG_PARTIAL} bit is set.
+     */
+    public boolean isPartial() {
+        return (flags & FLAG_PARTIAL) == FLAG_PARTIAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // No breaks on purpose: each case falls through to write the remaining fields.
+            case 0:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeInt("rangeId", rangeId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeCollection("rows", rows, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) { // No breaks on purpose: fields are read in the same order writeTo() emits them.
+            case 0:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                rangeId = reader.readInt("rangeId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                rows = reader.readCollection("rows", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridH2RowRange.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -34; // Same code that GridH2ValueMessageFactory maps to this class.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3; // flags, rangeId, rows.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridH2RowRange.class, this, "rowsSize", rows != null ? rows.size() : 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRangeBounds.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRangeBounds.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRangeBounds.java
new file mode 100644
index 0000000..e32e449
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRangeBounds.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message carrying the bounds of a row range: the range ID plus the first and last bound rows.
+ */
+public class GridH2RowRangeBounds implements Message {
+    /** Range ID. */
+    private int rangeId;
+
+    /** First bound row. */
+    private GridH2RowMessage first;
+
+    /** Last bound row. */
+    private GridH2RowMessage last;
+
+    /**
+     * @param rangeId Range ID.
+     * @param first First.
+     * @param last Last.
+     * @return Range bounds: a new instance populated with the given values.
+     */
+    public static GridH2RowRangeBounds rangeBounds(int rangeId, GridH2RowMessage first, GridH2RowMessage last) {
+        GridH2RowRangeBounds res = new GridH2RowRangeBounds();
+
+        res.rangeId(rangeId);
+        res.first(first);
+        res.last(last);
+
+        return res;
+    }
+
+    /**
+     * @param rangeId Range ID.
+     */
+    public void rangeId(int rangeId) {
+        this.rangeId = rangeId;
+    }
+
+    /**
+     * @return Range ID.
+     */
+    public int rangeId() {
+        return rangeId;
+    }
+
+    /**
+     * @param first First.
+     */
+    public void first(GridH2RowMessage first) {
+        this.first = first;
+    }
+
+    /**
+     * @return First.
+     */
+    public GridH2RowMessage first() {
+        return first;
+    }
+
+    /**
+     * @param last Last.
+     */
+    public void last(GridH2RowMessage last) {
+        this.last = last;
+    }
+
+    /**
+     * @return Last.
+     */
+    public GridH2RowMessage last() {
+        return last;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // No breaks on purpose: each case falls through to write the remaining fields.
+            case 0:
+                if (!writer.writeMessage("first", first))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeMessage("last", last))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeInt("rangeId", rangeId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) { // No breaks on purpose: fields are read in the same order writeTo() emits them.
+            case 0:
+                first = reader.readMessage("first");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                last = reader.readMessage("last");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                rangeId = reader.readInt("rangeId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridH2RowRangeBounds.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -35; // Same code that GridH2ValueMessageFactory maps to this class.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3; // first, last, rangeId.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridH2RowRangeBounds.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Short.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Short.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Short.java
index 841e01e..ebeca9d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Short.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Short.java
@@ -99,7 +99,7 @@ public class GridH2Short extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2Short.class);
     }
 
     /** {@inheritDoc} */
@@ -111,4 +111,9 @@ public class GridH2Short extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 1;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return String.valueOf(x);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2String.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2String.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2String.java
index 50b4d58..f2f9fdc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2String.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2String.java
@@ -101,7 +101,7 @@ public class GridH2String extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2String.class);
     }
 
     /** {@inheritDoc} */
@@ -113,4 +113,9 @@ public class GridH2String extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 1;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return x;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Time.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Time.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Time.java
index 1c6c7ae..172d695 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Time.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Time.java
@@ -102,7 +102,7 @@ public class GridH2Time extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2Time.class);
     }
 
     /** {@inheritDoc} */
@@ -114,4 +114,9 @@ public class GridH2Time extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 1;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return String.valueOf(nanos);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Timestamp.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Timestamp.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Timestamp.java
index ccdba92..b020799 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Timestamp.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Timestamp.java
@@ -50,7 +50,7 @@ public class GridH2Timestamp extends GridH2ValueMessage {
         ValueTimestamp t = (ValueTimestamp)val;
 
         date = t.getDateValue();
-        nanos = t.getNanos();
+        nanos = t.getTimeNanos();
     }
 
     /** {@inheritDoc} */
@@ -119,7 +119,7 @@ public class GridH2Timestamp extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2Timestamp.class);
     }
 
     /** {@inheritDoc} */
@@ -131,4 +131,9 @@ public class GridH2Timestamp extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 2;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return date + "_" + nanos;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Uuid.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Uuid.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Uuid.java
index fd14d90..fa9360b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Uuid.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Uuid.java
@@ -119,7 +119,7 @@ public class GridH2Uuid extends GridH2ValueMessage {
 
         }
 
-        return true;
+        return reader.afterMessageRead(GridH2Uuid.class);
     }
 
     /** {@inheritDoc} */
@@ -131,4 +131,9 @@ public class GridH2Uuid extends GridH2ValueMessage {
     @Override public byte fieldsCount() {
         return 2;
     }
-}
\ No newline at end of file
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return ValueUuid.get(high, low).getString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java
index d528c47..18f8880 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java
@@ -52,4 +52,4 @@ public abstract class GridH2ValueMessage implements Message {
     @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
         return true;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
index d414e5a..aa84e4b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
@@ -90,6 +90,24 @@ public class GridH2ValueMessageFactory implements MessageFactory {
 
             case -22:
                 return new GridH2CacheObject();
+
+            case -30:
+                return new GridH2IndexRangeRequest();
+
+            case -31:
+                return new GridH2IndexRangeResponse();
+
+            case -32:
+                return new GridH2RowMessage();
+
+            case -33:
+                return new GridH2QueryRequest();
+
+            case -34:
+                return new GridH2RowRange();
+
+            case -35:
+                return new GridH2RowRangeBounds();
         }
 
         return null;
@@ -118,7 +136,7 @@ public class GridH2ValueMessageFactory implements MessageFactory {
      * @return Filled array.
      * @throws IgniteCheckedException If failed.
      */
-    public static Value[] fillArray(Iterator<Message> src, Value[] dst, GridKernalContext ctx)
+    public static Value[] fillArray(Iterator<? extends Message> src, Value[] dst, GridKernalContext ctx)
         throws IgniteCheckedException {
         for (int i = 0; i < dst.length; i++) {
             Message msg = src.next();
@@ -134,7 +152,7 @@ public class GridH2ValueMessageFactory implements MessageFactory {
      * @return Message.
      * @throws IgniteCheckedException If failed.
      */
-    public static Message toMessage(Value v) throws IgniteCheckedException {
+    public static GridH2ValueMessage toMessage(Value v) throws IgniteCheckedException {
         switch (v.getType()) {
             case Value.NULL:
                 return GridH2Null.INSTANCE;

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
deleted file mode 100644
index 170ab65..0000000
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.query.CacheQuery;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.R1;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-
-/**
- * Multithreaded reduce query tests with lots of data.
- */
-public class GridCacheReduceQueryMultithreadedSelfTest extends GridCacheAbstractSelfTest {
-    /** */
-    private static final int GRID_CNT = 5;
-
-    /** */
-    private static final int TEST_TIMEOUT = 2 * 60 * 1000;
-
-    /** */
-    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    /** {@inheritDoc} */
-    @Override protected int gridCount() {
-        return GRID_CNT;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long getTestTimeout() {
-        return TEST_TIMEOUT;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-        disco.setIpFinder(ipFinder);
-
-        cfg.setDiscoverySpi(disco);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
-        CacheConfiguration<?,?> cfg = super.cacheConfiguration(gridName);
-
-        cfg.setCacheMode(PARTITIONED);
-        cfg.setBackups(1);
-        cfg.setWriteSynchronizationMode(FULL_SYNC);
-
-        cfg.setIndexedTypes(
-            String.class, Integer.class
-        );
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception In case of error.
-     */
-    public void testReduceQuery() throws Exception {
-        final int keyCnt = 5000;
-        final int logFreq = 500;
-
-        final GridCacheAdapter<String, Integer> c = internalCache(jcache());
-
-        final CountDownLatch startLatch = new CountDownLatch(1);
-
-        IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable() {
-            @Override public Object call() throws Exception {
-                for (int i = 1; i < keyCnt; i++) {
-                    c.getAndPut(String.valueOf(i), i);
-
-                    startLatch.countDown();
-
-                    if (i % logFreq == 0)
-                        info("Stored entries: " + i);
-                }
-
-                return null;
-            }
-        }, 1);
-
-        // Create query.
-        final CacheQuery<List<?>> sumQry = c.context().queries().
-            createSqlFieldsQuery("select _val from Integer", false).timeout(TEST_TIMEOUT);
-
-        final R1<List<?>, Integer> rmtRdc = new R1<List<?>, Integer>() {
-            /** */
-            private AtomicInteger sum = new AtomicInteger();
-
-            @Override public boolean collect(List<?> e) {
-                sum.addAndGet((Integer)e.get(0));
-
-                return true;
-            }
-
-            @Override public Integer reduce() {
-                return sum.get();
-            }
-        };
-
-        final AtomicBoolean stop = new AtomicBoolean();
-
-        startLatch.await();
-
-        IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable() {
-            @Override public Object call() throws Exception {
-                int cnt = 0;
-
-                while (!stop.get()) {
-                    Collection<Integer> res = sumQry.execute(rmtRdc).get();
-
-                    int sum = F.sumInt(res);
-
-                    cnt++;
-
-                    assertTrue(sum > 0);
-
-                    if (cnt % logFreq == 0) {
-                        info("Reduced value: " + sum);
-                        info("Executed queries: " + cnt);
-                    }
-                }
-
-                return null;
-            }
-        }, 1);
-
-        fut1.get();
-
-        stop.set(true);
-
-        fut2.get();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index 65d479d..7c5b472 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -69,7 +69,6 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQuerySelfTest;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -969,22 +968,6 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
     /**
      * @throws Exception If failed.
      */
-    public void testEmptyObject() throws Exception {
-        IgniteCache<EmptyObject, EmptyObject> cache = ignite().cache(null);
-
-        cache.put(new EmptyObject(1), new EmptyObject(2));
-
-        for (int i = 0; i < gridCount(); i++) {
-            GridCacheQueryManager<Object, Object> qryMgr =
-                ((IgniteKernal)grid(i)).internalCache().context().queries();
-
-            assert !hasIndexTable(EmptyObject.class, qryMgr);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testPrimitiveType() throws Exception {
         IgniteCache<Integer, Integer> cache = ignite().cache(null);
 
@@ -1536,7 +1519,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
         }
 
         try {
-            IgniteCache<UUID, Person> cache = ignite().cache(null);
+            IgniteCache<UUID,Person> cache = ignite().cache(null);
 
             for (int i = 1; i <= 20; i++)
                 cache.put(UUID.randomUUID(), new Person("Person " + i, i));
@@ -1555,17 +1538,6 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
     }
 
     /**
-     * @param cls Class to check index table for.
-     * @param qryMgr Query manager.
-     * @return {@code true} if index has a table for given class.
-     * @throws IgniteCheckedException If failed.
-     */
-    private boolean hasIndexTable(Class<?> cls, GridCacheQueryManager<Object, Object> qryMgr)
-        throws IgniteCheckedException {
-        return qryMgr.size(cls) != -1;
-    }
-
-    /**
      *
      */
     private static class ArrayObject implements Serializable {

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java
new file mode 100644
index 0000000..4d5a44e
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Stack;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.h2.sql.AbstractH2CompareQueryTest;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
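+/**
+ * Checks SQL joins over 2 to 5 caches with random data for all combinations of the tested cache modes.
+ */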
+public class IgniteCacheCrossCacheJoinRandomTest extends AbstractH2CompareQueryTest {
+    /** IP finder set on the discovery SPI of every node configured by this test. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Client mode flag copied into the configuration of each node started afterwards. */
+    private boolean client;
+
+    /** Inclusive upper bound for generated parent IDs; each data set holds {@code OBJECTS * 2} entries. */
+    private static final int OBJECTS = 200;
+
+    /** Number of data sets and H2 schemas prepared, i.e. the maximum number of joined caches. */
+    private static final int MAX_CACHES = 5;
+
+    /** Random generator, created with a logged seed in {@link #beforeTestsStarted()}. */
+    private static Random rnd;
+
+    /** Generated key to parentId mappings, one map per cache index. */
+    private static List<Map<Integer, Integer>> cachesData;
+
+    /** Pairs of cache mode and backups count combined by the 2 and 3 caches tests. */
+    private static final List<T2<CacheMode, Integer>> MODES_1 = F.asList(
+        //new T2<>(REPLICATED, 0),
+        new T2<>(PARTITIONED, 0),
+        new T2<>(PARTITIONED, 1),
+        new T2<>(PARTITIONED, 2));
+
+    /** Shorter list of cache mode and backups count pairs combined by the 4 and 5 caches tests. */
+    private static final List<T2<CacheMode, Integer>> MODES_2 = F.asList(
+        //new T2<>(REPLICATED, 0),
+        new T2<>(PARTITIONED, 0),
+        new T2<>(PARTITIONED, 1));
+
+    /** {@inheritDoc} */
+    @Override protected void setIndexedTypes(CacheConfiguration<?, ?> cc, CacheMode mode) {
+        // No-op: query entities are set explicitly in configuration(String, CacheMode, int).
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void initCacheAndDbData() throws Exception {
+        // No-op: data is generated in beforeTestsStarted() and put into caches by checkJoinQueries().
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkAllDataEquals() throws Exception {
+        // No-op. NOTE(review): presumably skipped because caches exist only inside checkJoinQueries() - confirm.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60_000; // NOTE(review): presumably milliseconds, i.e. 10 minutes - confirm against superclass.
+    }
+
+    /** {@inheritDoc} Also applies the shared IP finder and the current {@link #client} flag. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi());
+
+        spi.setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client); // Flag is switched to true in beforeTestsStarted() before startGrid(SRVS).
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} Then starts a client node and generates the data sets, inserting each one into H2. */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        client = true; // Applies to the node started below (see getConfiguration()).
+
+        startGrid(SRVS); // Node with index SRVS is the one checkJoinQueries() runs queries from.
+
+        long seed = System.currentTimeMillis();
+
+        rnd = new Random(seed);
+
+        log.info("Random seed: " + seed);
+
+        cachesData = new ArrayList<>(MAX_CACHES);
+
+        for (int i = 0; i < MAX_CACHES; i++) { // One data set per cache index.
+            Map<Integer, Integer> data = createData(OBJECTS * 2);
+
+            insertH2(data, i);
+
+            cachesData.add(data);
+        }
+    }
+
+    /** {@inheritDoc} Stops all nodes first and releases the generated data. */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        cachesData = null; // Drop the static reference to the generated data.
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} Additionally creates one "cache&lt;i&gt;" schema with a TESTOBJECT table per cache index. */
+    @Override protected Statement initializeH2Schema() throws SQLException {
+        Statement st = super.initializeH2Schema();
+
+        for (int i = 0; i < MAX_CACHES; i++) {
+            st.execute("CREATE SCHEMA \"cache" + i + "\""); // Schema name matches the cache name used in configuration().
+
+            st.execute("create table \"cache" + i + "\".TESTOBJECT" +
+                "  (_key int not null," +
+                "  _val other not null," +
+                "  parentId int)");
+        }
+
+        return st;
+    }
+
+    /**
+     * @param name Cache name.
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups (applied only when the mode is PARTITIONED).
+     * @return Atomic FULL_SYNC cache configuration with an Integer to TestObject query entity indexed by parentId.
+     */
+    private CacheConfiguration configuration(String name, CacheMode cacheMode, int backups) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setCacheMode(cacheMode);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        QueryEntity entity = new QueryEntity();
+        entity.setKeyType(Integer.class.getName());
+        entity.setValueType(TestObject.class.getName());
+        entity.addQueryField("parentId", Integer.class.getName(), null);
+        entity.setIndexes(F.asList(new QueryIndex("parentId")));
+
+        ccfg.setQueryEntities(F.asList(entity));
+
+        return ccfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin2Caches() throws Exception {
+        testJoin(2, MODES_1); // Every combination of MODES_1 entries over 2 caches.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin3Caches() throws Exception {
+        testJoin(3, MODES_1); // Every combination of MODES_1 entries over 3 caches.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin4Caches() throws Exception {
+        testJoin(4, MODES_2); // Every combination of MODES_2 entries over 4 caches.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin5Caches() throws Exception {
+        testJoin(5, MODES_2); // Every combination of MODES_2 entries over 5 caches.
+    }
+
+    /**
+     * @param caches Number of caches to join.
+     * @param allModes Cache mode and backups count pairs to combine.
+     * @throws Exception If failed.
+     */
+    private void testJoin(int caches, List<T2<CacheMode, Integer>> allModes) throws Exception {
+        checkJoin(cachesData, allModes, new Stack<T2<CacheMode, Integer>>(), caches); // Starts from an empty selection.
+    }
+
+    /**
+     * @param cachesData Caches data, indexed by cache number.
+     * @param allModes All modes that may be selected for a cache.
+     * @param modes Modes selected so far, one per cache (grown and shrunk by the recursion).
+     * @param caches Number of caches to select modes for.
+     * @throws Exception If failed.
+     */
+    private void checkJoin(List<Map<Integer, Integer>> cachesData,
+        List<T2<CacheMode, Integer>> allModes,
+        Stack<T2<CacheMode, Integer>> modes,
+        int caches) throws Exception {
+        if (modes.size() == caches) { // Selection is complete: build configurations and run the queries.
+            List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+            for (int i = 0; i < modes.size(); i++) {
+                T2<CacheMode, Integer> mode = modes.get(i);
+
+                CacheConfiguration ccfg = configuration("cache" + i, mode.get1(), mode.get2());
+
+                ccfgs.add(ccfg);
+            }
+
+            log.info("Check configurations: " + modes);
+
+            checkJoinQueries(ccfgs, cachesData);
+        }
+        else { // Try every available mode for the next cache.
+            for (T2<CacheMode, Integer> mode : allModes) {
+                modes.push(mode);
+
+                checkJoin(cachesData, allModes, modes, caches);
+
+                modes.pop();
+            }
+        }
+    }
+
+    /**
+     * @param ccfgs Configurations of the caches to create, fill, query and finally destroy.
+     * @param cachesData Caches data; element {@code i} is put into the cache created from {@code ccfgs.get(i)}.
+     * @throws Exception If failed.
+     */
+    private void checkJoinQueries(List<CacheConfiguration> ccfgs, List<Map<Integer, Integer>> cachesData) throws Exception {
+        Ignite client = ignite(SRVS);
+
+        final int CACHES = ccfgs.size();
+
+        try {
+            IgniteCache cache = null; // First PARTITIONED cache; queries are executed through it.
+
+            boolean hasReplicated = false; // Stays false while REPLICATED entries in MODES_1/MODES_2 are commented out.
+
+            for (int i = 0; i < CACHES; i++) {
+                CacheConfiguration ccfg = ccfgs.get(i);
+
+                IgniteCache cache0 = client.createCache(ccfg);
+
+                if (ccfg.getCacheMode() == REPLICATED)
+                    hasReplicated = true;
+
+                if (cache == null && ccfg.getCacheMode() == PARTITIONED)
+                    cache = cache0;
+
+                insertCache(cachesData.get(i), cache0);
+            }
+
+            boolean distributedJoin = true;
+
+            // Do not use distributed join if all caches are REPLICATED.
+            if (cache == null) {
+                cache = client.cache(ccfgs.get(0).getName());
+
+                distributedJoin = false;
+            }
+
+            Object[] args = {};
+
+            compareQueryRes0(cache, createQuery(CACHES, false, null), distributedJoin, false, args, Ordering.RANDOM);
+
+            if (!hasReplicated) {
+                compareQueryRes0(cache, createQuery(CACHES, false, null), distributedJoin, true, args, Ordering.RANDOM);
+
+                compareQueryRes0(cache, createQuery(CACHES, true, null), distributedJoin, true, args, Ordering.RANDOM);
+            }
+
+            Map<Integer, Integer> data = cachesData.get(CACHES - 1); // Keys of the last joined cache.
+
+            final int QRY_CNT = CACHES > 4 ? 2 : 50; // Fewer per-key rounds when more than 4 caches are joined.
+
+            int cnt = 0;
+
+            for (Integer objId : data.keySet()) {
+                compareQueryRes0(cache, createQuery(CACHES, false, objId), distributedJoin, false, args, Ordering.RANDOM);
+
+                if (!hasReplicated) {
+                    compareQueryRes0(cache, createQuery(CACHES, false, objId), distributedJoin, true, args, Ordering.RANDOM);
+
+                    compareQueryRes0(cache, createQuery(CACHES, true, objId), distributedJoin, true, args, Ordering.RANDOM);
+                }
+
+                if (cnt++ == QRY_CNT) // NOTE(review): post-increment lets QRY_CNT + 1 keys through - confirm intent.
+                    break;
+            }
+        }
+        finally {
+            for (CacheConfiguration ccfg : ccfgs)
+                client.destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param caches Number of caches to join.
+     * @param outer If {@code true} creates outer join query, otherwise inner join.
+     * @param objId If not {@code null}, the last joined table is additionally restricted to this key.
+     * @return SQL selecting all keys and joining each table's parentId to the next table's _key.
+     */
+    @SuppressWarnings("StringConcatenationInsideStringBufferAppend")
+    private String createQuery(int caches, boolean outer, @Nullable Integer objId) {
+        StringBuilder qry = new StringBuilder("select ");
+
+        for (int i = 0; i < caches; i++) { // Select list: o0._key, o1._key, ...
+            if (i != 0)
+                qry.append(", ");
+
+            qry.append("o" + i + "._key");
+        }
+
+        qry.append(" from \"cache0\".TestObject o0 ");
+
+        for (int i = 1; i < caches; i++) { // One join clause per additional cache.
+            String cacheName = "cache" + i;
+
+            String cur = "o" + i;
+            String prev = "o" + (i - 1);
+
+            qry.append(outer ? "left outer join " : "inner join ");
+            qry.append("\"" + cacheName + "\".TestObject " + cur);
+
+            if (i == caches - 1 && objId != null) // Only the last join gets the extra key condition.
+                qry.append(" on (" + prev + ".parentId=" + cur + "._key and " + cur + "._key=" + objId + ") ");
+            else
+                qry.append(" on (" + prev + ".parentId=" + cur + "._key) ");
+        }
+
+        return qry.toString();
+    }
+
+    /**
+     * @param data Key to parentId mapping to store.
+     * @param cache Cache that receives one {@link TestObject} per entry.
+     */
+    private void insertCache(Map<Integer, Integer> data, IgniteCache<Object, Object> cache) {
+        for (Map.Entry<Integer, Integer> e : data.entrySet())
+            cache.put(e.getKey(), new TestObject(e.getValue()));
+    }
+
+    /**
+     * @param data Key to parentId mapping; one row is inserted per entry.
+     * @param cache Cache index selecting the "cache&lt;index&gt;" schema of the TESTOBJECT table.
+     * @throws Exception If failed.
+     */
+    private void insertH2(Map<Integer, Integer> data, int cache) throws Exception {
+        try (PreparedStatement st = conn.prepareStatement("insert into \"cache" + cache + "\".TESTOBJECT " +
+            "(_key, _val, parentId) values(?, ?, ?)")) {
+            for (Map.Entry<Integer, Integer> e : data.entrySet()) { // Statement is prepared once and reused per row.
+                st.setObject(1, e.getKey());
+                st.setObject(2, new TestObject(e.getValue()));
+                st.setObject(3, e.getValue());
+
+                st.executeUpdate();
+            }
+        }
+    }
+
+    /**
+     * @param cnt Number of entries to generate; keys are drawn from {@code [0, cnt)} until all are present.
+     * @return Map of key to random parentId from {@code [0, OBJECTS]}, iterated in insertion order.
+     */
+    private Map<Integer, Integer> createData(int cnt) {
+        Map<Integer, Integer> res = new LinkedHashMap<>();
+
+        while (res.size() < cnt) // A repeated key only overwrites its value, so the loop runs until cnt distinct keys.
+            res.put(rnd.nextInt(cnt), rnd.nextInt(OBJECTS + 1));
+
+        return res;
+    }
+
+    /**
+     * Cache value holding the ID of a parent object.
+     */
+    static class TestObject implements Serializable {
+        /** Parent object ID; registered as the indexed "parentId" query field in configuration(). */
+        int parentId;
+
+        /**
+         * @param parentId Parent object ID.
+         */
+        public TestObject(int parentId) {
+            this.parentId = parentId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestObject.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java
new file mode 100644
index 0000000..7881c44
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
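+/**
+ * Checks distributed joins for queries combining a join on the affinity key with a join on a regular field.
+ */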
+public class IgniteCacheDistributedJoinCollocatedAndNotTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node configured by this test. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Name of the cache storing {@link Person} values by {@link PersonKey}. */
+    private static final String PERSON_CACHE = "person";
+
+    /** Name of the cache storing {@link Organization} values. */
+    private static final String ORG_CACHE = "org";
+
+    /** Name of the cache storing {@link Account} values. */
+    private static final String ACCOUNT_CACHE = "acc";
+
+    /** Client mode flag copied into the configuration of each node started afterwards. */
+    private boolean client;
+
+    /** {@inheritDoc} Also registers the person, organization and account caches with their query entities. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheKeyConfiguration keyCfg = new CacheKeyConfiguration(PersonKey.class.getName(), "affKey");
+
+        cfg.setCacheKeyConfiguration(keyCfg);
+
+        TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi());
+
+        spi.setIpFinder(IP_FINDER);
+
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        { // Person cache: PersonKey to Person, no explicit indexes.
+            CacheConfiguration ccfg = configuration(PERSON_CACHE);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(PersonKey.class.getName());
+            entity.setValueType(Person.class.getName());
+            entity.addQueryField("id", Integer.class.getName(), null);
+            entity.addQueryField("affKey", Integer.class.getName(), null);
+            entity.addQueryField("name", String.class.getName(), null);
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        { // Organization cache: Integer to Organization, indexed by name.
+            CacheConfiguration ccfg = configuration(ORG_CACHE);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Organization.class.getName());
+            entity.addQueryField("name", String.class.getName(), null);
+            entity.setIndexes(F.asList(new QueryIndex("name")));
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        { // Account cache: Integer to Account, indexed by personId and by name.
+            CacheConfiguration ccfg = configuration(ACCOUNT_CACHE);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Account.class.getName());
+            entity.addQueryField("personId", Integer.class.getName(), null);
+            entity.addQueryField("name", String.class.getName(), null);
+            entity.setIndexes(F.asList(new QueryIndex("personId"), new QueryIndex("name")));
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} Then starts two nodes with the client flag unset and a third one with it set. */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(2); // Started while the client flag is still false.
+
+        client = true;
+
+        startGrid(2); // Client node; testJoin() runs its queries through grid(2).
+    }
+
+    /** {@inheritDoc} Stops all started nodes first. */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @param name Cache name.
+     * @return Atomic cache configuration with FULL_SYNC writes, PRIMARY write order and one backup.
+     */
+    private CacheConfiguration configuration(String name) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setBackups(1);
+
+        return ccfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin() throws Exception {
+        Ignite client = grid(2);
+
+        IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
+        IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
+        IgniteCache<Object, Object> accCache = client.cache(ACCOUNT_CACHE);
+
+        Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+        AtomicInteger orgKey = new AtomicInteger();
+        AtomicInteger accKey = new AtomicInteger();
+
+        ClusterNode node0 = ignite(0).cluster().localNode();
+        ClusterNode node1 = ignite(1).cluster().localNode();
+
+        /**
+         * One organization, two persons, two accounts (both accounts reference person 1).
+         */
+
+        int orgId1 = keyForNode(aff, orgKey, node0);
+
+        orgCache.put(orgId1, new Organization("obj-" + orgId1));
+
+        personCache.put(new PersonKey(1, orgId1), new Person(1, "o1-p1"));
+        personCache.put(new PersonKey(2, orgId1), new Person(2, "o1-p2"));
+
+        accCache.put(keyForNode(aff, accKey, node0), new Account(1, "a0"));
+        accCache.put(keyForNode(aff, accKey, node1), new Account(1, "a1"));
+
+        // Join on affinity keys equals condition should not be distributed.
+        String qry = "select o.name, p._key, p.name " +
+            "from \"org\".Organization o, \"person\".Person p " +
+            "where p.affKey = o._key";
+
+        assertFalse(plan(qry, orgCache, false).contains("batched"));
+
+        checkQuery(qry, orgCache, false, 2); // Two persons belong to the organization.
+
+        checkQuery("select o.name, p._key, p.name, a.name " +
+            "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " +
+            "where p.affKey = o._key and p.id = a.personId", orgCache, true, 2); // Person 1 matches both accounts.
+    }
+
+    /**
+     * @param sql SQL to explain.
+     * @param cache Cache the {@code explain} query is executed through.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @return First column of the first row returned for {@code "explain " + sql} with distributed joins enabled.
+     */
+    private String plan(String sql,
+        IgniteCache<?, ?> cache,
+        boolean enforceJoinOrder) {
+        return (String)cache.query(new SqlFieldsQuery("explain " + sql)
+            .setDistributedJoins(true)
+            .setEnforceJoinOrder(enforceJoinOrder))
+            .getAll().get(0).get(0);
+    }
+
+    /**
+     * @param sql SQL to run with distributed joins enabled.
+     * @param cache Cache the queries are executed through.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param expSize Expected number of result rows.
+     */
+    private void checkQuery(String sql,
+        IgniteCache<Object, Object> cache,
+        boolean enforceJoinOrder,
+        int expSize) {
+        String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql)
+            .setDistributedJoins(true)
+            .setEnforceJoinOrder(enforceJoinOrder))
+            .getAll().get(0).get(0); // NOTE(review): same lookup as plan(sql, cache, enforceJoinOrder).
+
+        log.info("Plan: " + plan);
+
+        SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+        qry.setDistributedJoins(true);
+        qry.setEnforceJoinOrder(enforceJoinOrder);
+
+        QueryCursor<List<?>> cur = cache.query(qry);
+
+        List<List<?>> res = cur.getAll();
+
+        if (expSize != res.size()) // Log the rows only when the assertion below is going to fail.
+            log.info("Results: " + res);
+
+        assertEquals(expSize, res.size());
+    }
+    /**
+     * Person cache key made of a person ID and an affinity key.
+     */
+    public static class PersonKey {
+        /** Person ID; the only field compared by {@link #equals(Object)} and hashed by {@link #hashCode()}. */
+        private int id;
+
+        /** Affinity key; testJoin() passes the organization key here. */
+        @AffinityKeyMapped
+        private int affKey;
+
+        /**
+         * @param id Key.
+         * @param affKey Affinity key.
+         */
+        public PersonKey(int id, int affKey) {
+            this.id = id;
+            this.affKey = affKey;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            PersonKey other = (PersonKey)o;
+
+            return id == other.id; // affKey is not part of the comparison.
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+    }
+
+    /**
+     * Account cache value referencing a person by ID.
+     */
+    private static class Account implements Serializable {
+        /** ID of the person this account belongs to; joined with Person.id in testJoin(). */
+        int personId;
+
+        /** Account name. */
+        String name;
+
+        /**
+         * @param personId Person ID.
+         * @param name Name.
+         */
+        public Account(int personId, String name) {
+            this.personId = personId;
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Account.class, this);
+        }
+    }
+
+    /**
+     * Person cache value.
+     */
+    private static class Person implements Serializable {
+        /** Person ID. */
+        int id;
+
+        /** Person name. */
+        String name;
+
+        /**
+         * @param id Person ID.
+         * @param name Name.
+         */
+        public Person(int id, String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
+
+    /**
+     * Organization cache value.
+     */
+    private static class Organization implements Serializable {
+        /** Organization name. */
+        String name;
+
+        /**
+         * @param name Name.
+         */
+        public Organization(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Organization.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCustomAffinityMapper.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCustomAffinityMapper.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCustomAffinityMapper.java
new file mode 100644
index 0000000..1d4f7b2
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCustomAffinityMapper.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.affinity.AffinityKeyMapper;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Checks that distributed joins are rejected for caches with a custom {@link AffinityKeyMapper}.
+ */
+public class IgniteCacheDistributedJoinCustomAffinityMapper extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node configuration. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Name of the person cache that has no custom affinity mapper. */
+    private static final String PERSON_CACHE = "person";
+
+    /** Name of the person cache configured with {@link TestMapper}. */
+    private static final String PERSON_CACHE_CUSTOM_AFF = "personCustomAff";
+
+    /** Name of the organization cache that has no custom affinity mapper. */
+    private static final String ORG_CACHE = "org";
+
+    /** Name of the replicated organization cache configured with {@link TestMapper}. */
+    private static final String ORG_CACHE_REPL_CUSTOM = "orgReplCustomAff";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        {
+            CacheConfiguration ccfg = configuration(PERSON_CACHE);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Person.class.getName());
+            entity.addQueryField("orgId", Integer.class.getName(), null);
+            entity.setIndexes(F.asList(new QueryIndex("orgId")));
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        {
+            CacheConfiguration ccfg = configuration(PERSON_CACHE_CUSTOM_AFF);
+
+            ccfg.setAffinityMapper(new TestMapper());
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Person.class.getName());
+            entity.addQueryField("orgId", Integer.class.getName(), null);
+            entity.setIndexes(F.asList(new QueryIndex("orgId")));
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        {
+            CacheConfiguration ccfg = configuration(ORG_CACHE);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Organization.class.getName());
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        {
+            CacheConfiguration ccfg = configuration(ORG_CACHE_REPL_CUSTOM);
+
+            ccfg.setCacheMode(REPLICATED);
+            ccfg.setAffinityMapper(new TestMapper());
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Organization.class.getName());
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+        return cfg;
+    }
+
+    /**
+     * @param name Cache name.
+     * @return Cache configuration with FULL_SYNC writes, ATOMIC mode, PRIMARY write order and no backups.
+     */
+    private CacheConfiguration configuration(String name) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setBackups(0);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(3);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinCustomAffinityMapper() throws Exception {
+        Ignite ignite = ignite(0);
+
+        IgniteCache<Object, Object> cache = ignite.cache(PERSON_CACHE);
+
+        checkQueryFails(cache, "select o._key k1, p._key k2 " +
+            "from \"org\".Organization o, \"personCustomAff\".Person p where o._key=p.orgId", false);
+
+        checkQueryFails(cache, "select o._key k1, p._key k2 " +
+            "from \"personCustomAff\".Person p, \"org\".Organization o where o._key=p.orgId", false);
+
+        {
+            // Same SQL as the first failing check, but without distributed joins enabled: must not fail.
+            SqlFieldsQuery qry = new SqlFieldsQuery("select o._key k1, p._key k2 " +
+                "from \"org\".Organization o, \"personCustomAff\".Person p where o._key=p.orgId");
+
+            cache.query(qry).getAll();
+        }
+
+        {
+            // Should not check affinity for replicated cache. NOTE(review): distributedJoins is not set here - confirm.
+            SqlFieldsQuery qry = new SqlFieldsQuery("select o1._key k1, p._key k2, o2._key k3 " +
+                "from \"org\".Organization o1, \"person\".Person p, \"orgReplCustomAff\".Organization o2 where " +
+                "o1._key=p.orgId and o2._key=p.orgId");
+
+            cache.query(qry).getAll();
+        }
+    }
+
+    /**
+     * @param cache Cache the query is executed on.
+     * @param sql SQL run with distributed joins enabled; expected to fail with the custom mapper error.
+     * @param enforceJoinOrder Enforce join order flag.
+     */
+    private void checkQueryFails(final IgniteCache<Object, Object> cache,
+        String sql,
+        boolean enforceJoinOrder) {
+        final SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+        qry.setDistributedJoins(true);
+        qry.setEnforceJoinOrder(enforceJoinOrder);
+
+        Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                cache.query(qry);
+
+                return null;
+            }
+        }, CacheException.class, null);
+
+        assertTrue("Unexpected error message: " + err.getMessage(),
+            err.getMessage().contains("can not use distributed joins for cache with custom AffinityKeyMapper configured."));
+    }
+
+    /**
+     * Affinity key mapper that returns the key itself as the affinity key.
+     */
+    static class TestMapper implements AffinityKeyMapper {
+        /** {@inheritDoc} */
+        @Override public Object affinityKey(Object key) {
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void reset() {
+            // No-op.
+        }
+    }
+    /**
+     * Person value holding the organization ID used in the join conditions.
+     */
+    private static class Person implements Serializable {
+        /** Organization ID. */
+        int orgId;
+
+        /**
+         * @param orgId Organization ID.
+         */
+        public Person(int orgId) {
+            this.orgId = orgId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
+
+    /**
+     * Organization value without fields; queries reference it only by cache key.
+     */
+    private static class Organization implements Serializable {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java
new file mode 100644
index 0000000..95c56fa
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Checks that distributed joins fail when the join condition does not use an index.
+ */
+public class IgniteCacheDistributedJoinNoIndexTest extends GridCommonAbstractTest {
+    /** IP finder set on the discovery SPI of every node configuration. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Name of the person cache. */
+    private static final String PERSON_CACHE = "person";
+
+    /** Name of the organization cache. */
+    private static final String ORG_CACHE = "org";
+
+    /** Client mode flag copied into each node configuration; set to true before grid 2 is started. */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi());
+
+        spi.setIpFinder(IP_FINDER);
+
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        {
+            CacheConfiguration ccfg = configuration(PERSON_CACHE);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Person.class.getName());
+            entity.addQueryField("orgId", Integer.class.getName(), null);
+            entity.addQueryField("orgName", String.class.getName(), null);
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        {
+            CacheConfiguration ccfg = configuration(ORG_CACHE);
+
+            QueryEntity entity = new QueryEntity();
+            entity.setKeyType(Integer.class.getName());
+            entity.setValueType(Organization.class.getName());
+            entity.addQueryField("name", String.class.getName(), null);
+
+            ccfg.setQueryEntities(F.asList(entity));
+
+            ccfgs.add(ccfg);
+        }
+
+        cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /**
+     * @param name Cache name.
+     * @return Cache configuration with FULL_SYNC writes, ATOMIC mode, PRIMARY write order and no backups.
+     */
+    private CacheConfiguration configuration(String name) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(name);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setBackups(0);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(2);
+
+        client = true;
+
+        startGrid(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoin() throws Exception {
+        Ignite client = grid(2);
+
+        Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+        final IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
+        IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
+
+        AtomicInteger pKey = new AtomicInteger(100_000);
+        AtomicInteger orgKey = new AtomicInteger();
+
+        ClusterNode node0 = ignite(0).cluster().localNode();
+        ClusterNode node1 = ignite(1).cluster().localNode();
+
+        for (int i = 0; i < 3; i++) {
+            int orgId = keyForNode(aff, orgKey, node0);
+
+            orgCache.put(orgId, new Organization("org-" + i));
+
+            for (int j = 0; j < i; j++)
+                personCache.put(keyForNode(aff, pKey, node1), new Person(orgId, "org-" + i));
+        }
+
+        checkNoIndexError(personCache, "select o.name, p._key, p.orgName " +
+            "from \"org\".Organization o, \"person\".Person p " +
+            "where p.orgName = o.name");
+
+        checkNoIndexError(personCache, "select o.name, p._key, p.orgName " +
+            "from \"org\".Organization o inner join \"person\".Person p " +
+            "on p.orgName = o.name");
+
+        checkNoIndexError(personCache, "select o.name, p._key, p.orgName " +
+            "from \"org\".Organization o, \"person\".Person p " +
+            "where p.orgName > o.name");
+
+        checkNoIndexError(personCache, "select o.name, p._key, p.orgName " +
+            "from (select * from \"org\".Organization) o, \"person\".Person p " +
+            "where p.orgName = o.name");
+
+        checkNoIndexError(personCache, "select o.name, p._key, p.orgName " +
+            "from \"org\".Organization o, (select * from \"person\".Person) p " +
+            "where p.orgName = o.name");
+
+        checkNoIndexError(personCache, "select o.name, p._key, p.orgName " +
+            "from (select * from \"org\".Organization) o, (select * from \"person\".Person) p " +
+            "where p.orgName = o.name");
+
+        checkNoIndexError(personCache, "select o.name, p._key, p.orgName " +
+            "from \"org\".Organization o, \"person\".Person p");
+
+        checkNoIndexError(personCache, "select o.name, p._key, p.orgName " +
+            "from \"org\".Organization o, \"person\".Person p where o._key != p._key");
+
+        checkQuery("select o.name, p._key, p.orgName " +
+            "from \"org\".Organization o, \"person\".Person p " +
+            "where p._key = o._key and o.name=?", personCache, 0, "aaa");
+    }
+
+    /**
+     * @param cache Cache the query is executed on.
+     * @param sql SQL run with distributed joins enabled; expected to fail with the missing index error.
+     */
+    private void checkNoIndexError(final IgniteCache<Object, Object> cache, final String sql) {
+        Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+                qry.setDistributedJoins(true);
+
+                cache.query(qry).getAll();
+
+                return null;
+            }
+        }, CacheException.class, null);
+
+        log.info("Error: " + err.getMessage());
+
+        assertTrue("Unexpected error message: " + err.getMessage(),
+            err.getMessage().contains("join condition does not use index"));
+    }
+
+    /**
+     * @param sql SQL run with distributed joins enabled.
+     * @param cache Cache the query is executed on.
+     * @param expSize Expected results size.
+     * @param args Query arguments.
+     * @return Results.
+     */
+    private List<List<?>> checkQuery(String sql,
+        IgniteCache<Object, Object> cache,
+        int expSize,
+        Object... args) {
+        SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+        qry.setDistributedJoins(true);
+        qry.setArgs(args);
+
+        log.info("Plan: " + queryPlan(cache, qry));
+
+        QueryCursor<List<?>> cur = cache.query(qry);
+
+        List<List<?>> res = cur.getAll();
+
+        if (expSize != res.size())
+            log.info("Results: " + res);
+
+        assertEquals(expSize, res.size());
+
+        return res;
+    }
+
+    /**
+     * Person value holding an organization ID and an organization name.
+     */
+    private static class Person implements Serializable {
+        /** Organization ID. */
+        int orgId;
+
+        /** Organization name. */
+        String orgName;
+
+        /**
+         * @param orgId Organization ID.
+         * @param orgName Organization name.
+         */
+        public Person(int orgId, String orgName) {
+            this.orgId = orgId;
+            this.orgName = orgName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
+
+    /**
+     * Organization value holding only a name.
+     */
+    private static class Organization implements Serializable {
+        /** Organization name. */
+        String name;
+
+        /**
+         * @param name Name.
+         */
+        public Organization(String name) {
+            this.name = name;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Organization.class, this);
+        }
+    }
+}


Mime
View raw message