ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/4] ignite git commit: ignite-3478 Mvcc support for sql indexes
Date Wed, 25 Oct 2017 12:16:05 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java
index 85dcf50..8954de0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java
@@ -17,20 +17,12 @@
 
 package org.apache.ignite.internal.processors.query.h2.database.io;
 
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
-import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
-import org.h2.result.SearchRow;
 
 /**
  * Leaf page for H2 row references.
  */
-public class H2LeafIO extends BPlusLeafIO<SearchRow> implements H2RowLinkIO {
+public class H2LeafIO extends AbstractH2LeafIO {
     /** */
     public static final IOVersions<H2LeafIO> VERSIONS = new IOVersions<>(
         new H2LeafIO(1)
@@ -39,36 +31,7 @@ public class H2LeafIO extends BPlusLeafIO<SearchRow> implements H2RowLinkIO {
     /**
      * @param ver Page format version.
      */
-    protected H2LeafIO(int ver) {
+    private H2LeafIO(int ver) {
         super(T_H2_REF_LEAF, ver, 8);
     }
-
-    /** {@inheritDoc} */
-    @Override public void storeByOffset(long pageAddr, int off, SearchRow row) {
-        GridH2Row row0 = (GridH2Row)row;
-
-        assert row0.link() != 0;
-
-        PageUtils.putLong(pageAddr, off, row0.link());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<SearchRow> srcIo, long srcPageAddr, int srcIdx) {
-        assert srcIo == this;
-
-        PageUtils.putLong(dstPageAddr, offset(dstIdx), getLink(srcPageAddr, srcIdx));
-    }
-
-    /** {@inheritDoc} */
-    @Override public SearchRow getLookupRow(BPlusTree<SearchRow,?> tree, long pageAddr, int idx)
-        throws IgniteCheckedException {
-        long link = getLink(pageAddr, idx);
-
-        return ((H2Tree)tree).getRowFactory().getRow(link);
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLink(long pageAddr, int idx) {
-        return PageUtils.getLong(pageAddr, offset(idx));
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java
new file mode 100644
index 0000000..fa6978e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.internal.pagemem.PageUtils;
+
+/**
+ *
+ */
+class H2MvccExtrasInnerIO extends AbstractH2ExtrasInnerIO {
+    /** */
+    private final int crdVerOff;
+
+    /** */
+    private final int cntrOff;
+
+    /** */
+    private final int newCrdVerOff;
+
+    /** */
+    private final int newCntrOff;
+
+    /**
+     * @param type Page type.
+     * @param ver Page format version.
+     * @param payloadSize Payload size.
+     */
+    H2MvccExtrasInnerIO(short type, int ver, int payloadSize) {
+        super(type, ver, 40, payloadSize);
+
+        crdVerOff = payloadSize + 8;
+        cntrOff = payloadSize + 16;
+        newCrdVerOff = payloadSize + 24;
+        newCntrOff = payloadSize + 32;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + crdVerOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCounter(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + cntrOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + newCrdVerOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCounter(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + newCntrOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean storeMvccInfo() {
+        return true;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java
new file mode 100644
index 0000000..2448e76
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.internal.pagemem.PageUtils;
+
+/**
+ * Leaf page for H2 row references.
+ */
+class H2MvccExtrasLeafIO extends AbstractH2ExtrasLeafIO {
+    /** */
+    private final int crdVerOff;
+
+    /** */
+    private final int cntrOff;
+
+    /** */
+    private final int newCrdVerOff;
+
+    /** */
+    private final int newCntrOff;
+
+    /**
+     * @param type Page type.
+     * @param ver Page format version.
+     * @param payloadSize Payload size.
+     */
+    H2MvccExtrasLeafIO(short type, int ver, int payloadSize) {
+        super(type, ver, 40, payloadSize);
+
+        crdVerOff = payloadSize + 8;
+        cntrOff = payloadSize + 16;
+        newCrdVerOff = payloadSize + 24;
+        newCntrOff = payloadSize + 32;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + crdVerOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCounter(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + cntrOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + newCrdVerOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCounter(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + newCntrOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean storeMvccInfo() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java
new file mode 100644
index 0000000..e64ab43
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+
+/**
+ * Inner page for H2 row references.
+ */
+public class H2MvccInnerIO extends AbstractH2InnerIO {
+    /** */
+    public static final IOVersions<H2MvccInnerIO> VERSIONS = new IOVersions<>(
+        new H2MvccInnerIO(1)
+    );
+
+    /**
+     * @param ver Page format version.
+     */
+    private H2MvccInnerIO(int ver) {
+        super(T_H2_MVCC_REF_INNER, ver, 40);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean storeMvccInfo() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java
new file mode 100644
index 0000000..a364432
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+
+/**
+ *
+ */
+public class H2MvccLeafIO extends AbstractH2LeafIO {
+    /** */
+    public static final IOVersions<H2MvccLeafIO> VERSIONS = new IOVersions<>(
+        new H2MvccLeafIO(1)
+    );
+
+    /**
+     * @param ver Page format version.
+     */
+    private H2MvccLeafIO(int ver) {
+        super(T_H2_MVCC_REF_LEAF, ver, 40);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean storeMvccInfo() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java
index ce69197..d828c44 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java
@@ -27,4 +27,37 @@ public interface H2RowLinkIO {
      * @return Row link.
      */
     public long getLink(long pageAddr, int idx);
+
+    /**
+     * @param pageAddr Page address.
+     * @param idx Index.
+     * @return Mvcc coordinator version.
+     */
+    public long getMvccCoordinatorVersion(long pageAddr, int idx);
+
+    /**
+     * @param pageAddr Page address.
+     * @param idx Index.
+     * @return Mvcc counter.
+     */
+    public long getMvccCounter(long pageAddr, int idx);
+
+    /**
+     * @param pageAddr Page address.
+     * @param idx Index.
+     * @return Mvcc coordinator version.
+     */
+    public long getNewMvccCoordinatorVersion(long pageAddr, int idx);
+
+    /**
+     * @param pageAddr Page address.
+     * @param idx Index.
+     * @return Mvcc counter.
+     */
+    public long getNewMvccCounter(long pageAddr, int idx);
+
+    /**
+     * @return {@code True} if IO stores mvcc information.
+     */
+    public boolean storeMvccInfo();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 92b7d10..96b331a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeMvccFilterClosure;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
@@ -200,6 +201,12 @@ public abstract class GridH2IndexBase extends BaseIndex {
     public abstract GridH2Row put(GridH2Row row);
 
     /**
+     * @param row Row.
+     * @return {@code True} if replaced existing row.
+     */
+    public abstract boolean putx(GridH2Row row);
+
+    /**
      * Remove row from index.
      *
      * @param row Row.
@@ -426,7 +433,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
                     // This is the first request containing all the search rows.
                     assert !msg.bounds().isEmpty() : "empty bounds";
 
-                    src = new RangeSource(msg.bounds(), msg.segment(), qctx.filter());
+                    src = new RangeSource(msg.bounds(), msg.segment(), qctx.filter(), qctx.mvccFilter());
                 }
                 else {
                     // This is request to fetch next portion of data.
@@ -1469,20 +1476,28 @@ public abstract class GridH2IndexBase extends BaseIndex {
         /** */
         final IndexingQueryFilter filter;
 
+        /** */
+        private final H2TreeMvccFilterClosure mvccFilter;
+
         /** Iterator. */
         Iterator<GridH2Row> iter = emptyIterator();
 
         /**
          * @param bounds Bounds.
+         * @param segment Segment.
          * @param filter Filter.
+         * @param mvccFilter Mvcc filter.
          */
         RangeSource(
             Iterable<GridH2RowRangeBounds> bounds,
             int segment,
-            IndexingQueryFilter filter
+            IndexingQueryFilter filter,
+            H2TreeMvccFilterClosure mvccFilter
         ) {
             this.segment = segment;
             this.filter = filter;
+            this.mvccFilter = mvccFilter;
+
             boundsIter = bounds.iterator();
         }
 
@@ -1540,7 +1555,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
                 IgniteTree t = treeForRead(segment);
 
-                iter = new CursorIteratorWrapper(doFind0(t, first, true, last, filter));
+                iter = new CursorIteratorWrapper(doFind0(t, first, last, filter, mvccFilter));
 
                 if (!iter.hasNext()) {
                     // We have to return empty range here.
@@ -1565,17 +1580,17 @@ public abstract class GridH2IndexBase extends BaseIndex {
     /**
      * @param t Tree.
      * @param first Lower bound.
-     * @param includeFirst Whether lower bound should be inclusive.
      * @param last Upper bound always inclusive.
      * @param filter Filter.
+     * @param mvccFilter Mvcc filter.
      * @return Iterator over rows in given range.
      */
     protected GridCursor<GridH2Row> doFind0(
         IgniteTree t,
         @Nullable SearchRow first,
-        boolean includeFirst,
         @Nullable SearchRow last,
-        IndexingQueryFilter filter) {
+        IndexingQueryFilter filter,
+        H2TreeMvccFilterClosure mvccFilter) {
         throw new UnsupportedOperationException();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
index e855536..62b459a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.processors.query.h2.opt;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -56,17 +58,24 @@ public class GridH2KeyValueRowOnheap extends GridH2Row {
     /** */
     private Value ver;
 
+    /** */
+    private final MvccCoordinatorVersion newVer;
+
     /**
      * Constructor.
      *
      * @param desc Row descriptor.
      * @param row Row.
+     * @param newVer Version of new mvcc value inserted for the same key.
      * @param keyType Key type.
      * @param valType Value type.
      * @throws IgniteCheckedException If failed.
      */
-    public GridH2KeyValueRowOnheap(GridH2RowDescriptor desc, CacheDataRow row, int keyType, int valType)
-        throws IgniteCheckedException {
+    public GridH2KeyValueRowOnheap(GridH2RowDescriptor desc,
+        CacheDataRow row,
+        MvccCoordinatorVersion newVer,
+        int keyType,
+        int valType) throws IgniteCheckedException {
         super(row);
 
         this.desc = desc;
@@ -78,6 +87,23 @@ public class GridH2KeyValueRowOnheap extends GridH2Row {
 
         if (row.version() != null)
             this.ver = desc.wrap(row.version(), Value.JAVA_OBJECT);
+
+        this.newVer = newVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long newMvccCoordinatorVersion() {
+        return newVer != null ? newVer.coordinatorVersion() : 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long newMvccCounter() {
+        return newVer != null ? newVer.counter(): CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean indexSearchRow() {
+        return false;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java
index 5e09a86..38ad9d0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java
@@ -284,6 +284,11 @@ public class GridH2MetaTable extends TableBase {
                     throw new IllegalStateException("Index: " + idx);
             }
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean indexSearchRow() {
+            return false; // TODO IGNITE-3478, check meta table with mvcc.
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
index fd8a613..d24dc08 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
@@ -70,7 +70,7 @@ public class GridH2PlainRowFactory extends RowFactory {
         /**
          * @param key Key.
          */
-        public RowKey(Value key) {
+        RowKey(Value key) {
             this.key = key;
         }
 
@@ -92,6 +92,11 @@ public class GridH2PlainRowFactory extends RowFactory {
         }
 
         /** {@inheritDoc} */
+        @Override public boolean indexSearchRow() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(RowKey.class, this);
         }
@@ -138,6 +143,11 @@ public class GridH2PlainRowFactory extends RowFactory {
         }
 
         /** {@inheritDoc} */
+        @Override public boolean indexSearchRow() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(RowPair.class, this);
         }
@@ -174,6 +184,11 @@ public class GridH2PlainRowFactory extends RowFactory {
         }
 
         /** {@inheritDoc} */
+        @Override public boolean indexSearchRow() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(RowSimple.class, this);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index 91f0aef..b490179 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeMvccFilterClosure;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
@@ -83,6 +84,9 @@ public class GridH2QueryContext {
     /** */
     private GridH2CollocationModel qryCollocationMdl;
 
+    /** */
+    private H2TreeMvccFilterClosure mvccFilter;
+
     /**
      * @param locNodeId Local node ID.
      * @param nodeId The node who initiated the query.
@@ -102,13 +106,34 @@ public class GridH2QueryContext {
      * @param segmentId Index segment ID.
      * @param type Query type.
      */
-    public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) {
+    public GridH2QueryContext(UUID locNodeId,
+        UUID nodeId,
+        long qryId,
+        int segmentId,
+        GridH2QueryType type) {
         assert segmentId == 0 || type == MAP;
 
         key = new Key(locNodeId, nodeId, qryId, segmentId, type);
     }
 
     /**
+     * @return Mvcc version.
+     */
+    @Nullable public H2TreeMvccFilterClosure mvccFilter() {
+        return mvccFilter;
+    }
+
+    /**
+     * @param mvccFilter Mvcc filter.
+     * @return {@code this}.
+     */
+    public GridH2QueryContext mvccFilter(H2TreeMvccFilterClosure mvccFilter) {
+        this.mvccFilter = mvccFilter;
+
+        return this;
+    }
+
+    /**
      * @return Type.
      */
     public GridH2QueryType type() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
index 54e0417..785b791 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.opt;
 
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 
@@ -88,16 +89,35 @@ public abstract class GridH2Row extends GridH2SearchRowAdapter implements CacheD
 
     /** {@inheritDoc} */
     @Override public long mvccCoordinatorVersion() {
-        throw new UnsupportedOperationException();
+        return row.mvccCoordinatorVersion();
     }
 
     /** {@inheritDoc} */
     @Override public long mvccCounter() {
-        throw new UnsupportedOperationException();
+        return row.mvccCounter();
     }
 
     /** {@inheritDoc} */
     @Override public boolean removed() {
         throw new UnsupportedOperationException();
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean indexSearchRow() {
+        return false;
+    }
+
+    /**
+     * @return Part of new mvcc version.
+     */
+    public long newMvccCoordinatorVersion() {
+        return 0;
+    }
+
+    /**
+     * @return Part of new mvcc version.
+     */
+    public long newMvccCounter() {
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index 1d915e5..ad91deb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
@@ -58,6 +59,7 @@ import org.h2.value.ValueString;
 import org.h2.value.ValueTime;
 import org.h2.value.ValueTimestamp;
 import org.h2.value.ValueUuid;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL;
@@ -273,17 +275,21 @@ public class GridH2RowDescriptor {
      * Creates new row.
      *
      * @param dataRow Data row.
+     * @param newVer Version of new mvcc value inserted for the same key.
      * @return Row.
      * @throws IgniteCheckedException If failed.
      */
-    public GridH2Row createRow(CacheDataRow dataRow) throws IgniteCheckedException {
+    public GridH2Row createRow(CacheDataRow dataRow, @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException {
         GridH2Row row;
 
         try {
-            if (dataRow.value() == null) // Only can happen for remove operation, can create simple search row.
+            if (dataRow.value() == null) { // Only can happen for remove operation, can create simple search row.
+                assert newVer == null;
+
                 row = new GridH2KeyRowOnheap(dataRow, wrap(dataRow.key(), keyType));
+            }
             else
-                row = new GridH2KeyValueRowOnheap(this, dataRow, keyType, valType);
+                row = new GridH2KeyValueRowOnheap(this, dataRow, newVer, keyType, valType);
         }
         catch (ClassCastException e) {
             throw new IgniteCheckedException("Failed to convert key to SQL type. " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java
new file mode 100644
index 0000000..4b3940c
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.h2.result.Row;
+
+/**
+ *
+ */
+public interface GridH2SearchRow extends Row {
+    /**
+     * @return Mvcc coordinator version.
+     */
+    public long mvccCoordinatorVersion();
+
+    /**
+     * @return Mvcc counter.
+     */
+    public long mvccCounter();
+
+    /**
+     * @return {@code True} for rows used for index search (as opposed to rows stored in {@link H2Tree}.
+     */
+    public boolean indexSearchRow();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java
index 24a90b3..4fc8ee5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.h2.result.Row;
 import org.h2.result.SearchRow;
 import org.h2.store.Data;
@@ -25,7 +26,7 @@ import org.h2.value.Value;
 /**
  * Dummy H2 search row adadpter.
  */
-public abstract class GridH2SearchRowAdapter implements Row {
+public abstract class GridH2SearchRowAdapter implements GridH2SearchRow {
     /** {@inheritDoc} */
     @Override public void setKeyAndVersion(SearchRow old) {
         throw new UnsupportedOperationException();
@@ -100,4 +101,14 @@ public abstract class GridH2SearchRowAdapter implements Row {
     @Override public Value[] getValueList() {
         throw new UnsupportedOperationException();
     }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCoordinatorVersion() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCounter() {
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 6c353e9..ca9c1f5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -28,6 +28,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -399,15 +401,16 @@ public class GridH2Table extends TableBase {
      * otherwise value and expiration time will be updated or new row will be added.
      *
      * @param row Row.
+     * @param newVer Version of new mvcc value inserted for the same key.
      * @param rmv If {@code true} then remove, else update row.
      * @return {@code true} If operation succeeded.
      * @throws IgniteCheckedException If failed.
      */
-    public boolean update(CacheDataRow row, boolean rmv)
+    public boolean update(CacheDataRow row, @Nullable MvccCoordinatorVersion newVer, boolean rmv)
         throws IgniteCheckedException {
         assert desc != null;
 
-        GridH2Row h2Row = desc.createRow(row);
+        GridH2Row h2Row = desc.createRow(row, newVer);
 
         if (rmv)
             return doUpdate(h2Row, true);
@@ -454,6 +457,8 @@ public class GridH2Table extends TableBase {
      */
     @SuppressWarnings("LockAcquiredButNotSafelyReleased")
     private boolean doUpdate(final GridH2Row row, boolean del) throws IgniteCheckedException {
+        assert !cctx.mvccEnabled() || row.mvccCounter() != CacheCoordinatorsProcessor.MVCC_COUNTER_NA : row;
+
         // Here we assume that each key can't be updated concurrently and case when different indexes
         // getting updated from different threads with different rows with the same key is impossible.
         lock(false);
@@ -466,10 +471,25 @@ public class GridH2Table extends TableBase {
             if (!del) {
                 assert rowFactory == null || row.link() != 0 : row;
 
-                GridH2Row old = pk.put(row); // Put to PK.
+                GridH2Row old;
+
+                // Put to PK.
+                if (cctx.mvccEnabled()) {
+                    boolean replaced = pk.putx(row);
+
+                    assert replaced == (row.newMvccCoordinatorVersion() != 0);
 
-                if (old == null)
-                    size.increment();
+                    old = null;
+
+                    if (!replaced)
+                        size.increment();
+                }
+                else {
+                    old = pk.put(row);
+
+                    if (old == null)
+                        size.increment();
+                }
 
                 int len = idxs.size();
 
@@ -536,17 +556,24 @@ public class GridH2Table extends TableBase {
     private void addToIndex(GridH2IndexBase idx, Index pk, GridH2Row row, GridH2Row old, boolean tmp) {
         assert !idx.getIndexType().isUnique() : "Unique indexes are not supported: " + idx;
 
-        GridH2Row old2 = idx.put(row);
+        if (idx.ctx.mvccEnabled()) {
+            boolean replaced = idx.putx(row);
 
-        if (old2 != null) { // Row was replaced in index.
-            if (!tmp) {
-                if (!eq(pk, old2, old))
-                    throw new IllegalStateException("Row conflict should never happen, unique indexes are " +
-                        "not supported [idx=" + idx + ", old=" + old + ", old2=" + old2 + ']');
+            assert replaced == (row.newMvccCoordinatorVersion() != 0);
+        }
+        else {
+            GridH2Row old2 = idx.put(row);
+
+            if (old2 != null) { // Row was replaced in index.
+                if (!tmp) {
+                    if (!eq(pk, old2, old))
+                        throw new IllegalStateException("Row conflict should never happen, unique indexes are " +
+                                "not supported [idx=" + idx + ", old=" + old + ", old2=" + old2 + ']');
+                }
             }
+            else if (old != null) // Row was not replaced, need to remove manually.
+                idx.removex(old);
         }
-        else if (old != null) // Row was not replaced, need to remove manually.
-            idx.removex(old);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 77b928f..fe21b1d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.UpdateResult;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeMvccFilterClosure;
 import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
@@ -482,7 +484,8 @@ public class GridMapQueryExecutor {
                     false, // Replicated is always false here (see condition above).
                     req.timeout(),
                     params,
-                    true); // Lazy = true.
+                    true,
+                    req.mvccVersion()); // Lazy = true.
             }
             else {
                 ctx.closure().callLocal(
@@ -504,7 +507,8 @@ public class GridMapQueryExecutor {
                                 false,
                                 req.timeout(),
                                 params,
-                                false); // Lazy = false.
+                                false,
+                                req.mvccVersion()); // Lazy = false.
 
                             return null;
                         }
@@ -528,7 +532,8 @@ public class GridMapQueryExecutor {
             replicated,
             req.timeout(),
             params,
-            lazy);
+            lazy,
+            req.mvccVersion());
     }
 
     /**
@@ -544,6 +549,7 @@ public class GridMapQueryExecutor {
      * @param pageSize Page size.
      * @param distributedJoinMode Query distributed join mode.
      * @param lazy Streaming flag.
+     * @param mvccVer Mvcc version.
      */
     private void onQueryRequest0(
         final ClusterNode node,
@@ -561,7 +567,8 @@ public class GridMapQueryExecutor {
         final boolean replicated,
         final int timeout,
         final Object[] params,
-        boolean lazy
+        boolean lazy,
+        @Nullable final MvccCoordinatorVersion mvccVer
     ) {
         if (lazy && MapQueryLazyWorker.currentWorker() == null) {
             // Lazy queries must be re-submitted to dedicated workers.
@@ -570,8 +577,24 @@ public class GridMapQueryExecutor {
 
             worker.submit(new Runnable() {
                 @Override public void run() {
-                    onQueryRequest0(node, reqId, segmentId, schemaName, qrys, cacheIds, topVer, partsMap, parts,
-                        pageSize, distributedJoinMode, enforceJoinOrder, replicated, timeout, params, true);
+                    onQueryRequest0(
+                        node,
+                        reqId,
+                        segmentId,
+                        schemaName,
+                        qrys,
+                        cacheIds,
+                        topVer,
+                        partsMap,
+                        parts,
+                        pageSize,
+                        distributedJoinMode,
+                        enforceJoinOrder,
+                        replicated,
+                        timeout,
+                        params,
+                        true,
+                        mvccVer);
                 }
             });
 
@@ -639,6 +662,9 @@ public class GridMapQueryExecutor {
                 .topologyVersion(topVer)
                 .reservations(reserved);
 
+            if (mvccVer != null)
+                qctx.mvccFilter(new H2TreeMvccFilterClosure(mvccVer));
+
             Connection conn = h2.connectionForSchema(schemaName);
 
             H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
index 1c0efb3..4518d14 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
@@ -25,8 +25,10 @@ import java.util.NoSuchElementException;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.h2.index.Cursor;
 import org.h2.result.Row;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Iterator that transparently and sequentially traverses a bunch of {@link GridMergeIndex} objects.
@@ -59,6 +61,9 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
     /** Whether remote resources were released. */
     private boolean released;
 
+    /** */
+    private MvccQueryTracker mvccTracker;
+
     /**
      * Constructor.
      *
@@ -69,14 +74,19 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
      * @param distributedJoins Distributed joins.
      * @throws IgniteCheckedException if failed.
      */
-    GridMergeIndexIterator(GridReduceQueryExecutor rdcExec, Collection<ClusterNode> nodes, ReduceQueryRun run,
-        long qryReqId, boolean distributedJoins)
+    GridMergeIndexIterator(GridReduceQueryExecutor rdcExec,
+        Collection<ClusterNode> nodes,
+        ReduceQueryRun run,
+        long qryReqId,
+        boolean distributedJoins,
+        @Nullable MvccQueryTracker mvccTracker)
         throws IgniteCheckedException {
         this.rdcExec = rdcExec;
         this.nodes = nodes;
         this.run = run;
         this.qryReqId = qryReqId;
         this.distributedJoins = distributedJoins;
+        this.mvccTracker = mvccTracker;
 
         this.idxIter = run.indexes().iterator();
 
@@ -155,7 +165,7 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
     private void releaseIfNeeded() {
         if (!released) {
             try {
-                rdcExec.releaseRemoteResources(nodes, run, qryReqId, distributedJoins);
+                rdcExec.releaseRemoteResources(nodes, run, qryReqId, distributedJoins, mvccTracker);
             }
             finally {
                 released = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index f85cd94..80b1970 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -59,6 +59,8 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -83,11 +85,13 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryReq
 import org.apache.ignite.internal.util.GridIntIterator;
 import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.CIX2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.command.ddl.CreateTableData;
@@ -562,6 +566,31 @@ public class GridReduceQueryExecutor {
 
             List<Integer> cacheIds = qry.cacheIds();
 
+            MvccQueryTracker mvccTracker = null;
+
+            // TODO IGNITE-3478.
+            if (qry.mvccEnabled()) {
+                assert !cacheIds.isEmpty();
+
+                final GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
+
+                mvccTracker = new MvccQueryTracker(cacheContext(cacheIds.get(0)), true,
+                    new IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException>() {
+                    @Override public void apply(AffinityTopologyVersion topVer, IgniteCheckedException e) {
+                        fut.onDone(null, e);
+                    }
+                });
+
+                mvccTracker.requestVersion(topVer);
+
+                try {
+                    fut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new CacheException(e);
+                }
+            }
+
             Collection<ClusterNode> nodes = null;
 
             // Explicit partition mapping for unstable topology.
@@ -730,6 +759,9 @@ public class GridReduceQueryExecutor {
                     .timeout(timeoutMillis)
                     .schemaName(schemaName);
 
+                if (mvccTracker != null)
+                    req.mvccVersion(mvccTracker.mvccVersion());
+
                 if (send(nodes, req, parts == null ? null : new ExplicitPartitionsSpecializer(qryMap), false)) {
                     awaitAllReplies(r, nodes, cancel);
 
@@ -763,7 +795,12 @@ public class GridReduceQueryExecutor {
 
                 if (!retry) {
                     if (skipMergeTbl) {
-                        resIter = new GridMergeIndexIterator(this, finalNodes, r, qryReqId, qry.distributedJoins());
+                        resIter = new GridMergeIndexIterator(this,
+                            finalNodes,
+                            r,
+                            qryReqId,
+                            qry.distributedJoins(),
+                            mvccTracker);
 
                         release = false;
                     }
@@ -833,7 +870,7 @@ public class GridReduceQueryExecutor {
             }
             finally {
                 if (release) {
-                    releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins());
+                    releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins(), mvccTracker);
 
                     if (!skipMergeTbl) {
                         for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
@@ -1028,7 +1065,10 @@ public class GridReduceQueryExecutor {
      * @param distributedJoins Distributed join flag.
      */
     public void releaseRemoteResources(Collection<ClusterNode> nodes, ReduceQueryRun r, long qryReqId,
-        boolean distributedJoins) {
+        boolean distributedJoins, MvccQueryTracker mvccTracker) {
+        if (mvccTracker != null)
+            mvccTracker.onQueryDone();
+
         // For distributedJoins need always send cancel request to cleanup resources.
         if (distributedJoins)
             send(nodes, new GridQueryCancelRequest(qryReqId), null, false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 4e1fadb..347b88c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
@@ -42,6 +43,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
 
@@ -133,6 +135,9 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     /** Schema name. */
     private String schemaName;
 
+    /** */
+    private MvccCoordinatorVersion mvccVer;
+
     /**
      * Required by {@link Externalizable}
      */
@@ -157,6 +162,24 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
         params = req.params;
         paramsBytes = req.paramsBytes;
         schemaName = req.schemaName;
+        mvccVer = req.mvccVer;
+    }
+
+    /**
+     * @return Mvcc version.
+     */
+    @Nullable public MvccCoordinatorVersion mvccVersion() {
+        return mvccVer;
+    }
+
+    /**
+     * @param mvccVer Mvcc version.
+     * @return {@code this}.
+     */
+    public GridH2QueryRequest mvccVersion(MvccCoordinatorVersion mvccVer) {
+        this.mvccVer = mvccVer;
+
+        return this;
     }
 
     /**
@@ -435,65 +458,71 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeInt("pageSize", pageSize))
+                if (!writer.writeMessage("mvccVer", mvccVer))
                     return false;
 
                 writer.incrementState();
 
             case 3:
-                if (!writer.writeByteArray("paramsBytes", paramsBytes))
+                if (!writer.writeInt("pageSize", pageSize))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR))
+                if (!writer.writeByteArray("paramsBytes", paramsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG))
+                if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeLong("reqId", reqId))
+                if (!writer.writeIntArray("qryParts", qryParts))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeInt("timeout", timeout))
+                if (!writer.writeLong("reqId", reqId))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeString("schemaName", schemaName))
                     return false;
 
                 writer.incrementState();
 
-
             case 10:
-                if (!writer.writeIntArray("qryParts", qryParts))
+                if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeString("schemaName", schemaName))
+                if (!writer.writeInt("timeout", timeout))
                     return false;
 
                 writer.incrementState();
+
+            case 12:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -524,7 +553,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 2:
-                pageSize = reader.readInt("pageSize");
+                mvccVer = reader.readMessage("mvccVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -532,7 +561,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 3:
-                paramsBytes = reader.readByteArray("paramsBytes");
+                pageSize = reader.readInt("pageSize");
 
                 if (!reader.isLastRead())
                     return false;
@@ -540,7 +569,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 4:
-                parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false);
+                paramsBytes = reader.readByteArray("paramsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -548,7 +577,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 5:
-                qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG);
+                parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -556,7 +585,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 6:
-                reqId = reader.readLong("reqId");
+                qryParts = reader.readIntArray("qryParts");
 
                 if (!reader.isLastRead())
                     return false;
@@ -564,7 +593,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 7:
-                tbls = reader.readCollection("tbls", MessageCollectionItemType.MSG);
+                qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -572,7 +601,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 8:
-                timeout = reader.readInt("timeout");
+                reqId = reader.readLong("reqId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -580,16 +609,15 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 9:
-                topVer = reader.readMessage("topVer");
+                schemaName = reader.readString("schemaName");
 
                 if (!reader.isLastRead())
                     return false;
 
                 reader.incrementState();
 
-
             case 10:
-                qryParts = reader.readIntArray("qryParts");
+                tbls = reader.readCollection("tbls", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -597,12 +625,21 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 11:
-                schemaName = reader.readString("schemaName");
+                timeout = reader.readInt("timeout");
 
                 if (!reader.isLastRead())
                     return false;
 
                 reader.incrementState();
+
+            case 12:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridH2QueryRequest.class);
@@ -615,7 +652,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 12;
+        return 13;
     }
 
     /** {@inheritDoc} */


Mime
View raw message