ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [09/50] [abbrv] ignite git commit: ignite-3860 - merge to 1.9 # Conflicts: # modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java # modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/
Date Wed, 01 Mar 2017 14:32:52 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java
index 49c679d..57ca4df 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java
@@ -36,9 +36,6 @@ public class GridSqlTable extends GridSqlElement {
     /** */
     private final GridH2Table tbl;
 
-    /** */
-    private boolean affKeyCond;
-
     /**
      * @param schema Schema.
      * @param tblName Table name.
@@ -60,7 +57,7 @@ public class GridSqlTable extends GridSqlElement {
      * @param tbl H2 Table.
      */
     private GridSqlTable(@Nullable String schema, String tblName, @Nullable Table tbl) {
-        super(Collections.<GridSqlElement>emptyList());
+        super(Collections.<GridSqlAst>emptyList());
 
         assert schema != null : "schema";
         assert tblName != null : "tblName";
@@ -71,20 +68,6 @@ public class GridSqlTable extends GridSqlElement {
         this.tbl = tbl instanceof GridH2Table ? (GridH2Table)tbl : null;
     }
 
-    /**
-     * @param affKeyCond If affinity key condition is found.
-     */
-    public void affinityKeyCondition(boolean affKeyCond) {
-        this.affKeyCond = affKeyCond;
-    }
-
-    /**
-     * @return {@code true} If affinity key condition is found.
-     */
-    public boolean affinityKeyCondition() {
-        return affKeyCond;
-    }
-
     /** {@inheritDoc} */
     @Override public String getSQL() {
         if (schema == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java
index efe9138..b4a610c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlType.java
@@ -48,6 +48,10 @@ public final class GridSqlType {
     public static final GridSqlType BOOLEAN = new GridSqlType(Value.BOOLEAN, 0, ValueBoolean.PRECISION,
         ValueBoolean.DISPLAY_SIZE, "BOOLEAN");
 
+    /** H2 {@code RESULT_SET} type: {@code Integer.MAX_VALUE} precision and display size, empty SQL definition. */
+    public static final GridSqlType RESULT_SET = new GridSqlType(Value.RESULT_SET, 0,
+        Integer.MAX_VALUE, Integer.MAX_VALUE, "");
+
     /** H2 type. */
     private final int type;
 
@@ -71,7 +75,7 @@ public final class GridSqlType {
      * @param sql SQL definition of the type.
      */
     private GridSqlType(int type, int scale, long precision, int displaySize, String sql) {
-        assert !F.isEmpty(sql) || type == Value.UNKNOWN;
+        assert !F.isEmpty(sql) || type == Value.UNKNOWN || type == Value.RESULT_SET;
 
         this.type = type;
         this.scale = scale;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java
index b11278e..09f0e24 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlUnion.java
@@ -26,6 +26,12 @@ import org.h2.util.StatementBuilder;
  */
 public class GridSqlUnion extends GridSqlQuery {
     /** */
+    public static final int LEFT_CHILD = 2;
+
+    /** */
+    public static final int RIGHT_CHILD = 3;
+
+    /** */
     private int unionType;
 
     /** */
@@ -35,6 +41,57 @@ public class GridSqlUnion extends GridSqlQuery {
     private GridSqlQuery left;
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <E extends GridSqlAst> E child(int childIdx) {
+        if (childIdx < LEFT_CHILD)
+            return super.child(childIdx);
+
+        switch (childIdx) {
+            case LEFT_CHILD:
+                assert left != null;
+
+                return (E)left;
+
+            case RIGHT_CHILD:
+                assert right != null;
+
+                return (E)right;
+
+            default:
+                throw new IllegalStateException("Child index: " + childIdx);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <E extends GridSqlAst> void child(int childIdx, E child) {
+        if (childIdx < LEFT_CHILD) {
+            super.child(childIdx, child);
+
+            return;
+        }
+
+        switch (childIdx) {
+            case LEFT_CHILD:
+                left = (GridSqlQuery)child;
+
+                break;
+
+            case RIGHT_CHILD:
+                right = (GridSqlQuery)child;
+
+                break;
+
+            default:
+                throw new IllegalStateException("Child index: " + childIdx);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return 4; // OFFSET + LIMIT + LEFT + RIGHT
+    }
+
+    /** {@inheritDoc} */
     @Override protected int visibleColumns() {
         return left.visibleColumns();
     }
@@ -50,7 +107,7 @@ public class GridSqlUnion extends GridSqlQuery {
 
         buff.append('(').append(left.getSQL()).append(')');
 
-        switch (unionType) {
+        switch (unionType()) {
             case SelectUnion.UNION_ALL:
                 buff.append("\nUNION ALL\n");
                 break;
@@ -78,6 +135,13 @@ public class GridSqlUnion extends GridSqlQuery {
         return buff.toString();
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean simpleQuery() {
+        return unionType() == SelectUnion.UNION_ALL && sort().isEmpty() &&
+            offset() == null && limit() == null &&
+            left().simpleQuery() && right().simpleQuery();
+    }
+
     /**
      * @return Union type.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlValue.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlValue.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlValue.java
deleted file mode 100644
index 665268c..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlValue.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.sql;
-
-/**
- * Marker interface for a simple value.
- */
-public interface GridSqlValue {
-    // No-op.
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index ac1a6a6..b0fa639 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -84,6 +84,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.setupConnection;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
 import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL;
@@ -435,6 +436,7 @@ public class GridMapQueryExecutor {
             null,
             req.pageSize(),
             false,
+            true,
             req.timeout());
     }
 
@@ -456,6 +458,7 @@ public class GridMapQueryExecutor {
             req.tables(),
             req.pageSize(),
             req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS),
+            req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER),
             req.timeout());
     }
 
@@ -482,6 +485,7 @@ public class GridMapQueryExecutor {
         Collection<String> tbls,
         int pageSize,
         boolean distributedJoins,
+        boolean enforceJoinOrder,
         int timeout
     ) {
         // Prepare to run queries.
@@ -541,8 +545,7 @@ public class GridMapQueryExecutor {
 
             Connection conn = h2.connectionForSpace(mainCctx.name());
 
-            // Here we enforce join order to have the same behavior on all the nodes.
-            h2.setupConnection(conn, distributedJoins, true);
+            setupConnection(conn, distributedJoins, enforceJoinOrder);
 
             GridH2QueryContext.set(qctx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index c267f4a..de14771 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -17,21 +17,25 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
+import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.ConcurrentModificationException;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.RandomAccess;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.engine.Session;
 import org.h2.index.BaseIndex;
@@ -43,9 +47,13 @@ import org.h2.result.SearchRow;
 import org.h2.result.SortOrder;
 import org.h2.table.IndexColumn;
 import org.h2.table.TableFilter;
+import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Collections.emptyIterator;
+import static java.util.Objects.requireNonNull;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_MAX_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
 
 /**
@@ -55,6 +63,27 @@ public abstract class GridMergeIndex extends BaseIndex {
     /** */
     private static final int MAX_FETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_MAX_SIZE, 10_000);
 
+    /** Block size of the fetched rows list (default 1024); the constructor requires it to be a power of 2. */
+    private static final int PREFETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE, 1024);
+
+    /** Lower bound comparator: never returns 0, a row equal to the search row is reported as greater than it. */
+    protected final Comparator<SearchRow> firstRowCmp = new Comparator<SearchRow>() {
+        @Override public int compare(SearchRow rowInList, SearchRow searchRow) {
+            int res = compareRows(rowInList, searchRow);
+
+            return res == 0 ? 1 : res;
+        }
+    };
+
+    /** Upper bound comparator: never returns 0, a row equal to the search row is reported as less than it. */
+    protected final Comparator<SearchRow> lastRowCmp = new Comparator<SearchRow>() {
+        @Override public int compare(SearchRow rowInList, SearchRow searchRow) {
+            int res = compareRows(rowInList, searchRow);
+
+            return res == 0 ? -1 : res;
+        }
+    };
+
     /** All rows number. */
     private final AtomicInteger expRowsCnt = new AtomicInteger(0);
 
@@ -67,10 +96,13 @@ public abstract class GridMergeIndex extends BaseIndex {
     /**
      * Will be r/w from query execution thread only, does not need to be threadsafe.
      */
-    private ArrayList<Row> fetched = new ArrayList<>();
+    private final BlockList<Row> fetched;
 
     /** */
-    private int fetchedCnt;
+    private Row lastEvictedRow;
+
+    /** Number of rows fetched from the stream so far, increased by {@code FetchingCursor} as it fetches rows. */
+    private volatile int fetchedCnt;
 
     /** */
     private final GridKernalContext ctx;
@@ -86,8 +118,9 @@ public abstract class GridMergeIndex extends BaseIndex {
         GridMergeTable tbl,
         String name,
         IndexType type,
-        IndexColumn[] cols) {
-        this.ctx = ctx;
+        IndexColumn[] cols
+    ) {
+        this(ctx);
 
         initBaseIndex(tbl, 0, name, cols, type);
     }
@@ -96,7 +129,19 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @param ctx Context.
      */
     protected GridMergeIndex(GridKernalContext ctx) {
+        if (!U.isPow2(PREFETCH_SIZE)) {
+            throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
+                ") must be positive and a power of 2.");
+        }
+
+        if (PREFETCH_SIZE >= MAX_FETCH_SIZE) {
+            throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE +
+                ") must be less than " + IGNITE_SQL_MERGE_TABLE_MAX_SIZE + " (" + MAX_FETCH_SIZE + ").");
+        }
+
         this.ctx = ctx;
+
+        fetched = new BlockList<>(PREFETCH_SIZE);
     }
 
     /**
@@ -109,7 +154,7 @@ public abstract class GridMergeIndex extends BaseIndex {
     /**
      * Fails index if any source node is left.
      */
-    protected final void checkSourceNodesAlive() {
+    private void checkSourceNodesAlive() {
         for (UUID nodeId : sources()) {
             if (!ctx.discovery().alive(nodeId)) {
                 fail(nodeId, null);
@@ -154,6 +199,50 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
+     * @param queue Queue to poll; source nodes liveness is checked after every empty 500 ms poll.
+     * @return Next page, never {@code null}.
+     */
+    private GridResultPage takeNextPage(BlockingQueue<GridResultPage> queue) {
+        GridResultPage page;
+
+        for (;;) {
+            try {
+                page = queue.poll(500, TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException e) {
+                throw new CacheException("Query execution was interrupted.", e);
+            }
+
+            if (page != null)
+                break;
+
+            checkSourceNodesAlive();
+        }
+
+        return page;
+    }
+
+    /**
+     * @param queue Queue to poll for the next pages while {@code iter} has no more rows.
+     * @param iter Current iterator.
+     * @return The same iterator if it has rows, next page rows, or an empty iterator after the last page.
+     */
+    protected final Iterator<Value[]> pollNextIterator(BlockingQueue<GridResultPage> queue, Iterator<Value[]> iter) {
+        while (!iter.hasNext()) {
+            GridResultPage page = takeNextPage(queue);
+
+            if (page.isLast())
+                return emptyIterator(); // We are done.
+
+            fetchNextPage(page);
+
+            iter = page.rows();
+        }
+
+        return iter;
+    }
+
+    /**
      * @param e Error.
      */
     public void fail(final CacheException e) {
@@ -261,9 +350,8 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /** {@inheritDoc} */
-    @Override public Cursor find(Session ses, SearchRow first, SearchRow last) {
-        if (fetched == null)
-            throw new IgniteException("Fetched result set was too large.");
+    @Override public final Cursor find(Session ses, SearchRow first, SearchRow last) {
+        checkBounds(lastEvictedRow, first, last);
 
         if (fetchedAll())
             return findAllFetched(fetched, first, last);
@@ -279,16 +367,26 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
-     * @param first First row.
-     * @param last Last row.
+     * @param lastEvictedRow Last evicted fetched row, {@code null} if no rows were evicted yet.
+     * @param first Lower bound.
+     * @param last Upper bound.
+     */
+    protected void checkBounds(Row lastEvictedRow, SearchRow first, SearchRow last) {
+        if (lastEvictedRow != null)
+            throw new IgniteException("Fetched result set was too large.");
+    }
+
+    /**
+     * @param first Lower bound.
+     * @param last Upper bound.
      * @return Cursor. Usually it must be {@link FetchingCursor} instance.
      */
     protected abstract Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last);
 
     /**
      * @param fetched Fetched rows.
-     * @param first First row.
-     * @param last Last row.
+     * @param first Lower bound.
+     * @param last Upper bound.
      * @return Cursor.
      */
     protected abstract Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last);
@@ -349,69 +447,212 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
+     * @param rows Sorted rows list, must not be empty.
+     * @param searchRow Search row.
+     * @param cmp Comparator which never returns 0 (e.g. {@code firstRowCmp} or {@code lastRowCmp}).
+     * @param checkLast If we need to optimistically check the last row right away.
+     * @return Insertion point for the search row, from {@code 0} to {@code rows.size()} inclusive.
+     */
+    protected static int binarySearchRow(
+        List<Row> rows,
+        SearchRow searchRow,
+        Comparator<SearchRow> cmp,
+        boolean checkLast
+    ) {
+        assert !rows.isEmpty();
+
+        // Optimistically compare with the last row as a first step.
+        if (checkLast) {
+            int res = cmp.compare(last(rows), searchRow);
+
+            assert res != 0; // Comparators must never return 0 here.
+
+            if (res < 0)
+                return rows.size(); // The search row is greater than the last row.
+        }
+
+        int res = Collections.binarySearch(rows, searchRow, cmp);
+
+        assert res < 0: res; // Comparator must never return 0.
+
+        return -res - 1;
+    }
+
+    /**
+     * @param evictedBlock Evicted block, expected to contain exactly {@code PREFETCH_SIZE} rows.
+     */
+    private void onBlockEvict(List<Row> evictedBlock) {
+        assert evictedBlock.size() == PREFETCH_SIZE;
+
+        // Remember the last row (it will be max row) from the evicted block.
+        lastEvictedRow = requireNonNull(last(evictedBlock));
+    }
+
+    /**
+     * @param l List, must not be empty.
+     * @return Last element.
+     */
+    private static <Z> Z last(List<Z> l) {
+        return l.get(l.size() - 1);
+    }
+
+    /**
      * Fetching cursor.
      */
-    protected class FetchingCursor extends GridH2Cursor {
+    protected class FetchingCursor implements Cursor {
         /** */
-        private Iterator<Row> stream;
+        Iterator<Row> stream;
+
+        /** Rows the cursor currently reads from: initially all the fetched rows, later the last fetched block. */
+        List<Row> rows;
+
+        /** Current position in {@code rows}. */
+        int cur;
+
+        /** Lower bound, reset to {@code null} once it is found in the fetched rows. */
+        SearchRow first;
+
+        /** Upper bound. */
+        SearchRow last;
+
+        /** Position of the upper bound in {@code rows}, {@code Integer.MAX_VALUE} while it is not found. */
+        int lastFound = Integer.MAX_VALUE;
 
         /**
-         * @param stream Iterator.
+         * @param first Lower bound.
+         * @param last Upper bound.
+         * @param stream Stream of all the rows from remote nodes.
          */
-        public FetchingCursor(Iterator<Row> stream) {
-            super(new FetchedIterator());
-
+        public FetchingCursor(SearchRow first, SearchRow last, Iterator<Row> stream) {
             assert stream != null;
 
+            // Initially we will use all the fetched rows, afterwards we will switch to the last block.
+            rows = fetched;
+
             this.stream = stream;
+            this.first = first;
+            this.last = last;
+
+            if (haveBounds() && !rows.isEmpty())
+                cur = findBounds();
+
+            cur--; // Set current position before the first row.
         }
 
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            if (super.next()) {
-                assert cur != null;
-
-                if (iter == stream && fetched != null) { // Cache fetched rows for reuse.
-                    if (fetched.size() == MAX_FETCH_SIZE)
-                        fetched = null; // Throw away fetched result if it is too large.
-                    else
-                        fetched.add(cur);
-                }
+        /**
+         * @return {@code true} If we have bounds.
+         */
+        private boolean haveBounds() {
+            return first != null || last != null;
+        }
 
-                fetchedCnt++;
+        /**
+         * @return Lower bound.
+         */
+        private int findBounds() {
+            assert !rows.isEmpty(): "rows";
 
-                return true;
+            int firstFound = cur;
+
+            // Find the lower bound.
+            if (first != null) {
+                firstFound = binarySearchRow(rows, first, firstRowCmp, true);
+
+                assert firstFound >= cur && firstFound <= rows.size(): "firstFound";
+
+                if (firstFound == rows.size())
+                    return firstFound; // The lower bound is greater than all the rows we have.
+
+                first = null; // We have found the lower bound, do not need it anymore.
             }
 
-            if (iter == stream) // We've fetched the stream.
-                return false;
+            // Find the upper bound.
+            if (last != null) {
+                assert lastFound == Integer.MAX_VALUE: "lastFound";
+
+                int lastFound0 = binarySearchRow(rows, last, lastRowCmp, true);
 
-            iter = stream; // Switch from cached to stream.
+                // If the upper bound is too large we will ignore it.
+                if (lastFound0 != rows.size())
+                    lastFound = lastFound0;
+            }
 
-            return next();
+            return firstFound;
         }
-    }
 
-    /**
-     * List iterator without {@link ConcurrentModificationException}.
-     */
-    private class FetchedIterator implements Iterator<Row> {
-        /** */
-        private int idx;
+        /**
+         * Fetch rows from the stream into the fetched list and reposition the cursor inside of the last block.
+         */
+        private void fetchRows() {
+            for (;;) {
+                // Take the current last block and set the position after last.
+                rows = fetched.lastBlock();
+                cur = rows.size();
+
+                // Fetch stream.
+                while (stream.hasNext()) {
+                    fetched.add(requireNonNull(stream.next()));
+
+                    // Evict block if we've fetched too many rows.
+                    if (fetched.size() == MAX_FETCH_SIZE) {
+                        onBlockEvict(fetched.evictFirstBlock());
+
+                        assert fetched.size() < MAX_FETCH_SIZE;
+                    }
+
+                    // No bounds -> no need to do binary search, can return the fetched row right away.
+                    if (!haveBounds())
+                        break;
+
+                    // When the last block changed, it means that we've filled the current last block.
+                    // We have fetched the needed number of rows for binary search.
+                    if (fetched.lastBlock() != rows) {
+                        assert fetched.lastBlock().isEmpty(); // The last row must be added to the previous block.
+
+                        break;
+                    }
+                }
+
+                if (cur == rows.size())
+                    cur = Integer.MAX_VALUE; // We were not able to fetch anything. Done.
+                else {
+                    // Update fetched count.
+                    fetchedCnt += rows.size() - cur;
+
+                    if (haveBounds()) {
+                        cur = findBounds();
+
+                        if (cur == rows.size())
+                            continue; // The lower bound is too large, continue fetching rows.
+                    }
+                }
+
+                return;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            if (++cur == rows.size())
+                fetchRows();
+
+            return cur < lastFound;
+        }
 
         /** {@inheritDoc} */
-        @Override public boolean hasNext() {
-            return fetched != null && idx < fetched.size();
+        @Override public Row get() {
+            return rows.get(cur);
         }
 
         /** {@inheritDoc} */
-        @Override public Row next() {
-            return fetched.get(idx++);
+        @Override public SearchRow getSearchRow() {
+            return get();
         }
 
         /** {@inheritDoc} */
-        @Override public void remove() {
-            throw new UnsupportedOperationException();
+        @Override public boolean previous() {
+            // Should never be called.
+            throw DbException.getUnsupportedException("previous");
         }
     }
 
@@ -427,4 +668,81 @@ public abstract class GridMergeIndex extends BaseIndex {
         /** */
         volatile State state = State.UNINITIALIZED;
     }
+
+    /** List kept as blocks of at most {@code maxBlockSize} elements; its first block can be evicted.
+     */
+    private static final class BlockList<Z> extends AbstractList<Z> implements RandomAccess {
+        /** Blocks, new elements are appended to the last one. */
+        private final List<List<Z>> blocks;
+
+        /** Total number of elements in all the blocks. */
+        private int size;
+
+        /** Max block size, a power of 2. */
+        private final int maxBlockSize;
+
+        /** Number of bits to shift an element index right to get its block index. */
+        private final int shift;
+
+        /** Mask to get the position inside of a block from an element index. */
+        private final int mask;
+
+        /**
+         * @param maxBlockSize Max block size.
+         */
+        private BlockList(int maxBlockSize) {
+            assert U.isPow2(maxBlockSize);
+
+            this.maxBlockSize = maxBlockSize;
+
+            shift = Integer.numberOfTrailingZeros(maxBlockSize);
+            mask = maxBlockSize - 1;
+
+            blocks = new ArrayList<>();
+            blocks.add(new ArrayList<Z>());
+        }
+
+        /** {@inheritDoc} */
+        @Override public int size() {
+            return size;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean add(Z z) {
+            size++;
+
+            List<Z> lastBlock = lastBlock();
+
+            lastBlock.add(z);
+
+            if (lastBlock.size() == maxBlockSize)
+                blocks.add(new ArrayList<Z>());
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Z get(int idx) {
+            return blocks.get(idx >>> shift).get(idx & mask);
+        }
+
+        /**
+         * @return Last block.
+         */
+        private List<Z> lastBlock() {
+            return last(blocks);
+        }
+
+        /**
+         * @return Evicted block.
+         */
+        private List<Z> evictFirstBlock() {
+            // Remove head block.
+            List<Z> res = blocks.remove(0);
+
+            size -= res.size();
+
+            return res;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
new file mode 100644
index 0000000..a1b6691
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.index.Cursor;
+import org.h2.index.IndexType;
+import org.h2.result.Row;
+import org.h2.result.SearchRow;
+import org.h2.table.IndexColumn;
+import org.h2.value.Value;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Collections.emptyIterator;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase.bubbleUp;
+
+/**
+ * Sorted merge index.
+ */
+public final class GridMergeIndexSorted extends GridMergeIndex {
+    /** Orders streams by their current rows; {@code null} slots go first and are equal to each other. */
+    private final Comparator<RowStream> streamCmp = new Comparator<RowStream>() {
+        @Override public int compare(RowStream o1, RowStream o2) {
+            if (o1 == o2) // Same stream or both nulls: must be 0 to honor the Comparator contract.
+                return 0;
+
+            // Nulls at the beginning.
+            if (o1 == null || o2 == null)
+                return o1 == null ? -1 : 1;
+
+            return compareRows(o1.get(), o2.get());
+        }
+    };
+
+    /** Row streams by source node ID. */
+    private Map<UUID,RowStream> streamsMap;
+
+    /** Row streams array sized by the number of source nodes; sorted and nullified in place while merging. */
+    private RowStream[] streams;
+
+    /**
+     * @param ctx Kernal context.
+     * @param tbl Table.
+     * @param name Index name.
+     * @param type Index type.
+     * @param cols Columns.
+     */
+    public GridMergeIndexSorted(
+        GridKernalContext ctx,
+        GridMergeTable tbl,
+        String name,
+        IndexType type,
+        IndexColumn[] cols
+    ) {
+        super(ctx, tbl, name, type, cols);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setSources(Collection<ClusterNode> nodes) {
+        super.setSources(nodes);
+
+        streamsMap = U.newHashMap(nodes.size());
+        streams = new RowStream[nodes.size()];
+
+        int i = 0;
+
+        for (ClusterNode node : nodes) {
+            RowStream stream = new RowStream(node.id());
+            // Advance the slot index: every source node must get its own array slot.
+            streams[i++] = stream;
+
+            if (streamsMap.put(stream.src, stream) != null)
+                throw new IllegalStateException("Duplicate source node: " + stream.src);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void addPage0(GridResultPage page) {
+        boolean terminal = page.isLast() || page.isFail();
+
+        if (!terminal) {
+            assert page.rowsInPage() > 0;
+
+            // A regular data page goes only to the stream of the node which produced it.
+            streamsMap.get(page.source()).addPage(page);
+            return;
+        }
+
+        for (RowStream stream : streams) // Finish all the streams.
+            stream.addPage(page);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkBounds(Row lastEvictedRow, SearchRow first, SearchRow last) {
+        // A lower bound strictly above the last evicted row needs no evicted data,
+        // so the generic check is skipped. This is important for merge join to work.
+        boolean evictedBelowFirst = lastEvictedRow != null && first != null && compareRows(lastEvictedRow, first) < 0;
+
+        if (!evictedBelowFirst)
+            super.checkBounds(lastEvictedRow, first, last);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Cursor findAllFetched(List<Row> fetched, SearchRow first, SearchRow last) {
+        // Stays empty if nothing was fetched or the lower bound lands past the last fetched row.
+        Iterator<Row> iter = emptyIterator();
+        boolean unbounded = first == null && last == null;
+
+        if (!fetched.isEmpty() && unbounded)
+            iter = fetched.iterator();
+        else if (!fetched.isEmpty()) {
+            // Start index of the requested range.
+            int low = first == null ? 0 : binarySearchRow(fetched, first, firstRowCmp, false);
+
+            if (low != fetched.size()) {
+                // Exclusive end index of the requested range.
+                int high = last == null ? fetched.size() : binarySearchRow(fetched, last, lastRowCmp, false);
+
+                iter = fetched.subList(low, high).iterator();
+            }
+        }
+
+        return new GridH2Cursor(iter);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) {
+        return new FetchingCursor(first, last, new MergeStreamIterator()); // Merges the streams on demand.
+    }
+
+    /**
+     * Iterator merging multiple row streams.
+     */
+    private final class MergeStreamIterator implements Iterator<Row> {
+        /** {@code true} until {@link #goFirst()} has positioned the streams on their first rows. */
+        private boolean first = true;
+
+        /** Left bound: index of the stream holding the current row; slots before it are nullified. */
+        private int off;
+
+        /** {@code true} if the row at the left bound is ready but was not yet returned by {@link #next()}. */
+        private boolean hasNext;
+
+        /**
+         * Moves every stream to its first row, nullifies the empty ones and sorts the array if any stream has rows.
+         */
+        private void goFirst() {
+            for (int i = 0; i < streams.length; i++) {
+                if (!streams[i].next()) {
+                    streams[i] = null;
+                    off++; // Move left bound.
+                }
+            }
+
+            if (off < streams.length)
+                Arrays.sort(streams, streamCmp);
+
+            first = false;
+        }
+
+        /**
+         * Advances the stream at the left bound: bubbles it up if it has a row, otherwise drops it.
+         */
+        private void goNext() {
+            if (streams[off].next())
+                bubbleUp(streams, off, streamCmp);
+            else
+                streams[off++] = null; // Move left bound and nullify empty stream.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            if (hasNext)
+                return true;
+
+            if (first)
+                goFirst();
+            else if (off < streams.length) // Nothing to advance once all the streams are exhausted.
+                goNext();
+
+            return hasNext = off < streams.length;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Row next() {
+            if (!hasNext())
+                throw new NoSuchElementException();
+
+            hasNext = false;
+
+            return streams[off].get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Row stream.
+     */
+    private final class RowStream {
+        /** ID of the source node this stream belongs to. */
+        final UUID src;
+
+        /** Pages added for this stream and not yet polled; holds at most 8 pages. */
+        final BlockingQueue<GridResultPage> queue = new ArrayBlockingQueue<>(8);
+
+        /** Iterator over the raw rows being read now; replaced through {@code pollNextIterator}. */
+        Iterator<Value[]> iter = emptyIterator();
+
+        /** Current row or {@code null} if the stream is not positioned on a row. */
+        Row cur;
+
+        /**
+         * @param src Source.
+         */
+        private RowStream(UUID src) {
+            this.src = src;
+        }
+
+        /**
+         * @param page Page.
+         */
+        private void addPage(GridResultPage page) {
+            queue.offer(page); // NOTE(review): result ignored, a page is dropped if the queue is full - confirm.
+        }
+
+        /**
+         * @return {@code true} If we successfully switched to the next row.
+         */
+        private boolean next() {
+            cur = null;
+
+            iter = pollNextIterator(queue, iter);
+
+            if (!iter.hasNext())
+                return false;
+
+            cur = GridH2RowFactory.create(iter.next());
+
+            return true;
+        }
+
+        /**
+         * @return Current row.
+         */
+        private Row get() {
+            assert cur != null;
+
+            return cur;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index 8a8577f..b69c898 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -22,8 +22,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import javax.cache.CacheException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory;
@@ -33,12 +31,11 @@ import org.h2.result.Row;
 import org.h2.result.SearchRow;
 import org.h2.table.IndexColumn;
 import org.h2.value.Value;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Unsorted merge index.
  */
-public class GridMergeIndexUnsorted extends GridMergeIndex {
+public final class GridMergeIndexUnsorted extends GridMergeIndex {
     /** */
     private final BlockingQueue<GridResultPage> queue = new LinkedBlockingQueue<>();
 
@@ -74,43 +71,22 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
     }
 
     /** {@inheritDoc} */
-    @Override protected Cursor findAllFetched(List<Row> fetched, @Nullable SearchRow first, @Nullable SearchRow last) {
+    @Override protected Cursor findAllFetched(List<Row> fetched, SearchRow first, SearchRow last) {
+        // This index is unsorted: have to ignore bounds.
         return new GridH2Cursor(fetched.iterator());
     }
 
     /** {@inheritDoc} */
-    @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) {
-        return new FetchingCursor(new Iterator<Row>() {
+    @Override protected Cursor findInStream(SearchRow first, SearchRow last) {
+        // This index is unsorted: have to ignore bounds.
+        return new FetchingCursor(null, null, new Iterator<Row>() {
             /** */
             Iterator<Value[]> iter = Collections.emptyIterator();
 
             @Override public boolean hasNext() {
-                while (!iter.hasNext()) {
-                    GridResultPage page;
+                iter = pollNextIterator(queue, iter);
 
-                    for (;;) {
-                        try {
-                            page = queue.poll(500, TimeUnit.MILLISECONDS);
-                        }
-                        catch (InterruptedException e) {
-                            throw new CacheException("Query execution was interrupted.", e);
-                        }
-
-                        if (page != null)
-                            break;
-
-                        checkSourceNodesAlive();
-                    }
-
-                    if (page.isLast())
-                        return false; // We are done.
-
-                    fetchNextPage(page);
-
-                    iter = page.rows();
-                }
-
-                return true;
+                return iter.hasNext();
             }
 
             @Override public Row next() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index f54fab6..128ca8e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -67,7 +67,6 @@ import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
@@ -101,6 +100,7 @@ import org.jsr166.ConcurrentHashMap8;
 import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
+import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter.mergeTableIdentifier;
 
 /**
  * Reduce query executor.
@@ -589,7 +589,8 @@ public class GridReduceQueryExecutor {
                     mapQrys = new ArrayList<>(qry.mapQueries().size());
 
                     for (GridCacheSqlQuery mapQry : qry.mapQueries())
-                        mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query(), mapQry.parameters()));
+                        mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query())
+                            .parameters(mapQry.parameters(), mapQry.parameterIndexes()));
                 }
 
                 IgniteProductVersion minNodeVer = cctx.shared().exchange().minimumNodeVersion(topVer);
@@ -608,6 +609,12 @@ public class GridReduceQueryExecutor {
                 if (oldStyle && distributedJoins)
                     throw new CacheException("Failed to enable distributed joins. Topology contains older data nodes.");
 
+                // Always enforce join order on map side to have consistent behavior.
+                int flags = GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER;
+
+                if (distributedJoins)
+                    flags |= GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS;
+
                 if (send(nodes,
                     oldStyle ?
                         new GridQueryRequest(qryReqId,
@@ -626,7 +633,7 @@ public class GridReduceQueryExecutor {
                             .tables(distributedJoins ? qry.tables() : null)
                             .partitions(convert(partsMap))
                             .queries(mapQrys)
-                            .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0)
+                            .flags(flags)
                             .timeout(timeoutMillis),
                     oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null,
                     distributedJoins)
@@ -835,14 +842,6 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param idx Table index.
-     * @return Table name.
-     */
-    private static String table(int idx) {
-        return GridSqlQuerySplitter.table(idx).getSQL();
-    }
-
-    /**
      * Gets or creates new fake table for index.
      *
      * @param c Connection.
@@ -860,7 +859,7 @@ public class GridReduceQueryExecutor {
             try {
                 if ((tbls = fakeTbls).size() == idx) { // Double check inside of lock.
                     try (Statement stmt = c.createStatement()) {
-                        stmt.executeUpdate("CREATE TABLE " + table(idx) +
+                        stmt.executeUpdate("CREATE TABLE " + mergeTableIdentifier(idx) +
                             "(fake BOOL) ENGINE \"" + GridThreadLocalTable.Engine.class.getName() + '"');
                     }
                     catch (SQLException e) {
@@ -1117,7 +1116,8 @@ public class GridReduceQueryExecutor {
         List<List<?>> lists = new ArrayList<>();
 
         for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) {
-            ResultSet rs = h2.executeSqlQueryWithTimer(space, c, "SELECT PLAN FROM " + table(i), null, false, 0, null);
+            ResultSet rs = h2.executeSqlQueryWithTimer(space, c,
+                "SELECT PLAN FROM " + mergeTableIdentifier(i), null, false, 0, null);
 
             lists.add(F.asList(getPlan(rs)));
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 884173f..e5dbf33 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -50,6 +50,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
      */
     public static int FLAG_DISTRIBUTED_JOINS = 1;
 
+    /**
+     * Remote map query executor will enforce join order for the received map queries.
+     */
+    public static final int FLAG_ENFORCE_JOIN_ORDER = 1 << 1;
+
     /** */
     private long reqId;
 
@@ -209,6 +214,8 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
      * @return {@code this}.
      */
     public GridH2QueryRequest flags(int flags) {
+        assert flags >= 0 && flags <= 255: flags;
+
         this.flags = (byte)flags;
 
         return this;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
index d5f02eb..cd38d31 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
@@ -368,7 +368,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
         if (cacheMode() == PARTITIONED) {
             assertEquals(2, res.size());
 
-            assertTrue(((String)res.get(1).get(0)).contains(GridSqlQuerySplitter.table(0).getSQL()));
+            assertTrue(((String)res.get(1).get(0)).contains(GridSqlQuerySplitter.mergeTableIdentifier(0)));
         }
         else
             assertEquals(1, res.size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 06afe7c..432ed34 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -33,8 +33,8 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheKeyConfiguration;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
@@ -79,6 +79,11 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
         return cfg;
     }
 
+    @Override
+    protected long getTestTimeout() {
+        return 100_000_000; // NOTE(review): presumably ms (~27.8 h) - looks like a debugging leftover, confirm.
+    }
+
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         startGridsMultiThreaded(3, false);
@@ -271,9 +276,9 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
             String select = "select o.name n1, p.name n2 from Person2 p, Organization o where p.orgId = o._key and o._key=1" +
                 " union select o.name n1, p.name n2 from Person2 p, Organization o where p.orgId = o._key and o._key=2";
 
-            String plan = (String)c.query(new SqlFieldsQuery("explain " + select)
+            String plan = c.query(new SqlFieldsQuery("explain " + select)
                 .setDistributedJoins(true).setEnforceJoinOrder(true))
-                .getAll().get(0).get(0);
+                .getAll().toString();
 
             X.println("Plan : " + plan);
 
@@ -285,9 +290,9 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
 
             select = "select * from (" + select + ")";
 
-            plan = (String)c.query(new SqlFieldsQuery("explain " + select)
+            plan = c.query(new SqlFieldsQuery("explain " + select)
                 .setDistributedJoins(true).setEnforceJoinOrder(true))
-                .getAll().get(0).get(0);
+                .getAll().toString();
 
             X.println("Plan : " + plan);
 
@@ -548,15 +553,15 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
                 "\"orgRepl\".Organization o",
                 "where p.affKey = o._key", true);
 
-            checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
-                "(select * from \"persPart\".Person2) p",
-                "\"orgPart\".Organization o",
-                "where p._key = o._key", false);
-
-            checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
-                "\"persPart\".Person2 p",
-                "(select * from \"orgPart\".Organization) o",
-                "where p._key = o._key", false);
+            // TODO Now we can not analyze subqueries to decide if we are collocated or not.
+//            checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
+//                "(select * from \"persPart\".Person2) p",
+//                "\"orgPart\".Organization o",
+//                "where p._key = o._key", false);
+//            checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
+//                "\"persPart\".Person2 p",
+//                "(select * from \"orgPart\".Organization) o",
+//                "where p._key = o._key", false);
 
             // Join multiple.
 
@@ -576,26 +581,32 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
                     sql);
 
                 sql = "select o.k1, p1._key k2, p2._key k3 from " +
-                    "(select o1._key k1, o2._key k2 from \"orgRepl\".Organization o1, \"orgRepl2\".Organization o2 where o1._key > o2._key) o, " +
-                    "\"persPartAff\".Person2 p1, \"persPart\".Person2 p2 where p1._key=p2._key and p2.orgId = o.k1";
+                    "(select o1._key k1, o2._key k2 " +
+                    "from \"orgRepl\".Organization o1, \"orgRepl2\".Organization o2 " +
+                    "where o1._key > o2._key) o, " +
+                    "\"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " +
+                    "where p1._key=p2._key and p2.orgId = o.k1";
 
                 checkQueryPlan(persPart,
                     false,
-                    1,
+                    0,
                     sql,
-                    "persPartAff", "persPart", "batched:unicast", "orgRepl");
+                    "persPartAff", "persPart", "orgRepl");
 
                 checkQueryFails(persPart, sql, true);
 
-                sql = "select o.k1, p._key k2 from " +
-                    "(select o1._key k1, p1._key k2 from \"orgRepl\".Organization o1, \"persPart\".Person2 p1 where o1._key = p1.orgId) o, " +
-                    "\"persPartAff\".Person2 p where p._key=o.k1";
+                sql = "select o.ok, p._key from " +
+                    "(select o1._key ok, p1._key pk " +
+                    "from \"orgRepl\".Organization o1, \"persPart\".Person2 p1 " +
+                    "where o1._key = p1.orgId) o, " +
+                    "\"persPartAff\".Person2 p where p._key=o.ok";
 
                 checkQueryPlan(persPart,
                     false,
                     1,
                     sql,
-                    "FROM \"persPart\"", "INNER JOIN \"orgRepl\"", "INNER JOIN \"persPartAff\"", "batched:broadcast");
+                    "FROM \"persPart\"", "INNER JOIN \"orgRepl\"",
+                    "INNER JOIN \"persPartAff\"", "batched:unicast");
 
                 checkQueryFails(persPart, sql, true);
             }
@@ -657,7 +668,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
             {
                 String sql = "select p1._key k1, p2._key k2, o._key k3 " +
                     "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgRepl\".Organization o " +
-                    "where p1._key=p2._key and p2.orgId = o._key";
+                    "where p1._key=p2.name and p2.orgId = o._key";
 
                 checkQueryPlan(persPart,
                     false,
@@ -666,21 +677,39 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
                     "batched:unicast");
 
                 sql = "select p1._key k1, p2._key k2, o._key k3 " +
-                    "from \"orgRepl\".Organization o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " +
+                    "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgRepl\".Organization o " +
                     "where p1._key=p2._key and p2.orgId = o._key";
 
                 checkQueryPlan(persPart,
                     false,
+                    0,
+                    sql);
+
+                sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from \"orgRepl\".Organization o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " +
+                    "where p1._key=p2.name and p2.orgId = o._key";
+
+                checkQueryPlan(persPart,
+                    false,
                     1,
                     sql,
                     "batched:unicast");
 
                 sql = "select p1._key k1, p2._key k2, o._key k3 " +
-                    "from \"persPartAff\".Person2 p1, \"orgRepl\".Organization o, \"persPart\".Person2 p2 " +
+                    "from \"orgRepl\".Organization o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " +
                     "where p1._key=p2._key and p2.orgId = o._key";
 
                 checkQueryPlan(persPart,
                     false,
+                    0,
+                    sql);
+
+                sql = "select p1._key k1, p2._key k2, o._key k3 " +
+                    "from (select * from \"orgRepl\".Organization) o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " +
+                    "where p1._key=p2.name and p2.orgId = o._key";
+
+                checkQueryPlan(persPart,
+                    false,
                     1,
                     sql,
                     "batched:unicast");
@@ -691,9 +720,8 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
 
                 checkQueryPlan(persPart,
                     false,
-                    1,
-                    sql,
-                    "batched:unicast");
+                    0,
+                    sql);
             }
         }
         finally {
@@ -823,17 +851,28 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
         boolean enforceJoinOrder,
         int expBatchedJoins,
         String sql,
-        String...expText) {
+        String...expText
+    ) {
+        checkQueryPlan(cache,
+            enforceJoinOrder,
+            expBatchedJoins,
+            new SqlFieldsQuery(sql),
+            expText);
+
+        sql = "select * from (" + sql + ")";
+
         checkQueryPlan(cache,
             enforceJoinOrder,
             expBatchedJoins,
             new SqlFieldsQuery(sql),
             expText);
 
+        sql = "select * from (" + sql + ")";
+
         checkQueryPlan(cache,
             enforceJoinOrder,
             expBatchedJoins,
-            new SqlFieldsQuery("select * from (" + sql + ")"),
+            new SqlFieldsQuery(sql),
             expText);
     }
 
@@ -854,7 +893,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
 
         String plan = queryPlan(cache, qry);
 
-        log.info("Plan: " + plan);
+        log.info("\n  Plan:\n" + plan);
 
         assertEquals("Unexpected number of batched joins in plan [plan=" + plan + ", qry=" + qry + ']',
             expBatchedJoins,

http://git-wip-us.apache.org/repos/asf/ignite/blob/3737407b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
index b909b36..d3ff902 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
@@ -53,6 +53,12 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
     private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
     /** */
+    private static final String TEST_SCHEMA = "SCH";
+
+    /** */
+    private static final String TEST_CACHE = "my-cache";
+
+    /** */
     private static Ignite ignite;
 
     /** {@inheritDoc} */
@@ -69,12 +75,14 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
         // Cache.
         CacheConfiguration cc = defaultCacheConfiguration();
 
+        cc.setName(TEST_CACHE);
         cc.setCacheMode(CacheMode.PARTITIONED);
         cc.setAtomicityMode(CacheAtomicityMode.ATOMIC);
         cc.setNearConfiguration(null);
         cc.setWriteSynchronizationMode(FULL_SYNC);
         cc.setRebalanceMode(SYNC);
         cc.setSwapEnabled(false);
+        cc.setSqlSchema(TEST_SCHEMA);
         cc.setSqlFunctionClasses(GridQueryParsingTest.class);
         cc.setIndexedTypes(
             String.class, Address.class,
@@ -163,7 +171,7 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
         checkQuery("select avg(old) from Person, Address where Person.addrId = Address.id " +
             "and lower(Address.street) = lower(?)");
 
-        checkQuery("select name, date from Person");
+        checkQuery("select name, name, date, date d from Person");
         checkQuery("select distinct name, date from Person");
         checkQuery("select * from Person p");
         checkQuery("select * from Person");
@@ -240,16 +248,16 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
         checkQuery("select street from Person p, (select a.street from Address a where a.street is not null) ");
         checkQuery("select addr.street from Person p, (select a.street from Address a where a.street is not null) addr");
 
-        checkQuery("select p.name n from \"\".Person p order by p.old + 10");
+        checkQuery("select p.name n from sch.Person p order by p.old + 10");
 
-        checkQuery("select case when p.name is null then 'Vasya' end x from \"\".Person p");
-        checkQuery("select case when p.name like 'V%' then 'Vasya' else 'Other' end x from \"\".Person p");
-        checkQuery("select case when upper(p.name) = 'VASYA' then 'Vasya' when p.name is not null then p.name else 'Other' end x from \"\".Person p");
+        checkQuery("select case when p.name is null then 'Vasya' end x from sch.Person p");
+        checkQuery("select case when p.name like 'V%' then 'Vasya' else 'Other' end x from sch.Person p");
+        checkQuery("select case when upper(p.name) = 'VASYA' then 'Vasya' when p.name is not null then p.name else 'Other' end x from sch.Person p");
 
-        checkQuery("select case p.name when 'Vasya' then 1 end z from \"\".Person p");
-        checkQuery("select case p.name when 'Vasya' then 1 when 'Petya' then 2 end z from \"\".Person p");
-        checkQuery("select case p.name when 'Vasya' then 1 when 'Petya' then 2 else 3 end z from \"\".Person p");
-        checkQuery("select case p.name when 'Vasya' then 1 else 3 end z from \"\".Person p");
+        checkQuery("select case p.name when 'Vasya' then 1 end z from sch.Person p");
+        checkQuery("select case p.name when 'Vasya' then 1 when 'Petya' then 2 end z from sch.Person p");
+        checkQuery("select case p.name when 'Vasya' then 1 when 'Petya' then 2 else 3 end z from sch.Person p");
+        checkQuery("select case p.name when 'Vasya' then 1 else 3 end z from sch.Person p");
 
         checkQuery("select count(*) as a from Person union select count(*) as a from Address");
         checkQuery("select old, count(*) as a from Person group by old union select 1, count(*) as a from Address");
@@ -262,6 +270,17 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
         checkQuery("(select name from Person limit 4) UNION (select street from Address limit 1) limit ? offset ?");
         checkQuery("(select 2 a) union all (select 1) order by 1");
         checkQuery("(select 2 a) union all (select 1) order by a desc nulls first limit ? offset ?");
+
+        checkQuery("select public.\"#\".\"@\" from (select 1 as \"@\") \"#\"");
+//        checkQuery("select sch.\"#\".\"@\" from (select 1 as \"@\") \"#\""); // Illegal query.
+        checkQuery("select \"#\".\"@\" from (select 1 as \"@\") \"#\"");
+        checkQuery("select \"@\" from (select 1 as \"@\") \"#\"");
+        checkQuery("select sch.\"#\".old from sch.Person \"#\"");
+        checkQuery("select sch.\"#\".old from Person \"#\"");
+        checkQuery("select \"#\".old from Person \"#\"");
+        checkQuery("select old from Person \"#\"");
+//        checkQuery("select Person.old from Person \"#\""); // Illegal query.
+        checkQuery("select sch.\"#\".* from Person \"#\"");
     }
 
     /** */
@@ -374,7 +393,7 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
 
         IgniteH2Indexing idx = U.field(qryProcessor, "idx");
 
-        return (JdbcConnection)idx.connectionForSpace(null);
+        return (JdbcConnection)idx.connectionForSpace(TEST_CACHE);
     }
 
     /**
@@ -416,7 +435,7 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
     private void checkQuery(String qry) throws Exception {
         Prepared prepared = parse(qry);
 
-        GridSqlStatement gQry = new GridSqlQueryParser().parse(prepared);
+        GridSqlStatement gQry = new GridSqlQueryParser(false).parse(prepared);
 
         String res = gQry.getSQL();
 


Mime
View raw message