ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [08/11] ignite git commit: ignite-1232 Distributed SQL joins implementation
Date Fri, 22 Jul 2016 14:08:47 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java
new file mode 100644
index 0000000..d8f2e08
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.h2.command.ddl.CreateTableData;
+import org.h2.engine.Session;
+import org.h2.index.BaseIndex;
+import org.h2.index.Cursor;
+import org.h2.index.Index;
+import org.h2.index.IndexCondition;
+import org.h2.index.IndexType;
+import org.h2.index.SingleRowCursor;
+import org.h2.message.DbException;
+import org.h2.result.Row;
+import org.h2.result.SearchRow;
+import org.h2.result.SortOrder;
+import org.h2.table.Column;
+import org.h2.table.IndexColumn;
+import org.h2.table.TableBase;
+import org.h2.table.TableFilter;
+import org.h2.value.Value;
+import org.h2.value.ValueInt;
+import org.jsr166.ConcurrentHashMap8;
+
+/**
+ * Meta table.
+ */
+public class GridH2MetaTable extends TableBase {
+    /** */
+    private static final int ID = 0;
+
+    /** */
+    private final MetaIndex index;
+
+    /** */
+    private final AtomicLong dataModificationId = new AtomicLong();
+
+    /** */
+    private final Set<Session> fakeExclusiveSet = Collections.newSetFromMap(
+        new ConcurrentHashMap8<Session,Boolean>());
+
+    /**
+     * Creates the meta table together with its single in-memory index.
+     *
+     * @param data Table definition. It is asserted to declare exactly four columns, the first of
+     *      which is an {@code INT} column named {@code "ID"} with column ID {@code 0}.
+     */
+    public GridH2MetaTable(CreateTableData data) {
+        super(data);
+
+        ArrayList<Column> tblCols = data.columns;
+
+        assert tblCols.size() == 4 : tblCols;
+
+        Column idCol = tblCols.get(ID);
+
+        assert "ID".equals(idCol.getName()) && idCol.getType() == Value.INT : tblCols;
+        assert idCol.getColumnId() == ID;
+
+        index = new MetaIndex();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Row getTemplateRow() {
+        return new MetaRow();
+    }
+
+    /** {@inheritDoc} */
+    @Override public SearchRow getTemplateSimpleRow(boolean singleColumn) {
+        // A full four-column meta row is used unless a single-column template is requested.
+        if (!singleColumn)
+            return new MetaRow();
+
+        return GridH2RowFactory.create((Value)null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean lock(Session session, boolean exclusive, boolean forceLockEvenInMvcc) {
+        if (fakeExclusiveSet.contains(session))
+            return true;
+
+        if (exclusive)
+            fakeExclusiveSet.add(session);
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void unlock(Session s) {
+        fakeExclusiveSet.remove(s);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close(Session session) {
+        // No-op.
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * No new index is created: the request is asserted to target the single {@code ID} column and
+     * the table's only index is returned.
+     */
+    @Override public Index addIndex(Session session, String indexName, int indexId,
+        IndexColumn[] cols, IndexType indexType, boolean create, String indexComment) {
+        int colsCnt = cols.length;
+
+        assert colsCnt == 1 : "len: " + colsCnt;
+
+        int idxColId = cols[0].column.getColumnId();
+
+        assert idxColId == ID : "colId: " + idxColId;
+
+        return index;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeRow(Session session, Row row) {
+        dataModificationId.incrementAndGet();
+        index.remove(session, row);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void truncate(Session session) {
+        dataModificationId.incrementAndGet();
+        index.truncate(session);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addRow(Session session, Row row) {
+        dataModificationId.incrementAndGet();
+        index.add(session, row);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void checkSupportAlter() {
+        throw DbException.getUnsupportedException("alter");
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getTableType() {
+        return SYSTEM_TABLE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Index getScanIndex(Session session) {
+        return index;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Index getUniqueIndex() {
+        return index;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ArrayList<Index> getIndexes() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isLockedExclusively() {
+        return !fakeExclusiveSet.isEmpty();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isLockedExclusivelyBy(Session s) {
+        return fakeExclusiveSet.contains(s);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMaxDataModificationId() {
+        return dataModificationId.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDeterministic() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean canGetRowCount() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean canDrop() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getRowCount(Session session) {
+        return index.getRowCount(session);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getRowCountApproximation() {
+        return index.getRowCountApproximation();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getDiskSpaceUsed() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void checkRename() {
+        throw DbException.getUnsupportedException("rename");
+    }
+
+    /**
+     * Row of the meta table. Holds exactly four values addressed by column index {@code 0..3}.
+     */
+    private static class MetaRow extends GridH2Row {
+        /** Value of column 0. */
+        private Value v0;
+
+        /** Value of column 1. */
+        private Value v1;
+
+        /** Value of column 2. */
+        private Value v2;
+
+        /** Value of column 3. */
+        private Value v3;
+
+        /** {@inheritDoc} */
+        @Override public int getColumnCount() {
+            return 4;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Value getValue(int idx) {
+            if (idx == 0)
+                return v0;
+
+            if (idx == 1)
+                return v1;
+
+            if (idx == 2)
+                return v2;
+
+            if (idx == 3)
+                return v3;
+
+            throw new IllegalStateException("Index: " + idx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setValue(int idx, Value v) {
+            if (idx == 0)
+                v0 = v;
+            else if (idx == 1)
+                v1 = v;
+            else if (idx == 2)
+                v2 = v;
+            else if (idx == 3)
+                v3 = v;
+            else
+                throw new IllegalStateException("Index: " + idx);
+        }
+    }
+
+    /**
+     * Meta index: the only index of the meta table. Rows are kept in a concurrent map keyed by the
+     * value of the {@code ID} column.
+     */
+    private static class MetaIndex extends BaseIndex {
+        /** Rows by the value of their {@code ID} column. */
+        private final ConcurrentMap<ValueInt, GridH2Row> rows = new ConcurrentHashMap8<>();
+
+        /** {@inheritDoc} */
+        @Override public void checkRename() {
+            throw DbException.getUnsupportedException("rename");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close(Session session) {
+            // No-op.
+        }
+
+        /**
+         * Extracts the value of the {@code ID} column from the given row.
+         *
+         * @param row Row.
+         * @return ID (asserted to be non-null).
+         */
+        private static ValueInt id(SearchRow row) {
+            Value id = row.getValue(ID);
+
+            assert id != null;
+
+            return (ValueInt)id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void add(Session session, Row row) {
+            // A row with the same ID, if any, is silently replaced.
+            rows.put(id(row), (GridH2Row)row);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void remove(Session session, Row row) {
+            // Conditional removal: the entry is dropped only if it is currently mapped to an equal row.
+            rows.remove(id(row), row);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * When both bounds are given and carry equal IDs, a single-row cursor over the row mapped to
+         * that ID is returned; otherwise the cursor iterates over all rows.
+         */
+        @Override public Cursor find(Session session, SearchRow first, SearchRow last) {
+            if (first == null || last == null || !Objects.equals(id(first), id(last)))
+                return new GridH2Cursor(rows.values().iterator());
+
+            return new SingleRowCursor(rows.get(id(first)));
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Returns {@code 1} for an equality condition on the {@code ID} column and
+         * {@code 1000 + rows.size()} otherwise.
+         */
+        @Override public double getCost(Session session, int[] masks, TableFilter[] filters,
+            int filter, SortOrder sortOrder) {
+            // H2 may pass null masks, meaning that no condition is applicable to this index;
+            // fall through to the full scan cost in that case instead of failing with NPE.
+            if (masks != null && (masks[ID] & IndexCondition.EQUALITY) == IndexCondition.EQUALITY)
+                return 1;
+
+            return 1000 + rows.size();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void remove(Session session) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void truncate(Session session) {
+            rows.clear();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean canGetFirstOrLast() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Cursor findFirstOrLast(Session session, boolean first) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean needRebuild() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getRowCount(Session session) {
+            return rows.size();
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getRowCountApproximation() {
+            // The session argument is not used by getRowCount(), so null is fine here.
+            return getRowCount(null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getDiskSpaceUsed() {
+            return 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
new file mode 100644
index 0000000..19ea2b2
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
+
+/**
+ * Thread local SQL query context which is intended to be accessible from everywhere.
+ */
+public class GridH2QueryContext {
+    /** */
+    private static final ThreadLocal<GridH2QueryContext> qctx = new ThreadLocal<>();
+
+    /** */
+    private static final ConcurrentMap<Key, GridH2QueryContext> qctxs = new ConcurrentHashMap8<>();
+
+    /** */
+    private final Key key;
+
+    /** */
+    private volatile boolean cleared;
+
+    /** Index snapshots. */
+    @GridToStringInclude
+    private Map<Long, Object> snapshots;
+
+    /** */
+    private List<GridReservable> reservations;
+
+    /** Range streams for indexes. */
+    private Map<Integer, Object> streams;
+
+    /** Range sources for indexes. */
+    private Map<SourceKey, Object> sources;
+
+    /** */
+    private int batchLookupIdGen;
+
+    /** */
+    private IndexingQueryFilter filter;
+
+    /** */
+    private AffinityTopologyVersion topVer;
+
+    /** */
+    private Map<UUID, int[]> partsMap;
+
+    /** */
+    private UUID[] partsNodes;
+
+    /** */
+    private boolean distributedJoins;
+
+    /** */
+    private int pageSize;
+
+    /** */
+    private GridH2CollocationModel qryCollocationMdl;
+
+    /**
+     * @param locNodeId Local node ID.
+     * @param nodeId The node who initiated the query.
+     * @param qryId The query ID.
+     * @param type Query type.
+     */
+    public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
+        key = new Key(locNodeId, nodeId, qryId, type);
+    }
+
+    /**
+     * @return Type.
+     */
+    public GridH2QueryType type() {
+        return key.type;
+    }
+
+    /**
+     * @return Origin node ID.
+     */
+    public UUID originNodeId() {
+        return key.nodeId;
+    }
+
+    /**
+     * @return Query request ID.
+     */
+    public long queryId() {
+        return key.qryId;
+    }
+
+    /**
+     * @return Query collocation model.
+     */
+    public GridH2CollocationModel queryCollocationModel() {
+        return qryCollocationMdl;
+    }
+
+    /**
+     * @param qryCollocationMdl Query collocation model.
+     */
+    public void queryCollocationModel(GridH2CollocationModel qryCollocationMdl) {
+        this.qryCollocationMdl = qryCollocationMdl;
+    }
+
+    /**
+     * @param distributedJoins Distributed joins can be run in this query.
+     * @return {@code this}.
+     */
+    public GridH2QueryContext distributedJoins(boolean distributedJoins) {
+        this.distributedJoins = distributedJoins;
+
+        return this;
+    }
+
+    /**
+     * @return {@code true} If distributed joins can be run in this query.
+     */
+    public boolean distributedJoins() {
+        return distributedJoins;
+    }
+
+    /**
+     * @param reservations Reserved partitions or group reservations.
+     * @return {@code this}.
+     */
+    public GridH2QueryContext reservations(List<GridReservable> reservations) {
+        this.reservations = reservations;
+
+        return this;
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return {@code this}.
+     */
+    public GridH2QueryContext topologyVersion(AffinityTopologyVersion topVer) {
+        this.topVer = topVer;
+
+        return this;
+    }
+
+    /**
+     * @return Topology version.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @param partsMap Partitions map.
+     * @return {@code this}.
+     */
+    public GridH2QueryContext partitionsMap(Map<UUID,int[]> partsMap) {
+        this.partsMap = partsMap;
+
+        return this;
+    }
+
+    /**
+     * @return Partitions map.
+     */
+    public Map<UUID,int[]> partitionsMap() {
+        return partsMap;
+    }
+
+    /**
+     * Resolves the node owning the given partition according to the partitions map of this context.
+     * The partition-to-node array is built from {@code partsMap} on first use and cached in
+     * {@code partsNodes}.
+     *
+     * @param p Partition.
+     * @param cctx Cache context, used to get the total number of partitions.
+     * @return Owning node ID, or {@code null} if the partitions map does not list the partition.
+     */
+    public UUID nodeForPartition(int p, GridCacheContext<?, ?> cctx) {
+        UUID[] cached = partsNodes;
+
+        if (cached != null)
+            return cached[p];
+
+        assert partsMap != null;
+
+        UUID[] owners = new UUID[cctx.affinity().partitions()];
+
+        for (Map.Entry<UUID, int[]> entry : partsMap.entrySet()) {
+            UUID ownerId = entry.getKey();
+            int[] ownedParts = entry.getValue();
+
+            assert ownerId != null;
+            assert !F.isEmpty(ownedParts);
+
+            for (int i = 0; i < ownedParts.length; i++) {
+                int part = ownedParts[i];
+
+                // Each partition must be assigned to at most one node.
+                assert owners[part] == null;
+
+                owners[part] = ownerId;
+            }
+        }
+
+        partsNodes = owners;
+
+        return owners[p];
+    }
+
+    /**
+     * Registers an index snapshot in this context. If the snapshot is a {@link GridReservable},
+     * it is reserved first.
+     *
+     * @param idxId Index ID.
+     * @param snapshot Index snapshot.
+     * @throws IllegalStateException If the snapshot could not be reserved or a snapshot for the
+     *      given index was already registered.
+     */
+    public void putSnapshot(long idxId, Object snapshot) {
+        assert snapshot != null;
+        assert get() == null : "need to snapshot indexes before setting query context for correct visibility";
+
+        if (snapshot instanceof GridReservable) {
+            GridReservable reservable = (GridReservable)snapshot;
+
+            if (!reservable.reserve())
+                throw new IllegalStateException("Must be already reserved before.");
+        }
+
+        // The snapshots map is created lazily on the first registration.
+        if (snapshots == null)
+            snapshots = new HashMap<>();
+
+        Object prev = snapshots.put(idxId, snapshot);
+
+        if (prev != null)
+            throw new IllegalStateException("Index already snapshoted.");
+    }
+
+    /**
+     * Clear taken snapshots.
+     */
+    public void clearSnapshots() {
+        if (F.isEmpty(snapshots))
+            return;
+
+        for (Object snapshot : snapshots.values()) {
+            if (snapshot instanceof GridReservable)
+                ((GridReservable)snapshot).release();
+        }
+
+        snapshots = null;
+    }
+
+    /**
+     * @param idxId Index ID.
+     * @return Index snapshot or {@code null} if none.
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T getSnapshot(long idxId) {
+        if (snapshots == null)
+            return null;
+
+        return (T)snapshots.get(idxId);
+    }
+
+    /**
+     * @param batchLookupId Batch lookup ID.
+     * @param streams Range streams.
+     */
+    public synchronized void putStreams(int batchLookupId, Object streams) {
+        if (this.streams == null) {
+            if (streams == null)
+                return;
+
+            this.streams = new HashMap<>();
+        }
+
+        if (streams == null)
+            this.streams.remove(batchLookupId);
+        else
+            this.streams.put(batchLookupId, streams);
+    }
+
+    /**
+     * Looks up range streams stored for the given batch lookup.
+     *
+     * @param batchLookupId Batch lookup ID.
+     * @return Range streams or {@code null} if none.
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized <T> T getStreams(int batchLookupId) {
+        return streams == null ? null : (T)streams.get(batchLookupId);
+    }
+
+    /**
+     * @param ownerId Owner node ID.
+     * @param batchLookupId Batch lookup ID.
+     * @param src Range source.
+     */
+    public synchronized void putSource(UUID ownerId, int batchLookupId, Object src) {
+        SourceKey srcKey = new SourceKey(ownerId, batchLookupId);
+
+        if (src != null) {
+            if (sources == null)
+                sources = new HashMap<>();
+
+            sources.put(srcKey, src);
+        }
+        else if (sources != null)
+            sources.remove(srcKey);
+    }
+
+    /**
+     * Looks up the range source stored for the given owner node and batch lookup.
+     *
+     * @param ownerId Owner node ID.
+     * @param batchLookupId Batch lookup ID.
+     * @return Range source or {@code null} if none.
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized <T> T getSource(UUID ownerId, int batchLookupId) {
+        return sources == null ? null : (T)sources.get(new SourceKey(ownerId, batchLookupId));
+    }
+
+    /**
+     * @return Next batch ID.
+     */
+    public int nextBatchLookupId() {
+        return ++batchLookupIdGen;
+    }
+
+    /**
+     * @return If indexes were snapshotted before query execution.
+     */
+    public boolean hasIndexSnapshots() {
+        return snapshots != null;
+    }
+
+    /**
+     * Sets current thread local context. This method must be called when all the non-volatile properties are
+     * already set to ensure visibility for other threads.
+     *
+     * @param x Query context.
+     */
+     public static void set(GridH2QueryContext x) {
+         assert qctx.get() == null;
+
+         // We need MAP query context to be available to other threads to run distributed joins.
+         if (x.key.type == MAP && x.distributedJoins() && qctxs.putIfAbsent(x.key, x) != null)
+             throw new IllegalStateException("Query context is already set.");
+
+         qctx.set(x);
+    }
+
+    /**
+     * Drops current thread local context. The context is asserted to be set.
+     */
+    public static void clearThreadLocal() {
+        GridH2QueryContext cur = qctx.get();
+
+        assert cur != null;
+
+        qctx.remove();
+    }
+
+    /**
+     * @param locNodeId Local node ID.
+     * @param nodeId The node who initiated the query.
+     * @param qryId The query ID.
+     * @param type Query type.
+     * @return {@code True} if context was found.
+     */
+    public static boolean clear(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
+        return doClear(new Key(locNodeId, nodeId, qryId, type), false);
+    }
+
+    /**
+     * Removes the context with the given key from the shared map and clears it.
+     *
+     * @param key Context key (asserted to be of {@code MAP} type).
+     * @param nodeStop Node is stopping.
+     * @return {@code True} if context was found.
+     */
+    private static boolean doClear(Key key, boolean nodeStop) {
+        assert key.type == MAP : key.type;
+
+        GridH2QueryContext rmvd = qctxs.remove(key);
+
+        if (rmvd != null) {
+            assert rmvd.key.equals(key);
+
+            rmvd.clearContext(nodeStop);
+        }
+
+        return rmvd != null;
+    }
+
+    /**
+     * Marks this context as cleared, releases taken snapshots and, unless the node is stopping,
+     * releases all reservations.
+     *
+     * @param nodeStop Node is stopping.
+     */
+    public void clearContext(boolean nodeStop) {
+        cleared = true;
+
+        clearSnapshots();
+
+        // Reservations are left untouched when the node is stopping.
+        if (nodeStop)
+            return;
+
+        List<GridReservable> reserved = reservations;
+
+        if (F.isEmpty(reserved))
+            return;
+
+        for (int i = 0; i < reserved.size(); i++)
+            reserved.get(i).release();
+    }
+
+    /**
+     * @return {@code true} If the context is cleared.
+     */
+    public boolean isCleared() {
+        return cleared;
+    }
+
+    /**
+     * Clears all shared contexts of the given local node whose queries were initiated by the dead node.
+     *
+     * @param locNodeId Local node ID.
+     * @param nodeId Dead node ID.
+     */
+    public static void clearAfterDeadNode(UUID locNodeId, UUID nodeId) {
+        for (Key key : qctxs.keySet()) {
+            if (!key.locNodeId.equals(locNodeId) || !key.nodeId.equals(nodeId))
+                continue;
+
+            doClear(key, false);
+        }
+    }
+
+    /**
+     * Clears all shared contexts of the given local node; the contexts are cleared in node-stop mode.
+     *
+     * @param locNodeId Local node ID.
+     */
+    public static void clearLocalNodeStop(UUID locNodeId) {
+        for (Key key : qctxs.keySet()) {
+            if (!key.locNodeId.equals(locNodeId))
+                continue;
+
+            doClear(key, true);
+        }
+    }
+
+    /**
+     * Access current thread local query context (if it was set).
+     *
+     * @return Current thread local query context or {@code null} if the query runs outside of Ignite context.
+     */
+    @Nullable public static GridH2QueryContext get() {
+        return qctx.get();
+    }
+
+    /**
+     * Access query context from another thread.
+     *
+     * @param locNodeId Local node ID.
+     * @param nodeId The node who initiated the query.
+     * @param qryId The query ID.
+     * @param type Query type.
+     * @return Query context.
+     */
+    @Nullable public static GridH2QueryContext get(
+        UUID locNodeId,
+        UUID nodeId,
+        long qryId,
+        GridH2QueryType type
+    ) {
+        return qctxs.get(new Key(locNodeId, nodeId, qryId, type));
+    }
+
+    /**
+     * @return Filter.
+     */
+    public IndexingQueryFilter filter() {
+        return filter;
+    }
+
+    /**
+     * @param filter Filter.
+     * @return {@code this}.
+     */
+    public GridH2QueryContext filter(IndexingQueryFilter filter) {
+        this.filter = filter;
+
+        return this;
+    }
+
+    /**
+     * @return Page size.
+     */
+    public int pageSize() {
+        return pageSize;
+    }
+
+    /**
+     * @param pageSize Page size.
+     * @return {@code this}.
+     */
+    public GridH2QueryContext pageSize(int pageSize) {
+        this.pageSize = pageSize;
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridH2QueryContext.class, this);
+    }
+
+    /**
+     * Unique key for the query context: local node, originating node, query ID and query type.
+     */
+    private static class Key {
+        /** Local node ID. */
+        private final UUID locNodeId;
+
+        /** ID of the node who initiated the query. */
+        private final UUID nodeId;
+
+        /** Query ID. */
+        private final long qryId;
+
+        /** Query type. */
+        private final GridH2QueryType type;
+
+        /**
+         * @param locNodeId Local node ID.
+         * @param nodeId The node who initiated the query.
+         * @param qryId The query ID.
+         * @param type Query type.
+         */
+        private Key(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
+            assert locNodeId != null;
+            assert nodeId != null;
+            assert type != null;
+
+            this.locNodeId = locNodeId;
+            this.nodeId = nodeId;
+            this.qryId = qryId;
+            this.type = type;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (o == this)
+                return true;
+
+            if (o == null || o.getClass() != getClass())
+                return false;
+
+            Key other = (Key)o;
+
+            // Cheap primitive and identity comparisons go first.
+            if (qryId != other.qryId || type != other.type)
+                return false;
+
+            return nodeId.equals(other.nodeId) && locNodeId.equals(other.locNodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int hash = locNodeId.hashCode();
+
+            hash = 31 * hash + nodeId.hashCode();
+            hash = 31 * hash + (int)(qryId ^ (qryId >>> 32));
+
+            return 31 * hash + type.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Key.class, this);
+        }
+    }
+
+    /**
+     * Key for a range source: owner node ID plus batch lookup ID. Instances are immutable, which
+     * makes them safe to use as hash map keys.
+     */
+    private static final class SourceKey {
+        /** Owner node ID. */
+        final UUID ownerId;
+
+        /** Batch lookup ID. */
+        final int batchLookupId;
+
+        /**
+         * @param ownerId Owner node ID.
+         * @param batchLookupId Batch lookup ID.
+         */
+        SourceKey(UUID ownerId, int batchLookupId) {
+            this.ownerId = ownerId;
+            this.batchLookupId = batchLookupId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            // Per the equals contract, null and foreign types compare as not equal instead of throwing.
+            if (!(o instanceof SourceKey))
+                return false;
+
+            SourceKey srcKey = (SourceKey)o;
+
+            return batchLookupId == srcKey.batchLookupId && ownerId.equals(srcKey.ownerId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return 31 * ownerId.hashCode() + batchLookupId;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java
new file mode 100644
index 0000000..f6d0408
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+/**
+ * Type of a query being executed or prepared.
+ */
+public enum GridH2QueryType {
+    /**
+     * Map query. Runs over local partitions, possibly with distributed joins.
+     */
+    MAP,
+
+    /**
+     * Reduce query. Local query on the node which initiated the original query.
+     */
+    REDUCE,
+
+    /**
+     * Local query. It may also be a query over a replicated cache when all the data is available locally.
+     */
+    LOCAL,
+
+    /**
+     * Replicated query over a network. Such a query can be sent from a client node or from a node which
+     * has not loaded all the partitions yet.
+     */
+    REPLICATED,
+
+    /**
+     * Parsing and optimization stage.
+     */
+    PREPARE,
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RetryException.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RetryException.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RetryException.java
new file mode 100644
index 0000000..e333d71
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RetryException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import org.apache.ignite.IgniteException;
+
+/**
+ * Internal exception (presumably raised when a query has to be retried; confirm against the throwing code).
+ */
+public class GridH2RetryException extends IgniteException {
+    /**
+     * @param msg Message passed to the parent exception.
+     */
+    public GridH2RetryException(String msg) {
+        super(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
index 3b6b56e..9486a2e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
@@ -18,19 +18,14 @@
 package org.apache.ignite.internal.processors.query.h2.opt;
 
 import org.h2.result.Row;
+import org.h2.result.SearchRow;
+import org.h2.store.Data;
 import org.h2.value.Value;
 
 /**
  * Row with locking support needed for unique key conflicts resolution.
  */
-public class GridH2Row extends Row implements GridSearchRowPointer {
-    /**
-     * @param data Column values.
-     */
-    public GridH2Row(Value... data) {
-        super(data, MEMORY_CALCULATE);
-    }
-
+public abstract class GridH2Row extends Row implements GridSearchRowPointer {
     /** {@inheritDoc} */
     @Override public long pointer() {
         throw new IllegalStateException();
@@ -45,4 +40,79 @@ public class GridH2Row extends Row implements GridSearchRowPointer {
     @Override public void decrementRefCount() {
         throw new IllegalStateException();
     }
+
+    /** {@inheritDoc} */
+    @Override public Row getCopy() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setVersion(int version) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getByteCount(Data dummy) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isEmpty() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setDeleted(boolean deleted) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setSessionId(int sessionId) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getSessionId() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void commit() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDeleted() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setKeyAndVersion(SearchRow old) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getVersion() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setKey(long key) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getKey() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMemory() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Value[] getValueList() {
+        throw new UnsupportedOperationException();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index 80dcfcb..f519c30 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -18,7 +18,10 @@
 package org.apache.ignite.internal.processors.query.h2.opt;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.util.offheap.unsafe.GridOffHeapSmartPointerFactory;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
@@ -31,9 +34,30 @@ import org.jetbrains.annotations.Nullable;
  */
 public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory<GridH2KeyValueRowOffheap> {
     /**
-     * @return Owner.
+     * Gets indexing.
+     *
+     * @return Indexing.
+     */
+    public IgniteH2Indexing indexing();
+
+    /**
+     * Gets type descriptor.
+     *
+     * @return Type descriptor.
+     */
+    public GridQueryTypeDescriptor type();
+
+    /**
+     * Gets cache context for this row descriptor.
+     *
+     * @return Cache context.
+     */
+    public GridCacheContext<?, ?> context();
+
+    /**
+     * @return Cache configuration.
      */
-    public IgniteH2Indexing owner();
+    public CacheConfiguration configuration();
 
     /**
      * Creates new row.

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java
new file mode 100644
index 0000000..00ff3f2
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.h2.result.RowFactory;
+import org.h2.value.Value;
+
+/**
+ * Row factory producing {@link GridH2Row} implementations sized by the number of column values.
+ */
+public class GridH2RowFactory extends RowFactory {
+    /**
+     * @param v The only column value.
+     * @return Single value row.
+     */
+    public static GridH2Row create(Value v) {
+        return new RowKey(v);
+    }
+
+    /**
+     * @param v1 First column value.
+     * @param v2 Second column value.
+     * @return Row of two values.
+     */
+    public static GridH2Row create(Value v1, Value v2) {
+        return new RowPair(v1, v2);
+    }
+
+    /**
+     * @param data Column values, must not be empty (otherwise {@link IllegalStateException} is thrown).
+     * @return Row: specialized for one or two values, array based otherwise.
+     */
+    public static GridH2Row create(Value... data) {
+        switch (data.length) {
+            case 0:
+                throw new IllegalStateException("Zero columns row.");
+
+            case 1:
+                return new RowKey(data[0]);
+
+            case 2:
+                return new RowPair(data[0], data[1]);
+
+            default:
+                return new RowSimple(data);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridH2Row createRow(Value[] data, int memory) {
+        return create(data);
+    }
+
+    /**
+     * Single value row.
+     */
+    private static final class RowKey extends GridH2Row {
+        /** The only column value. */
+        private Value key;
+
+        /**
+         * @param key Key.
+         */
+        public RowKey(Value key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getColumnCount() {
+            return 1;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Value getValue(int idx) {
+            assert idx == 0 : idx;
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setValue(int idx, Value v) {
+            assert idx == 0 : idx;
+            key = v;
+        }
+    }
+
+    /**
+     * Row of two values.
+     */
+    private static final class RowPair extends GridH2Row  {
+        /** First column value. */
+        private Value v1;
+
+        /** Second column value. */
+        private Value v2;
+
+        /**
+         * @param v1 First value.
+         * @param v2 Second value.
+         */
+        private RowPair(Value v1, Value v2) {
+            this.v1 = v1;
+            this.v2 = v2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getColumnCount() {
+            return 2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Value getValue(int idx) {
+            return idx == 0 ? v1 : v2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setValue(int idx, Value v) {
+            if (idx == 0)
+                v1 = v;
+            else {
+                assert idx == 1 : idx;
+
+                v2 = v;
+            }
+        }
+    }
+
+    /**
+     * Simple array based row.
+     */
+    private static final class RowSimple extends GridH2Row {
+        /** Column values. */
+        @GridToStringInclude
+        private Value[] vals;
+
+        /**
+         * @param vals Values.
+         */
+        private RowSimple(Value[] vals) {
+            this.vals = vals;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getColumnCount() {
+            return vals.length;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Value getValue(int idx) {
+            return vals[idx];
+        }
+
+        /** {@inheritDoc} */
+        @Override public void setValue(int idx, Value v) {
+            vals[idx] = v;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RowSimple.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index bea4dd8..8d080ae 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -23,22 +23,24 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Set;
-import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.h2.api.TableEngine;
 import org.h2.command.ddl.CreateTableData;
-import org.h2.engine.Constants;
 import org.h2.engine.Database;
 import org.h2.engine.DbObject;
 import org.h2.engine.Session;
+import org.h2.index.BaseIndex;
 import org.h2.index.Cursor;
 import org.h2.index.Index;
+import org.h2.index.IndexLookupBatch;
 import org.h2.index.IndexType;
 import org.h2.message.DbException;
 import org.h2.result.Row;
@@ -55,7 +57,10 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.LongAdder8;
 
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 
 /**
  * H2 Table implementation.
@@ -74,10 +79,16 @@ public class GridH2Table extends TableBase {
     private final ReadWriteLock lock;
 
     /** */
-    private final Set<Session> sessions = Collections.newSetFromMap(new ConcurrentHashMap8<Session, Boolean>());
+    private boolean destroyed;
 
     /** */
-    private volatile Object[] actualSnapshot;
+    private final Set<Session> sessions = Collections.newSetFromMap(new ConcurrentHashMap8<Session,Boolean>());
+
+    /** */
+    private final AtomicReference<Object[]> actualSnapshot = new AtomicReference<>();
+
+    /** */
+    private IndexColumn affKeyCol;
 
     /** */
     private final LongAdder8 size = new LongAdder8();
@@ -102,17 +113,57 @@ public class GridH2Table extends TableBase {
         this.desc = desc;
         this.spaceName = spaceName;
 
+        if (desc != null && desc.context() != null && !desc.context().customAffinityMapper()) {
+            boolean affinityColExists = true;
+
+            String affKey = desc.type().affinityKey();
+
+            int affKeyColId = -1;
+
+            if (affKey != null) {
+                String colName = desc.context().config().isSqlEscapeAll() ? affKey : affKey.toUpperCase();
+
+                if (doesColumnExist(colName))
+                    affKeyColId = getColumn(colName).getColumnId();
+                else
+                    affinityColExists = false;
+            }
+            else
+                affKeyColId = KEY_COL;
+
+            if (affinityColExists) {
+                affKeyCol = indexColumn(affKeyColId, SortOrder.ASCENDING);
+
+                assert affKeyCol != null;
+            }
+        }
+
+        // Indexes must be created in the end when everything is ready.
         idxs = idxsFactory.createIndexes(this);
 
         assert idxs != null;
         assert idxs.size() >= 1;
 
-        lock = new ReentrantReadWriteLock();
-
         // Add scan index at 0 which is required by H2.
         idxs.add(0, new ScanIndex(index(0)));
 
         snapshotEnabled = desc == null || desc.snapshotableIndex();
+
+        lock = snapshotEnabled ? new ReentrantReadWriteLock() : null;
+    }
+
+    /**
+     * @return {@code true} If this is a partitioned table.
+     */
+    public boolean isPartitioned() {
+        return desc != null && desc.configuration().getCacheMode() == PARTITIONED;
+    }
+
+    /**
+     * @return Affinity key column or {@code null} if not available.
+     */
+    @Nullable public IndexColumn getAffinityKeyColumn() {
+        return affKeyCol;
     }
 
     /** {@inheritDoc} */
@@ -172,7 +223,7 @@ public class GridH2Table extends TableBase {
 
         GridUnsafeMemory mem = desc.memory();
 
-        readLock();
+        Lock l = lock(false, Long.MAX_VALUE);
 
         if (mem != null)
             desc.guard().begin();
@@ -191,7 +242,7 @@ public class GridH2Table extends TableBase {
             return true;
         }
         finally {
-            readUnlock();
+            unlock(l);
 
             if (mem != null)
                 desc.guard().end();
@@ -207,118 +258,219 @@ public class GridH2Table extends TableBase {
 
     /** {@inheritDoc} */
     @SuppressWarnings({"LockAcquiredButNotSafelyReleased", "SynchronizationOnLocalVariableOrMethodParameter", "unchecked"})
-    @Override public void lock(@Nullable final Session ses, boolean exclusive, boolean force) {
+    @Override public boolean lock(@Nullable final Session ses, boolean exclusive, boolean force) {
         if (ses != null) {
             if (!sessions.add(ses))
-                return;
+                return false;
 
-            synchronized (ses) {
-                ses.addLock(this);
-            }
+            ses.addLock(this);
         }
 
+        if (snapshotInLock())
+            snapshotIndexes(null);
+
+        return false;
+    }
+
+    /**
+     * @return {@code True} If we must snapshot and release index snapshots in {@link #lock(Session, boolean, boolean)}
+     * and {@link #unlock(Session)} methods.
+     */
+    private boolean snapshotInLock() {
+        if (!snapshotEnabled)
+            return false;
+
+        GridH2QueryContext qctx = GridH2QueryContext.get();
+
+        // On MAP queries with distributed joins we lock tables before the queries.
+        return qctx == null || qctx.type() != MAP || !qctx.hasIndexSnapshots();
+    }
+
+    /**
+     * @param qctx Query context.
+     */
+    public void snapshotIndexes(GridH2QueryContext qctx) {
         if (!snapshotEnabled)
             return;
 
-        Object[] snapshot;
+        Object[] snapshots;
 
-        for (long waitTime = 100;; waitTime *= 2) { // Increase wait time to avoid starvation.
-            snapshot = actualSnapshot;
+        Lock l;
 
-            if (snapshot != null) {
-                // Reuse existing snapshot without locking.
-                for (int i = 1, len = idxs.size(); i < len; i++)
-                    index(i).takeSnapshot(snapshot[i - 1]);
+        // Try to reuse existing snapshots outside of the lock.
+        for (long waitTime = 200;; waitTime *= 2) { // Increase wait time to avoid starvation.
+            snapshots = actualSnapshot.get();
 
-                return;
-            }
+            if (snapshots != null) { // Reuse existing snapshot without locking.
+                snapshots = doSnapshotIndexes(snapshots, qctx);
 
-            try {
-                if (lock.writeLock().tryLock(waitTime, TimeUnit.MILLISECONDS))
-                    break;
-            }
-            catch (InterruptedException e) {
-                throw new IgniteException("Thread got interrupted while trying to acquire index lock.", e);
+                if (snapshots != null)
+                    return; // Reused successfully.
             }
-        }
 
-        boolean snapshoted = false;
+            l = lock(true, waitTime);
+
+            if (l != null)
+                break;
+        }
 
         try {
-            snapshot = actualSnapshot; // Try again inside of the lock.
+            // Try again inside of the lock.
+            snapshots = actualSnapshot.get();
+
+            if (snapshots != null) // Try reusing.
+                snapshots = doSnapshotIndexes(snapshots, qctx);
 
-            if (snapshot == null) {
-                snapshot = takeIndexesSnapshot();
+            if (snapshots == null) { // Reuse failed, produce new snapshots.
+                snapshots = doSnapshotIndexes(null, qctx);
 
-                if (desc == null || desc.memory() == null) // This optimization is disabled for off-heap index.
-                    actualSnapshot = snapshot;
+                assert snapshots != null;
 
-                snapshoted = true;
+                actualSnapshot.set(snapshots);
             }
         }
         finally {
-            lock.writeLock().unlock();
+            unlock(l);
         }
+    }
 
-        if (!snapshoted) {
-            for (int i = 1, len = idxs.size(); i < len; i++)
-                index(i).takeSnapshot(snapshot[i - 1]);
+    /**
+     * @return Table identifier.
+     */
+    public String identifier() {
+        return getSchema().getName() + '.' + getName();
+    }
+
+    /**
+     * @param l Lock.
+     */
+    private static void unlock(Lock l) {
+        if (l != null)
+            l.unlock();
+    }
+
+    /**
+     * @param exclusive Exclusive lock.
+     * @param waitMillis Milliseconds to wait for the lock.
+     * @return The acquired lock or {@code null} if the wait timed out or snapshots are disabled.
+     */
+    public Lock lock(boolean exclusive, long waitMillis) {
+        if (!snapshotEnabled)
+            return null;
+
+        Lock l = exclusive ? lock.writeLock() : lock.readLock();
+
+        try {
+            if (!l.tryLock(waitMillis, TimeUnit.MILLISECONDS))
+                return null;
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInterruptedException("Thread got interrupted while trying to acquire table lock.", e);
         }
+
+        if (destroyed) {
+            unlock(l);
+
+            throw new IllegalStateException("Table " + identifier() + " already destroyed.");
+        }
+
+        return l;
     }
 
     /**
      * Must be called inside of write lock because when using multiple indexes we have to ensure that all of them have
      * the same contents at snapshot taking time.
      *
+     * @param qctx Query context.
      * @return New indexes data snapshot.
      */
     @SuppressWarnings("unchecked")
-    private Object[] takeIndexesSnapshot() {
-        int len = idxs.size();
+    private Object[] doSnapshotIndexes(Object[] snapshots, GridH2QueryContext qctx) {
+        assert snapshotEnabled;
 
-        Object[] snapshot = new ConcurrentNavigableMap[len - 1];
+        if (snapshots == null) // Nothing to reuse, create new snapshots.
+            snapshots = new Object[idxs.size() - 1];
 
-        for (int i = 1; i < len; i++) { // Take snapshots on all except first which is scan.
-            Object s = index(i).takeSnapshot(null);
+        // Take snapshots on all except first which is scan.
+        for (int i = 1, len = idxs.size(); i < len; i++) {
+            Object s = snapshots[i - 1];
 
-            snapshot[i - 1] = s;
-        }
+            boolean reuseExisting = s != null;
 
-        return snapshot;
-    }
+            s = index(i).takeSnapshot(s, qctx);
 
-    /** {@inheritDoc} */
-    @Override public void close(Session ses) {
-        assert !sessions.contains(ses);
-    }
+            if (reuseExisting && s == null) { // Existing snapshot was invalidated before we were able to reserve it.
+                // Release already taken snapshots.
+                if (qctx != null)
+                    qctx.clearSnapshots();
 
-    /** {@inheritDoc} */
-    @Override public void unlock(@Nullable Session ses) {
-        if (ses != null) {
-            boolean res = sessions.remove(ses);
+                for (int j = 1; j < i; j++)
+                    index(j).releaseSnapshot();
+
+                // Drop invalidated snapshot.
+                actualSnapshot.compareAndSet(snapshots, null);
+
+                return null;
+            }
 
-            assert res;
+            snapshots[i - 1] = s;
         }
 
-        for (int i = 1, len = idxs.size(); i < len; i++)  // Release snapshots on all except first which is scan.
-            index(i).releaseSnapshot();
+        return snapshots;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close(Session ses) {
+        // No-op.
     }
 
     /**
-     * Closes table and releases resources.
+     * Destroy the table.
      */
-    public void close() {
-        writeLock();
+    public void destroy() {
+        Lock l = lock(true, Long.MAX_VALUE);
 
         try {
+            assert sessions.isEmpty() : sessions;
+
+            destroyed = true;
+
             for (int i = 1, len = idxs.size(); i < len; i++)
-                index(i).close(null);
+                index(i).destroy();
         }
         finally {
-            writeUnlock();
+            unlock(l);
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void unlock(@Nullable Session ses) {
+        if (ses != null && !sessions.remove(ses))
+            return;
+
+        if (snapshotInLock())
+            releaseSnapshots();
+    }
+
+    /**
+     * Releases snapshots.
+     */
+    public void releaseSnapshots() {
+        if (!snapshotEnabled)
+            return;
+
+        releaseSnapshots0(idxs);
+    }
+
+    /**
+     * @param idxs Indexes.
+     */
+    private void releaseSnapshots0(ArrayList<Index> idxs) {
+        // Release snapshots on all except first which is scan.
+        for (int i = 1, len = idxs.size(); i < len; i++)
+            ((GridH2IndexBase)idxs.get(i)).releaseSnapshot();
+    }
+
     /**
      * Updates table for given key. If value is null then row with given key will be removed from table,
      * otherwise value and expiration time will be updated or new row will be added.
@@ -336,7 +488,16 @@ public class GridH2Table extends TableBase {
 
         GridH2Row row = desc.createRow(key, val, expirationTime);
 
-        return doUpdate(row, rmv);
+        if (!rmv)
+            ((GridH2AbstractKeyValueRow)row).valuesCache(new Value[getColumns().length]);
+
+        try {
+            return doUpdate(row, rmv);
+        }
+        finally {
+            if (!rmv)
+                ((GridH2AbstractKeyValueRow)row).valuesCache(null);
+        }
     }
 
     /**
@@ -372,7 +533,7 @@ public class GridH2Table extends TableBase {
         // getting updated from different threads with different rows with the same key is impossible.
         GridUnsafeMemory mem = desc == null ? null : desc.memory();
 
-        readLock();
+        Lock l = lock(false, Long.MAX_VALUE);
 
         if (mem != null)
             desc.guard().begin();
@@ -400,7 +561,7 @@ public class GridH2Table extends TableBase {
                 while (++i < len) {
                     GridH2IndexBase idx = index(i);
 
-                    assert !idx.getIndexType().isUnique() : "Unique indexes are not supported.";
+                    assert !idx.getIndexType().isUnique() : "Unique indexes are not supported: " + idx;
 
                     GridH2Row old2 = idx.put(row);
 
@@ -417,7 +578,7 @@ public class GridH2Table extends TableBase {
                 //  index(1) is PK, get full row from there (search row here contains only key but no other columns).
                 GridH2Row old = pk.remove(row);
 
-                if (old instanceof GridH2AbstractKeyValueRow) { // Unswap value.
+                if (row.getColumnCount() != 1 && old instanceof GridH2AbstractKeyValueRow) { // Unswap value.
                     Value v = row.getValue(VAL_COL);
 
                     if (v != null)
@@ -440,12 +601,12 @@ public class GridH2Table extends TableBase {
             }
 
             // The snapshot is not actual after update.
-            actualSnapshot = null;
+            actualSnapshot.set(null);
 
             return true;
         }
         finally {
-            readUnlock();
+            unlock(l);
 
             if (mem != null)
                 desc.guard().end();
@@ -485,13 +646,12 @@ public class GridH2Table extends TableBase {
         if (!snapshotEnabled)
             return;
 
-        GridUnsafeMemory memory = desc == null ? null : desc.memory();
+        Lock l = lock(true, Long.MAX_VALUE);
 
-        lock.writeLock().lock();
+        ArrayList<Index> idxs0 = new ArrayList<>(idxs);
 
         try {
-            if (memory == null && actualSnapshot == null)
-                actualSnapshot = takeIndexesSnapshot(); // Allow read access while we are rebuilding indexes.
+            snapshotIndexes(null); // Allow read access while we are rebuilding indexes.
 
             for (int i = 1, len = idxs.size(); i < len; i++) {
                 GridH2IndexBase newIdx = index(i).rebuild();
@@ -502,13 +662,13 @@ public class GridH2Table extends TableBase {
                     idxs.set(0, new ScanIndex(newIdx));
             }
         }
-        catch (InterruptedException ignored) {
-            // No-op.
+        catch (InterruptedException e) {
+            throw new IgniteInterruptedException(e);
         }
         finally {
-            lock.writeLock().unlock();
+            releaseSnapshots0(idxs0);
 
-            actualSnapshot = null;
+            unlock(l);
         }
     }
 
@@ -621,38 +781,6 @@ public class GridH2Table extends TableBase {
     }
 
     /**
-     *
-     */
-    private void readLock() {
-        if (snapshotEnabled)
-            lock.readLock().lock();
-    }
-
-    /**
-     *
-     */
-    private void readUnlock() {
-        if (snapshotEnabled)
-            lock.readLock().unlock();
-    }
-
-    /**
-     *
-     */
-    private void writeLock() {
-        if (snapshotEnabled)
-            lock.writeLock().lock();
-    }
-
-    /**
-     *
-     */
-    private void writeUnlock() {
-        if (snapshotEnabled)
-            lock.writeLock().unlock();
-    }
-
-    /**
      * H2 Table engine.
      */
     @SuppressWarnings({"PublicInnerClass", "FieldAccessedSynchronizedAndUnsynchronized"})
@@ -729,7 +857,7 @@ public class GridH2Table extends TableBase {
      * Wrapper type for primary key.
      */
     @SuppressWarnings("PackageVisibleInnerClass")
-    static class ScanIndex implements Index {
+    static class ScanIndex extends BaseIndex {
         /** */
         static final String SCAN_INDEX_NAME_SUFFIX = "__SCAN_";
 
@@ -774,8 +902,8 @@ public class GridH2Table extends TableBase {
         }
 
         /** {@inheritDoc} */
-        @Override public void close(Session ses) {
-            delegate.close(ses);
+        @Override public final void close(Session ses) {
+            // No-op.
         }
 
         /** {@inheritDoc} */
@@ -819,8 +947,13 @@ public class GridH2Table extends TableBase {
         }
 
         /** {@inheritDoc} */
-        @Override public double getCost(Session ses, int[] masks, TableFilter tblFilter, SortOrder sortOrder) {
-            return getRowCountApproximation() + Constants.COST_ROW_OFFSET;
+        @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter,
+            SortOrder sortOrder) {
+            long rows = getRowCountApproximation();
+            double baseCost = getCostRangeIndex(masks, rows, filters, filter, sortOrder, true);
+            int mul = delegate.getDistributedMultiplier(ses, filters, filter);
+
+            return  mul * baseCost;
         }
 
         /** {@inheritDoc} */
@@ -884,6 +1017,11 @@ public class GridH2Table extends TableBase {
         }
 
         /** {@inheritDoc} */
+        @Override public IndexLookupBatch createLookupBatch(TableFilter filter) {
+            return delegate.createLookupBatch(filter);
+        }
+
+        /** {@inheritDoc} */
         @Override public void truncate(Session ses) {
             // No-op.
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 2c95b66..33aaf7b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -17,10 +17,9 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
-import java.io.Closeable;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 import java.util.NavigableMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -28,14 +27,14 @@ import org.apache.ignite.internal.util.GridEmptyIterator;
 import org.apache.ignite.internal.util.offheap.unsafe.GridOffHeapSnapTreeMap;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
 import org.apache.ignite.internal.util.snaptree.SnapTreeMap;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.h2.engine.Session;
 import org.h2.index.Cursor;
 import org.h2.index.IndexType;
-import org.h2.index.SingleRowCursor;
-import org.h2.result.Row;
+import org.h2.message.DbException;
 import org.h2.result.SearchRow;
 import org.h2.result.SortOrder;
 import org.h2.table.IndexColumn;
@@ -49,10 +48,7 @@ import org.jetbrains.annotations.Nullable;
 @SuppressWarnings("ComparatorNotSerializable")
 public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridSearchRowPointer> {
     /** */
-    protected final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree;
-
-    /** */
-    private final ThreadLocal<ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row>> snapshot = new ThreadLocal<>();
+    private final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree;
 
     /** */
     private final boolean snapshotEnabled;
@@ -63,19 +59,11 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
      * @param name Index name.
      * @param tbl Table.
      * @param pk If this index is primary key.
-     * @param keyCol Primary key column index.
-     * @param valCol Value column index.
-     * @param cols Index columns list.
+     * @param colsList Index columns list.
      */
     @SuppressWarnings("unchecked")
-    public GridH2TreeIndex(String name, GridH2Table tbl, boolean pk, int keyCol, int valCol, IndexColumn... cols) {
-        super(keyCol, valCol);
-        if (!pk) {
-            // For other indexes we add primary key at the end to avoid conflicts.
-            cols = Arrays.copyOf(cols, cols.length + 1);
-
-            cols[cols.length - 1] = tbl.indexColumn(keyCol, SortOrder.ASCENDING);
-        }
+    public GridH2TreeIndex(String name, GridH2Table tbl, boolean pk, List<IndexColumn> colsList) {
+        IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]);
 
         IndexColumn.mapColumns(cols, tbl);
 
@@ -146,61 +134,25 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
                 }
             };
         }
-    }
-
-    /**
-     * Closes index and releases resources.
-     */
-    public void close() {
-        if (tree instanceof Closeable)
-            U.closeQuiet((Closeable)tree);
-    }
-
-    /**
-     * Takes snapshot to be used in current thread. If argument is null it will be taken from current trees.
-     *
-     * @param s Map to be used as snapshot if not null.
-     * @return Taken snapshot or given argument back.
-     */
-    @SuppressWarnings("unchecked")
-    @Override public Object takeSnapshot(@Nullable Object s) {
-        if (!snapshotEnabled)
-            return null;
-
-        assert snapshot.get() == null;
-
-        if (s == null)
-            s = tree instanceof SnapTreeMap ? ((SnapTreeMap)tree).clone() :
-                ((GridOffHeapSnapTreeMap)tree).clone();
-
-        snapshot.set((ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row>)s);
 
-        return s;
+        initDistributedJoinMessaging(tbl);
     }
 
-    /**
-     * Releases snapshot for current thread.
-     */
-    @Override public void releaseSnapshot() {
-        if (!snapshotEnabled)
-            return;
-
-        ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> s = snapshot.get();
-
-        snapshot.set(null);
+    /** {@inheritDoc} */
+    @Override protected Object doTakeSnapshot() {
+        assert snapshotEnabled;
 
-        if (s instanceof Closeable)
-            U.closeQuiet((Closeable)s);
+        return tree instanceof SnapTreeMap ?
+            ((SnapTreeMap)tree).clone() :
+            ((GridOffHeapSnapTreeMap)tree).clone();
     }
 
-    /**
-     * @return Snapshot for current thread if there is one.
-     */
-    private ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() {
+    /** {@inheritDoc} */
+    protected final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() {
         if (!snapshotEnabled)
             return tree;
 
-        ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> res = snapshot.get();
+        ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> res = threadLocalSnapshot();
 
         if (res == null)
             res = tree;
@@ -209,19 +161,21 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
     }
 
     /** {@inheritDoc} */
-    @Override public void close(Session ses) {
-        assert snapshot.get() == null;
+    @Override public void destroy() {
+        assert threadLocalSnapshot() == null;
+
+        if (tree instanceof AutoCloseable)
+            U.closeQuiet((AutoCloseable)tree);
 
-        if (tree instanceof Closeable)
-            U.closeQuiet((Closeable)tree);
+        super.destroy();
     }
 
     /** {@inheritDoc} */
     @Override public long getRowCount(@Nullable Session ses) {
-        IndexingQueryFilter f = filters.get();
+        IndexingQueryFilter f = threadLocalFilter();
 
         // Fast path if we don't need to perform any filtering.
-        if (f == null || f.forSpace(((GridH2Table)getTable()).spaceName()) == null)
+        if (f == null || f.forSpace((getTable()).spaceName()) == null)
             return treeForRead().size();
 
         Iterator<GridH2Row> iter = doFind(null, false, null);
@@ -269,13 +223,17 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
     }
 
     /** {@inheritDoc} */
-    @Override public double getCost(Session ses, int[] masks, TableFilter filter, SortOrder sortOrder) {
-        return getCostRangeIndex(masks, getRowCountApproximation(), filter, sortOrder);
+    @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) {
+        long rowCnt = getRowCountApproximation();
+        double baseCost = getCostRangeIndex(masks, rowCnt, filters, filter, sortOrder, false);
+        int mul = getDistributedMultiplier(ses, filters, filter);
+
+        return mul * baseCost;
     }
 
     /** {@inheritDoc} */
     @Override public boolean canFindNext() {
-        return true;
+        return false;
     }
 
     /** {@inheritDoc} */
@@ -309,10 +267,20 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
      * @return Iterator over rows in given range.
      */
     @SuppressWarnings("unchecked")
-    private Iterator<GridH2Row> doFind(@Nullable SearchRow first, boolean includeFirst,
-        @Nullable SearchRow last) {
+    private Iterator<GridH2Row> doFind(@Nullable SearchRow first, boolean includeFirst, @Nullable SearchRow last) {
         ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = treeForRead();
 
+        return doFind0(t, first, includeFirst, last, threadLocalFilter());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected final Iterator<GridH2Row> doFind0(
+        ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t,
+        @Nullable SearchRow first,
+        boolean includeFirst,
+        @Nullable SearchRow last,
+        IndexingQueryFilter filter
+    ) {
         includeFirst &= first != null;
 
         NavigableMap<GridSearchRowPointer, GridH2Row> range = subTree(t, comparable(first, includeFirst ? -1 : 1),
@@ -321,7 +289,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
         if (range == null)
             return new GridEmptyIterator<>();
 
-        return filter(range.values().iterator());
+        return filter(range.values().iterator(), filter);
     }
 
     /**
@@ -381,21 +349,12 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
 
     /** {@inheritDoc} */
     @Override public boolean canGetFirstOrLast() {
-        return true;
+        return false;
     }
 
     /** {@inheritDoc} */
     @Override public Cursor findFirstOrLast(Session ses, boolean first) {
-        ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree = treeForRead();
-
-        Iterator<GridH2Row> iter = filter(first ? tree.values().iterator() : tree.descendingMap().values().iterator());
-
-        GridSearchRowPointer res = null;
-
-        if (iter.hasNext())
-            res = iter.next();
-
-        return new SingleRowCursor((Row)res);
+        throw DbException.throwInternalError();
     }
 
     /** {@inheritDoc} */
@@ -503,11 +462,8 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
     @Override public GridH2TreeIndex rebuild() throws InterruptedException {
         IndexColumn[] cols = getIndexColumns();
 
-        if (!getIndexType().isUnique())
-            cols = Arrays.copyOf(cols, cols.length - 1);
-
-        GridH2TreeIndex idx = new GridH2TreeIndex(getName(), (GridH2Table)getTable(), getIndexType().isUnique(),
-            keyCol, valCol, cols);
+        GridH2TreeIndex idx = new GridH2TreeIndex(getName(), getTable(),
+            getIndexType().isUnique(), F.asList(cols));
 
         Thread thread = Thread.currentThread();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Utils.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Utils.java
deleted file mode 100644
index aafcfcc..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Utils.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.opt;
-
-import java.sql.Timestamp;
-import java.util.Calendar;
-import java.util.GregorianCalendar;
-import org.h2.value.ValueTimestamp;
-
-/**
- *
- */
-@SuppressWarnings({"JavaAbbreviationUsage", "GridBracket"})
-public class GridH2Utils {
-    /** Copy/pasted from org.h2.util.DateTimeUtils */
-    private static final int SHIFT_YEAR = 9;
-
-    /** Copy/pasted from org.h2.util.DateTimeUtils */
-    private static final int SHIFT_MONTH = 5;
-
-    /** Static calendar. */
-    private static final Calendar staticCalendar = Calendar.getInstance();
-
-    /** */
-    private static final ThreadLocal<Calendar> localCalendar = new ThreadLocal<>();
-
-    /**
-     * @return The instance of calendar for local thread.
-     */
-    public static Calendar getLocalCalendar() {
-        Calendar res = localCalendar.get();
-
-        if (res == null) {
-            res = (Calendar)staticCalendar.clone();
-
-            localCalendar.set(res);
-        }
-
-        return res;
-    }
-
-    /**
-     * Get or create a timestamp value for the given timestamp.
-     *
-     * Copy/pasted from org.h2.value.ValueTimestamp#get(java.sql.Timestamp)
-     *
-     * @param timestamp The timestamp.
-     * @return The value.
-     */
-    public static ValueTimestamp toValueTimestamp(Timestamp timestamp) {
-        long ms = timestamp.getTime();
-        long nanos = timestamp.getNanos() % 1000000;
-
-        Calendar calendar = getLocalCalendar();
-
-        calendar.clear();
-        calendar.setTimeInMillis(ms);
-
-        long dateValue = dateValueFromCalendar(calendar);
-
-        nanos += nanosFromCalendar(calendar);
-
-        return ValueTimestamp.fromDateValueAndNanos(dateValue, nanos);
-    }
-
-    /**
-     * Calculate the nanoseconds since midnight from a given calendar.
-     *
-     * Copy/pasted from org.h2.util.DateTimeUtils#nanosFromCalendar(java.util.Calendar).
-     *
-     * @param cal The calendar.
-     * @return Nanoseconds.
-     */
-    private static long nanosFromCalendar(Calendar cal) {
-        int h = cal.get(Calendar.HOUR_OF_DAY);
-        int m = cal.get(Calendar.MINUTE);
-        int s = cal.get(Calendar.SECOND);
-        int millis = cal.get(Calendar.MILLISECOND);
-
-        return ((((((h * 60L) + m) * 60) + s) * 1000) + millis) * 1000000;
-    }
-
-    /**
-     * Calculate the date value from a given calendar.
-     *
-     * Copy/pasted from org.h2.util.DateTimeUtils#dateValueFromCalendar(java.util.Calendar)
-     *
-     * @param cal The calendar.
-     * @return The date value.
-     */
-    private static long dateValueFromCalendar(Calendar cal) {
-        int year, month, day;
-
-        year = getYear(cal);
-        month = cal.get(Calendar.MONTH) + 1;
-        day = cal.get(Calendar.DAY_OF_MONTH);
-
-        return ((long) year << SHIFT_YEAR) | (month << SHIFT_MONTH) | day;
-    }
-
-    /**
-     * Get the year (positive or negative) from a calendar.
-     *
-     * Copy/pasted from org.h2.util.DateTimeUtils#getYear(java.util.Calendar)
-     *
-     * @param calendar The calendar.
-     * @return The year.
-     */
-    private static int getYear(Calendar calendar) {
-        int year = calendar.get(Calendar.YEAR);
-
-        if (calendar.get(Calendar.ERA) == GregorianCalendar.BC) {
-            year = 1 - year;
-        }
-
-        return year;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java
index 29f9675..80e8504 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ValueCacheObject.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.h2.message.DbException;
+import org.h2.util.JdbcUtils;
 import org.h2.util.Utils;
 import org.h2.value.CompareMode;
 import org.h2.value.Value;
@@ -117,7 +118,7 @@ public class GridH2ValueCacheObject extends Value {
         }
 
         // For user-provided and array types.
-        return Utils.serialize(obj, null);
+        return JdbcUtils.serialize(obj, null);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
index 957e5f6..716c9cb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicLong;
@@ -51,7 +50,7 @@ import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TermRangeFilter;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.util.Version;
-import org.h2.util.Utils;
+import org.h2.util.JdbcUtils;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.KEY_FIELD_NAME;
@@ -60,7 +59,7 @@ import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.VA
 /**
  * Lucene fulltext index.
  */
-public class GridLuceneIndex implements Closeable {
+public class GridLuceneIndex implements AutoCloseable {
     /** Field name for string representation of value. */
     public static final String VAL_STR_FIELD_NAME = "_gg_val_str__";
 
@@ -363,7 +362,7 @@ public class GridLuceneIndex implements Closeable {
         @SuppressWarnings("unchecked")
         private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws IgniteCheckedException {
             if (coctx == null) // For tests.
-                return (Z)Utils.deserialize(bytes, null);
+                return (Z)JdbcUtils.deserialize(bytes, null);
 
             return (Z)coctx.processor().unmarshal(coctx, bytes, ldr);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlAlias.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlAlias.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlAlias.java
index 245b88e..3fb6f3c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlAlias.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlAlias.java
@@ -52,6 +52,18 @@ public class GridSqlAlias extends GridSqlElement {
         this.alias = alias;
     }
 
+    /**
+     * @param el Element, possibly a {@link GridSqlAlias} wrapping another element.
+     * @return Child of the alias if {@code el} is an alias, otherwise {@code el} itself (asserted non-null).
+     */
+    public static GridSqlElement unwrap(GridSqlElement el) {
+        el = el instanceof GridSqlAlias ? el.child() : el;
+
+        assert el != null;
+
+        return el;
+    }
+
     /** {@inheritDoc} */
     @Override public String getSQL() {
         return child().getSQL() + (useAs ? " AS " : " ") + Parser.quoteIdentifier(alias);

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlColumn.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlColumn.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlColumn.java
index aca1398..6ef4446 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlColumn.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlColumn.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.h2.sql;
 
 import java.util.Collections;
+import org.h2.table.Column;
 
 /**
  * Column.
@@ -32,12 +33,16 @@ public class GridSqlColumn extends GridSqlElement implements GridSqlValue {
     /** SQL from original query. May be qualified or unqualified column name. */
     private final String sqlText;
 
+    /** */
+    private Column col;
+
     /**
+     * @param col Column.
      * @param from From.
      * @param name Name.
      * @param sqlText Text.
      */
-    public GridSqlColumn(GridSqlElement from, String name, String sqlText) {
+    public GridSqlColumn(Column col, GridSqlElement from, String name, String sqlText) {
         super(Collections.<GridSqlElement>emptyList());
 
         assert sqlText != null;
@@ -45,13 +50,7 @@ public class GridSqlColumn extends GridSqlElement implements GridSqlValue {
         expressionInFrom = from;
         colName = name;
         this.sqlText = sqlText;
-    }
-
-    /**
-     * @return Simple unqualified column with only name.
-     */
-    public GridSqlColumn simplify() {
-        return new GridSqlColumn(null, colName, colName);
+        this.col = col;
     }
 
     /**
@@ -72,4 +71,11 @@ public class GridSqlColumn extends GridSqlElement implements GridSqlValue {
     public GridSqlElement expressionInFrom() {
         return expressionInFrom;
     }
+
+    /**
+     * @return H2 column that was passed to the constructor of this element.
+     */
+    public Column column() {
+        return col;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlElement.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlElement.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlElement.java
index 57d3c57..d95c14a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlElement.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlElement.java
@@ -119,4 +119,15 @@ public abstract class GridSqlElement implements Iterable<GridSqlElement> {
     @Override public String toString() {
         return getSQL();
     }
+
+    /** Always throws {@link IllegalStateException}; no hash code is computed for SQL elements. */
+    @Override public int hashCode() {
+        throw new IllegalStateException(); // NOTE(review): presumably deliberate (no hash-based use) - confirm.
+    }
+
+    /** Equal when same instance, or when exact classes match and {@code children} are equal. */
+    @Override public boolean equals(Object o) {
+        return this == o || (!(o == null || getClass() != o.getClass()) &&
+            children.equals(((GridSqlElement)o).children));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperation.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperation.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperation.java
index 18d8bdf..737c5b1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperation.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlOperation.java
@@ -63,7 +63,7 @@ public class GridSqlOperation extends GridSqlElement {
     /**
      * @return Operation type.
      */
-    public GridSqlOperationType opType() {
+    public GridSqlOperationType operationType() {
         return opType;
     }
 


Mime
View raw message