ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5937 Added BPlusTree.iterate for more optimal mvcc search
Date Mon, 09 Oct 2017 11:42:55 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 921404a6f -> fd53c1a8f


ignite-5937 Added BPlusTree.iterate for more optimal mvcc search


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fd53c1a8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fd53c1a8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fd53c1a8

Branch: refs/heads/ignite-3478
Commit: fd53c1a8f4e905a7aba469eb5decf38c50b7708e
Parents: 921404a
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Oct 9 14:42:43 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Oct 9 14:42:43 2017 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManager.java        |  11 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    | 244 +++++++----
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |  12 +-
 .../cache/persistence/tree/BPlusTree.java       | 416 ++++++++++++++-----
 .../cache/tree/AbstractDataInnerIO.java         |   8 +-
 .../cache/tree/AbstractDataLeafIO.java          |   8 +-
 .../processors/cache/tree/CacheDataTree.java    |  10 +-
 .../cache/tree/CacheIdAwareDataInnerIO.java     |   4 +-
 .../cache/tree/CacheIdAwareDataLeafIO.java      |   4 +-
 .../processors/cache/tree/DataInnerIO.java      |   4 +-
 .../processors/cache/tree/DataLeafIO.java       |   4 +-
 .../processors/cache/tree/MvccDataInnerIO.java  |   4 +-
 .../processors/cache/tree/MvccDataLeafIO.java   |   4 +-
 .../cache/tree/MvccKeyMaxVersionBound.java      |  77 ++++
 .../cache/tree/MvccKeyMinVersionBound.java      |  49 +++
 .../processors/cache/tree/MvccUpdateRow.java    | 177 ++++++++
 .../cache/tree/MvccVersionBasedSearchRow.java   | 100 +++++
 .../processors/cache/tree/RowLinkIO.java        |  14 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 149 +++++++
 .../processors/database/BPlusTreeSelfTest.java  | 291 ++++++++++++-
 20 files changed, 1376 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 9d03e4a..8967ce8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -492,7 +492,16 @@ public interface IgniteCacheOffheapManager {
             long expireTime,
             @Nullable CacheDataRow oldRow) throws IgniteCheckedException;
 
-        GridLongList mvccUpdate(
+        /**
+         * @param cctx Cache context.
+         * @param key Key.
+         * @param val Value.
+         * @param ver Version.
+         * @param mvccVer Mvcc version.
+         * @return List of transactions to wait for.
+         * @throws IgniteCheckedException If failed.
+         */
+        @Nullable GridLongList mvccUpdate(
             GridCacheContext cctx,
             KeyCacheObject key,
             CacheObject val,

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index d8c5eaa..25f36b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -55,7 +55,11 @@ import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.cache.tree.DataRow;
 import org.apache.ignite.internal.processors.cache.tree.MvccDataRow;
+import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound;
+import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound;
 import org.apache.ignite.internal.processors.cache.tree.MvccSearchRow;
+import org.apache.ignite.internal.processors.cache.tree.MvccUpdateRow;
+import org.apache.ignite.internal.processors.cache.tree.MvccVersionBasedSearchRow;
 import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
 import org.apache.ignite.internal.processors.cache.tree.PendingRow;
 import org.apache.ignite.internal.processors.cache.tree.SearchRow;
@@ -1360,83 +1364,141 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             try {
                 int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
 
-                MvccDataRow dataRow = new MvccDataRow(
-                    key,
-                    val,
-                    ver,
-                    partId,
-                    cacheId,
-                    mvccVer.coordinatorVersion(),
-                    mvccVer.counter());
-
                 CacheObjectContext coCtx = cctx.cacheObjectContext();
 
                 // Make sure value bytes initialized.
                 key.valueBytes(coCtx);
                 val.valueBytes(coCtx);
 
-                rowStore.addRow(dataRow);
+                if (true) {
+                    MvccUpdateRow updateRow = new MvccUpdateRow(
+                        key,
+                        val,
+                        ver,
+                        mvccVer,
+                        partId,
+                        cacheId);
 
-                assert dataRow.link() != 0 : dataRow;
+                    rowStore.addRow(updateRow);
 
-                if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID)
-                    dataRow.cacheId(cctx.cacheId());
+                    assert updateRow.link() != 0 : updateRow;
 
-                boolean old = dataTree.putx(dataRow);
+                    if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+                        updateRow.cacheId(cctx.cacheId());
 
-                assert !old;
+                    GridLongList waitTxs = null;
 
-                GridLongList waitTxs = null;
+                    if (mvccVer.initialLoad()) {
+                        boolean old = dataTree.putx(updateRow);
 
-                if (!mvccVer.initialLoad()) {
-                    MvccLongList activeTxs = mvccVer.activeTransactions();
+                        assert !old;
 
-                    // TODO IGNITE-3484: need special method.
-                    GridCursor<CacheDataRow> cur = dataTree.find(
-                        new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1),
-                        new MvccSearchRow(cacheId, key, 1, 1));
+                        incrementSize(cctx.cacheId());
+                    }
+                    else {
+                        dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
 
-                    boolean first = true;
+                        boolean old = dataTree.putx(updateRow);
 
-                    boolean activeTx = false;
+                        assert !old;
 
-                    while (cur.next()) {
-                        CacheDataRow oldVal = cur.get();
+                        if (!updateRow.previousNotNull())
+                            incrementSize(cctx.cacheId());
 
-                        assert oldVal.link() != 0 : oldVal;
+                        waitTxs = updateRow.activeTransactions();
 
-                        if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() &&
-                            activeTxs.contains(oldVal.mvccCounter())) {
-                            if (waitTxs == null)
-                                waitTxs = new GridLongList();
+                        List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
 
-                            assert oldVal.mvccCounter() != mvccVer.counter();
+                        if (cleanupRows != null) {
+                            for (int i = 0; i < cleanupRows.size(); i++) {
+                                CacheSearchRow oldRow = cleanupRows.get(i);
 
-                            waitTxs.add(oldVal.mvccCounter());
+                                assert oldRow.link() != 0L : oldRow;
 
-                            activeTx = true;
+                                boolean rmvd = dataTree.removex(oldRow);
+
+                                assert rmvd;
+
+                                rowStore.removeRow(oldRow.link());
+                            }
                         }
+                    }
 
-                        if (!activeTx) {
-                            // Should not delete oldest version which is less than cleanup version.
-                            int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
+                    return waitTxs;
+                }
+                else {
+                    MvccDataRow dataRow = new MvccDataRow(
+                        key,
+                        val,
+                        ver,
+                        partId,
+                        cacheId,
+                        mvccVer.coordinatorVersion(),
+                        mvccVer.counter());
 
-                            if (cmp <= 0) {
-                                if (first)
-                                    first = false;
-                                else {
-                                    boolean rmvd = dataTree.removex(oldVal);
+                    rowStore.addRow(dataRow);
 
-                                    assert rmvd;
+                    assert dataRow.link() != 0 : dataRow;
+
+                    if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+                        dataRow.cacheId(cctx.cacheId());
+
+                    boolean old = dataTree.putx(dataRow);
+
+                    assert !old;
+
+                    GridLongList waitTxs = null;
+
+                    if (!mvccVer.initialLoad()) {
+                        MvccLongList activeTxs = mvccVer.activeTransactions();
+
+                        // TODO IGNITE-3484: need special method.
+                        GridCursor<CacheDataRow> cur = dataTree.find(
+                            new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1),
+                            new MvccSearchRow(cacheId, key, 1, 1));
+
+                        boolean first = true;
+
+                        boolean activeTx = false;
+
+                        while (cur.next()) {
+                            CacheDataRow oldVal = cur.get();
+
+                            assert oldVal.link() != 0 : oldVal;
+
+                            if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() &&
+                                activeTxs.contains(oldVal.mvccCounter())) {
+                                if (waitTxs == null)
+                                    waitTxs = new GridLongList();
+
+                                assert oldVal.mvccCounter() != mvccVer.counter();
+
+                                waitTxs.add(oldVal.mvccCounter());
+
+                                activeTx = true;
+                            }
 
-                                    rowStore.removeRow(oldVal.link());
+                            if (!activeTx) {
+                                // Should not delete oldest version which is less than cleanup version.
+                                int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
+
+                                if (cmp <= 0) {
+                                    if (first)
+                                        first = false;
+                                    else {
+                                        boolean rmvd = dataTree.removex(oldVal);
+
+                                        assert rmvd;
+
+                                        rowStore.removeRow(oldVal.link());
+                                    }
                                 }
                             }
                         }
                     }
-                }
 
-                return waitTxs;
+                    return waitTxs;
+                }
             }
             finally {
                 busyLock.leaveBusy();
@@ -1647,14 +1709,26 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             CacheDataRow row;
 
             if (grp.mvccEnabled()) {
-                // TODO IGNITE-3484: need special method.
-                GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
-                    new MvccSearchRow(cacheId, key, 1, 1));
+                if (true) {
+                    MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key);
+
+                    dataTree.iterate(
+                        searchRow,
+                        new MvccKeyMinVersionBound(cacheId, key),
+                        searchRow // Use the same instance as closure to do not create extra object.
+                    );
 
-                if (cur.next())
-                    row = cur.get();
-                else
-                    row = null;
+                    row = searchRow.row();
+                }
+                else {
+                    GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
+                        new MvccSearchRow(cacheId, key, 1, 1));
+
+                    if (cur.next())
+                        row = cur.get();
+                    else
+                        row = null;
+                }
             }
             else
                 row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY);
@@ -1672,6 +1746,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         {
             assert grp.mvccEnabled();
 
+            // Note: this method is intended for testing only.
+
             key.valueBytes(cctx.cacheObjectContext());
 
             int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
@@ -1705,41 +1781,55 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
             int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
 
-            // TODO IGNITE-3484: need special method.
-            GridCursor<CacheDataRow> cur = dataTree.find(
-                new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()),
-                new MvccSearchRow(cacheId, key, 1, 1));
+            if (true) {
+                MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver);
 
-            CacheDataRow row = null;
+                dataTree.iterate(
+                    lower,
+                    new MvccKeyMinVersionBound(cacheId, key),
+                    lower // Use the same instance as closure to do not create extra object.
+                );
 
-            MvccLongList txs = ver.activeTransactions();
+                CacheDataRow row = lower.row();
 
-            while (cur.next()) {
-                CacheDataRow row0 = cur.get();
+                afterRowFound(row, key);
 
-                assert row0.mvccCoordinatorVersion() > 0 : row0;
+                return row;
+            }
+            else {
+                GridCursor<CacheDataRow> cur = dataTree.find(
+                    new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()),
+                    new MvccSearchRow(cacheId, key, 1, 1));
 
-                boolean visible;
+                CacheDataRow row = null;
 
-                if (txs != null) {
-                    visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion()
-                        || !txs.contains(row0.mvccCounter());
-                }
-                else
-                    visible = true;
+                MvccLongList txs = ver.activeTransactions();
 
-                if (visible) {
-                    row = row0;
+                while (cur.next()) {
+                    CacheDataRow row0 = cur.get();
 
-                    break;
-                }
-            }
+                    assert row0.mvccCoordinatorVersion() > 0 : row0;
 
-            assert row == null || key.equals(row.key());
+                    boolean visible;
 
-            //afterRowFound(row, key);
+                    if (txs != null) {
+                        visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion()
+                            || !txs.contains(row0.mvccCounter());
+                    }
+                    else
+                        visible = true;
 
-            return row;
+                    if (visible) {
+                        row = row0;
+
+                        break;
+                    }
+                }
+
+                assert row == null || key.equals(row.key());
+
+                return row;
+            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 5080c83..b9b8ea1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -614,8 +614,14 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         // TODO IGNITE-3478 sorted? + change GridLongList.writeTo?
         MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
 
-        for (Long txVer : activeTxs.keySet())
+        long minActive = Long.MAX_VALUE;
+
+        for (Long txVer : activeTxs.keySet()) {
+            if (txVer < minActive)
+                minActive = txVer;
+
             res.addTx(txVer);
+        }
 
         Object old = activeTxs.put(nextCtr, txId);
 
@@ -624,7 +630,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         long cleanupVer;
 
         if (prevCrdQueries.previousQueriesDone()) {
-            cleanupVer = committedCntr.get() - 1;
+            cleanupVer = Math.min(minActive, committedCntr.get());
+
+            cleanupVer--;
 
             Long qryVer = activeQueries.minimalQueryCounter();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index c73b4c7..b31a61f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -907,7 +907,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             long pageAddr = readLock(firstPageId, firstPage); // We always merge pages backwards, the first page is never removed.
 
             try {
-                cursor.init(pageAddr, io(pageAddr), 0);
+                cursor.init(pageAddr, io(pageAddr), -1);
             }
             finally {
                 readUnlock(firstPageId, firstPage, pageAddr);
@@ -972,6 +972,34 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         }
     }
 
+    /**
+     * @param lower Lower bound inclusive.
+     * @param upper Upper bound inclusive.
+     * @param c Closure applied for all found items, iteration is stopped if closure returns {@code false}.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void iterate(L lower, L upper, TreeRowClosure<L, T> c) throws IgniteCheckedException {
+        checkDestroyed();
+
+        try {
+            ClosureCursor cursor = new ClosureCursor(lower, upper, c);
+
+            cursor.iterate();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteCheckedException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+        }
+        catch (RuntimeException e) {
+            throw new IgniteException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+        }
+        catch (AssertionError e) {
+            throw new AssertionError("Assertion error on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
+        }
+        finally {
+            checkDestroyed();
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public T findFirst() throws IgniteCheckedException {
         checkDestroyed();
@@ -2509,14 +2537,14 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      */
     private final class GetCursor extends Get {
         /** */
-        ForwardCursor cursor;
+        AbstractForwardCursor cursor;
 
         /**
          * @param lower Lower bound.
          * @param shift Shift.
          * @param cursor Cursor.
          */
-        GetCursor(L lower, int shift, ForwardCursor cursor) {
+        GetCursor(L lower, int shift, AbstractForwardCursor cursor) {
             super(lower, false);
 
             assert shift != 0; // Either handle range of equal rows or find a greater row after concurrent merge.
@@ -4385,51 +4413,57 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
     protected abstract T getRow(BPlusIO<L> io, long pageAddr, int idx, Object x) throws IgniteCheckedException;
 
     /**
-     * Forward cursor.
+     *
      */
     @SuppressWarnings("unchecked")
-    private final class ForwardCursor implements GridCursor<T> {
-        /** */
-        private T[] rows = (T[])EMPTY;
-
-        /** */
-        private int row = -1;
-
+    private abstract class AbstractForwardCursor {
         /** */
-        private long nextPageId;
+        long nextPageId;
 
         /** */
-        private L lowerBound;
+        L lowerBound;
 
         /** */
         private int lowerShift = -1; // Initially it is -1 to handle multiple equal rows.
 
         /** */
-        private final L upperBound;
-
-        /** */
-        private final Object x;
+        final L upperBound;
 
         /**
          * @param lowerBound Lower bound.
          * @param upperBound Upper bound.
          */
-        ForwardCursor(L lowerBound, L upperBound) {
+        AbstractForwardCursor(L lowerBound, L upperBound) {
             this.lowerBound = lowerBound;
             this.upperBound = upperBound;
-            this.x = null;
         }
 
         /**
-         * @param lowerBound Lower bound.
-         * @param upperBound Upper bound.
-         * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+         *
          */
-        ForwardCursor(L lowerBound, L upperBound, Object x) {
-            this.lowerBound = lowerBound;
-            this.upperBound = upperBound;
-            this.x = x;
-        }
+        abstract void init0();
+
+        /**
+         * @param pageAddr Page address.
+         * @param io IO.
+         * @param startIdx Start index.
+         * @param cnt Number of rows in the buffer.
+         * @return {@code true} If we were able to fetch rows from this page.
+         * @throws IgniteCheckedException If failed.
+         */
+        abstract boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt)
+            throws IgniteCheckedException;
+
+        /**
+         * @return {@code True} If we have rows to return after reading the next page.
+         * @throws IgniteCheckedException If failed.
+         */
+        abstract boolean reinitialize0() throws IgniteCheckedException;
+
+        /**
+         * @param readDone {@code True} if traversed all rows.
+         */
+        abstract void onNotFound(boolean readDone);
 
         /**
          * @param pageAddr Page address.
@@ -4437,9 +4471,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
          * @param startIdx Start index.
          * @throws IgniteCheckedException If failed.
          */
-        private void init(long pageAddr, BPlusIO<L> io, int startIdx) throws IgniteCheckedException {
+        final void init(long pageAddr, BPlusIO<L> io, int startIdx) throws IgniteCheckedException {
             nextPageId = 0;
-            row = -1;
+
+            init0();
 
             int cnt = io.getCount(pageAddr);
 
@@ -4447,16 +4482,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             if (cnt == 0) {
                 assert io.getForward(pageAddr) == 0L;
 
-                rows = null;
-            }
-            else if (!fillFromBuffer(pageAddr, io, startIdx, cnt)) {
-                if (rows != EMPTY) {
-                    assert rows.length > 0; // Otherwise it makes no sense to create an array.
-
-                    // Fake clear.
-                    rows[0] = null;
-                }
+                onNotFound(true);
             }
+            else if (!fillFromBuffer(pageAddr, io, startIdx, cnt))
+                onNotFound(false);
         }
 
         /**
@@ -4466,7 +4495,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
          * @return Adjusted to lower bound start index.
          * @throws IgniteCheckedException If failed.
          */
-        private int findLowerBound(long pageAddr, BPlusIO<L> io, int cnt) throws IgniteCheckedException {
+        final int findLowerBound(long pageAddr, BPlusIO<L> io, int cnt) throws IgniteCheckedException {
             assert io.isLeaf();
 
             // Compare with the first row on the page.
@@ -4491,7 +4520,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
          * @return Corrected number of rows with respect to upper bound.
          * @throws IgniteCheckedException If failed.
          */
-        private int findUpperBound(long pageAddr, BPlusIO<L> io, int low, int cnt) throws IgniteCheckedException {
+        final int findUpperBound(long pageAddr, BPlusIO<L> io, int low, int cnt) throws IgniteCheckedException {
             assert io.isLeaf();
 
             // Compare with the last row on the page.
@@ -4523,75 +4552,20 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             throws IgniteCheckedException {
             assert io.isLeaf() : io;
             assert cnt != 0 : cnt; // We can not see empty pages (empty tree handled in init).
-            assert startIdx >= 0 : startIdx;
+            assert startIdx >= 0 || startIdx == -1: startIdx;
             assert cnt >= startIdx;
 
             checkDestroyed();
 
             nextPageId = io.getForward(pageAddr);
 
-            if (lowerBound != null && startIdx == 0)
-                startIdx = findLowerBound(pageAddr, io, cnt);
-
-            if (upperBound != null && cnt != startIdx)
-                cnt = findUpperBound(pageAddr, io, startIdx, cnt);
-
-            cnt -= startIdx;
-
-            if (cnt == 0)
-                return false;
-
-            if (rows == EMPTY)
-                rows = (T[])new Object[cnt];
-
-            for (int i = 0; i < cnt; i++) {
-                T r = getRow(io, pageAddr, startIdx + i, x);
-
-                rows = GridArrays.set(rows, i, r);
-            }
-
-            GridArrays.clearTail(rows, cnt);
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("SimplifiableIfStatement")
-        @Override public boolean next() throws IgniteCheckedException {
-            if (rows == null)
-                return false;
-
-            if (++row < rows.length && rows[row] != null) {
-                clearLastRow(); // Allow to GC the last returned row.
-
-                return true;
-            }
-
-            return nextPage();
-        }
-
-        /**
-         * @return Cleared last row.
-         */
-        private T clearLastRow() {
-            if (row == 0)
-                return null;
-
-            int last = row - 1;
-
-            T r = rows[last];
-
-            assert r != null;
-
-            rows[last] = null;
-
-            return r;
+            return fillFromBuffer0(pageAddr, io, startIdx, cnt);
         }
 
         /**
          * @throws IgniteCheckedException If failed.
          */
-        private void find() throws IgniteCheckedException {
+        final void find() throws IgniteCheckedException {
             assert lowerBound != null;
 
             doFind(new GetCursor(lowerBound, lowerShift, this));
@@ -4607,21 +4581,20 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             // to the previous lower bound.
             find();
 
-            return next();
+            return reinitialize0();
         }
 
         /**
+         * @param lastRow Last read row (to be used as new lower bound).
          * @return {@code true} If we have rows to return after reading the next page.
          * @throws IgniteCheckedException If failed.
          */
-        private boolean nextPage() throws IgniteCheckedException {
-            updateLowerBound(clearLastRow());
-
-            row = 0;
+        final boolean nextPage(L lastRow) throws IgniteCheckedException {
+            updateLowerBound(lastRow);
 
             for (;;) {
                 if (nextPageId == 0) {
-                    rows = null;
+                    onNotFound(true);
 
                     return false; // Done.
                 }
@@ -4638,7 +4611,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
                     try {
                         BPlusIO<L> io = io(pageAddr);
 
-                        if (fillFromBuffer(pageAddr, io, 0, io.getCount(pageAddr)))
+                        if (fillFromBuffer(pageAddr, io, -1, io.getCount(pageAddr)))
                             return true;
 
                         // Continue fetching forward.
@@ -4659,12 +4632,227 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         /**
          * @param lower New exact lower bound.
          */
-        private void updateLowerBound(T lower) {
+        private void updateLowerBound(L lower) {
             if (lower != null) {
                 lowerShift = 1; // Now we have the full row an need to avoid duplicates.
                 lowerBound = lower; // Move the lower bound forward for further concurrent merge retries.
             }
         }
+    }
+
+    /**
+     * Closure cursor.
+     */
+    @SuppressWarnings("unchecked")
+    private final class ClosureCursor extends AbstractForwardCursor {
+        /** */
+        private final TreeRowClosure<L, T> p;
+
+        /** */
+        private L lastRow;
+
+        /**
+         * @param lowerBound Lower bound.
+         * @param upperBound Upper bound.
+         * @param p Row predicate.
+         */
+        ClosureCursor(L lowerBound, L upperBound, TreeRowClosure<L, T> p) {
+            super(lowerBound, upperBound);
+
+            assert lowerBound != null;
+            assert upperBound != null;
+            assert p != null;
+
+            this.p = p;
+        }
+
+        /** {@inheritDoc} */
+        @Override void init0() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt)
+            throws IgniteCheckedException {
+            if (startIdx == -1) // TODO IGNITE-3478: startIdx == 0? can search twice for first item?
+                startIdx = findLowerBound(pageAddr, io, cnt);
+
+            if (cnt == startIdx)
+                return false;
+
+            for (int i = startIdx; i < cnt; i++) {
+                int cmp = compare(0, io, pageAddr, i, upperBound);
+
+                if (cmp > 0) {
+                    nextPageId = 0; // The End.
+
+                    return false;
+                }
+
+                boolean stop = !p.apply(BPlusTree.this, io, pageAddr, i);
+
+                if (stop) {
+                    nextPageId = 0; // The End.
+
+                    return true;
+                }
+            }
+
+            if (nextPageId != 0)
+                lastRow = io.getLookupRow(BPlusTree.this, pageAddr, cnt - 1); // Need to save the last row.
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean reinitialize0() throws IgniteCheckedException {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override void onNotFound(boolean readDone) {
+            nextPageId = 0;
+        }
+
+        /**
+         * @throws IgniteCheckedException If failed.
+         */
+        private void iterate() throws IgniteCheckedException {
+            find();
+
+            if (nextPageId == 0) {
+                return;
+            }
+
+            for (;;) {
+                L lastRow0 = lastRow;
+
+                lastRow = null;
+
+                nextPage(lastRow0);
+
+                if (nextPageId == 0)
+                    return;
+            }
+        }
+    }
+
+    /**
+     * Forward cursor.
+     */
+    @SuppressWarnings("unchecked")
+    private final class ForwardCursor extends AbstractForwardCursor implements GridCursor<T> {
+        /** */
+        final Object x;
+
+        /** */
+        private T[] rows = (T[])EMPTY;
+
+        /** */
+        private int row = -1;
+
+        /**
+         * @param lowerBound Lower bound.
+         * @param upperBound Upper bound.
+         * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+         */
+        ForwardCursor(L lowerBound, L upperBound, Object x) {
+            super(lowerBound, upperBound);
+
+            this.x = x;
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException {
+            if (startIdx == -1) {
+                if (lowerBound != null)
+                    startIdx = findLowerBound(pageAddr, io, cnt);
+                else
+                    startIdx = 0;
+            }
+
+            if (upperBound != null && cnt != startIdx)
+                cnt = findUpperBound(pageAddr, io, startIdx, cnt);
+
+            cnt -= startIdx;
+
+            if (cnt == 0)
+                return false;
+
+            if (rows == EMPTY)
+                rows = (T[])new Object[cnt];
+
+            for (int i = 0; i < cnt; i++) {
+                T r = getRow(io, pageAddr, startIdx + i, x);
+
+                rows = GridArrays.set(rows, i, r);
+            }
+
+            GridArrays.clearTail(rows, cnt);
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean reinitialize0() throws IgniteCheckedException {
+            return next();
+        }
+
+        /** {@inheritDoc} */
+        @Override void onNotFound(boolean readDone) {
+            if (readDone)
+                rows = null;
+            else {
+                if (rows != EMPTY) {
+                    assert rows.length > 0; // Otherwise it makes no sense to create an array.
+
+                    // Fake clear.
+                    rows[0] = null;
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override void init0() {
+            row = -1;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("SimplifiableIfStatement")
+        @Override public boolean next() throws IgniteCheckedException {
+            if (rows == null)
+                return false;
+
+            if (++row < rows.length && rows[row] != null) {
+                clearLastRow(); // Allow to GC the last returned row.
+
+                return true;
+            }
+
+            T lastRow = clearLastRow();
+
+            row = 0;
+
+            return nextPage(lastRow);
+        }
+
+        /**
+         * @return Cleared last row.
+         */
+        private T clearLastRow() {
+            if (row == 0)
+                return null;
+
+            int last = row - 1;
+
+            T r = rows[last];
+
+            assert r != null;
+
+            rows[last] = null;
+
+            return r;
+        }
 
         /** {@inheritDoc} */
         @Override public T get() {
@@ -4805,4 +4993,20 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         /** */
         DONE
     }
+
+    /**
+     *
+     */
+    public interface TreeRowClosure<L, T extends L> {
+        /**
+         * @param tree Tree.
+         * @param io Tree IO.
+         * @param pageAddr Page address.
+         * @param idx Item index.
+         * @return {@code True} if the item passes the predicate ({@code false} stops the iteration). TODO IGNITE-3478
+         * @throws IgniteCheckedException If failed.
+         */
+        public boolean apply(BPlusTree<L, T> tree, BPlusIO<L> io, long pageAddr, int idx)
+            throws IgniteCheckedException;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index 3fc0962..a07d012 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -76,8 +76,8 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
         long link = getLink(pageAddr, idx);
 
         if (storeMvccVersion()) {
-            long mvccTopVer = getMvccUpdateTopologyVersion(pageAddr, idx);
-            long mvccCntr = getMvccUpdateCounter(pageAddr, idx);
+            long mvccTopVer = getMvccCoordinatorVersion(pageAddr, idx);
+            long mvccCntr = getMvccCounter(pageAddr, idx);
 
             return ((CacheDataTree)tree).rowStore().mvccRow(cacheId,
                 hash,
@@ -119,8 +119,8 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
         }
 
         if (storeMvccVersion()) {
-            long mvccTopVer = rowIo.getMvccUpdateTopologyVersion(srcPageAddr, srcIdx);
-            long mvcCntr = rowIo.getMvccUpdateCounter(srcPageAddr, srcIdx);
+            long mvccTopVer = rowIo.getMvccCoordinatorVersion(srcPageAddr, srcIdx);
+            long mvcCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
 
             assert mvccTopVer > 0 : mvccTopVer;
             assert mvcCntr != CacheCoordinatorsProcessor.COUNTER_NA;

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index a4eac3e..ef08bec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -94,8 +94,8 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
         }
 
         if (storeMvccVersion()) {
-            long mvccUpdateTopVer = ((RowLinkIO)srcIo).getMvccUpdateTopologyVersion(srcPageAddr, srcIdx);
-            long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccUpdateCounter(srcPageAddr, srcIdx);
+            long mvccUpdateTopVer = ((RowLinkIO)srcIo).getMvccCoordinatorVersion(srcPageAddr, srcIdx);
+            long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx);
 
             assert mvccUpdateTopVer >=0 : mvccUpdateCntr;
             assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA;
@@ -114,8 +114,8 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
         long link = getLink(pageAddr, idx);
 
         if (storeMvccVersion()) {
-            long mvccTopVer = getMvccUpdateTopologyVersion(pageAddr, idx);
-            long mvccCntr = getMvccUpdateCounter(pageAddr, idx);
+            long mvccTopVer = getMvccCoordinatorVersion(pageAddr, idx);
+            long mvccCntr = getMvccCounter(pageAddr, idx);
 
             return ((CacheDataTree)tree).rowStore().mvccRow(cacheId,
                 hash,

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index 767c996..eaeefee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -114,7 +114,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
     /** {@inheritDoc} */
     @Override protected int compare(BPlusIO<CacheSearchRow> iox, long pageAddr, int idx, CacheSearchRow row)
         throws IgniteCheckedException {
-        assert !grp.mvccEnabled() || row.mvccCoordinatorVersion() != 0;// || row.getClass() == SearchRow.class;
+        assert !grp.mvccEnabled() || row.mvccCoordinatorVersion() != 0 : row;
 
         RowLinkIO io = (RowLinkIO)iox;
 
@@ -158,14 +158,14 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
         if (cmp != 0 || !grp.mvccEnabled())
             return 0;
 
-        long mvccCrdVer = io.getMvccUpdateTopologyVersion(pageAddr, idx);
+        long mvccCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
 
         cmp = Long.compare(row.mvccCoordinatorVersion(), mvccCrdVer);
 
         if (cmp != 0)
             return cmp;
 
-        long mvccCntr = io.getMvccUpdateCounter(pageAddr, idx);
+        long mvccCntr = io.getMvccCounter(pageAddr, idx);
 
         assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA;
 
@@ -188,8 +188,8 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
             CacheDataRowAdapter.RowData.FULL;
 
         if (grp.mvccEnabled()) {
-            long mvccTopVer = rowIo.getMvccUpdateTopologyVersion(pageAddr, idx);
-            long mvccCntr = rowIo.getMvccUpdateCounter(pageAddr, idx);
+            long mvccTopVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+            long mvccCntr = rowIo.getMvccCounter(pageAddr, idx);
 
             return rowStore.mvccRow(cacheId, hash, link, x, mvccTopVer, mvccCntr);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
index fc9d15d..3d02b27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
@@ -53,12 +53,12 @@ public final class CacheIdAwareDataInnerIO extends AbstractDataInnerIO {
     }
 
     /** {@inheritDoc} */
-    @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+    @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
         return 0;
     }
 
     /** {@inheritDoc} */
-    @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+    @Override public long getMvccCounter(long pageAddr, int idx) {
         return CacheCoordinatorsProcessor.COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
index b328924..58ae9ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
@@ -53,12 +53,12 @@ public final class CacheIdAwareDataLeafIO extends AbstractDataLeafIO {
     }
 
     /** {@inheritDoc} */
-    @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+    @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
         return 0;
     }
 
     /** {@inheritDoc} */
-    @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+    @Override public long getMvccCounter(long pageAddr, int idx) {
         return CacheCoordinatorsProcessor.COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
index 0d424b7..19a5c47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
@@ -53,12 +53,12 @@ public final class DataInnerIO extends AbstractDataInnerIO {
     }
 
     /** {@inheritDoc} */
-    @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+    @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
         return 0;
     }
 
     /** {@inheritDoc} */
-    @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+    @Override public long getMvccCounter(long pageAddr, int idx) {
         return CacheCoordinatorsProcessor.COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
index ff51bc2..ab10b96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
@@ -53,12 +53,12 @@ public final class DataLeafIO extends AbstractDataLeafIO {
     }
 
     /** {@inheritDoc} */
-    @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+    @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
         return 0;
     }
 
     /** {@inheritDoc} */
-    @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+    @Override public long getMvccCounter(long pageAddr, int idx) {
         return CacheCoordinatorsProcessor.COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java
index 5f4f44c..51a911d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java
@@ -53,12 +53,12 @@ public final class MvccDataInnerIO extends AbstractDataInnerIO {
     }
 
     /** {@inheritDoc} */
-    @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+    @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
         return PageUtils.getLong(pageAddr, offset(idx) + 12);
     }
 
     /** {@inheritDoc} */
-    @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+    @Override public long getMvccCounter(long pageAddr, int idx) {
         return PageUtils.getLong(pageAddr, offset(idx) + 20);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java
index e7cfca7..84c33a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java
@@ -53,12 +53,12 @@ public final class MvccDataLeafIO extends AbstractDataLeafIO {
     }
 
     /** {@inheritDoc} */
-    @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) {
+    @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
         return PageUtils.getLong(pageAddr, offset(idx) + 12);
     }
 
     /** {@inheritDoc} */
-    @Override public long getMvccUpdateCounter(long pageAddr, int idx) {
+    @Override public long getMvccCounter(long pageAddr, int idx) {
         return PageUtils.getLong(pageAddr, offset(idx) + 20);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
new file mode 100644
index 0000000..aa9422d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class MvccKeyMaxVersionBound extends SearchRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
+    /** */
+    private CacheDataRow resRow;
+
+    /**
+     * @param cacheId Cache ID.
+     * @param key Key.
+     */
+    public MvccKeyMaxVersionBound(int cacheId, KeyCacheObject key) {
+        super(cacheId, key);
+    }
+
+    /**
+     * @return Found row.
+     */
+    @Nullable public CacheDataRow row() {
+        return resRow;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree, BPlusIO<CacheSearchRow> io,
+        long pageAddr,
+        int idx)
+        throws IgniteCheckedException
+    {
+        resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+
+        return false;  // Stop search.
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCoordinatorVersion() {
+        return Long.MAX_VALUE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCounter() {
+        return Long.MAX_VALUE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccKeyMaxVersionBound.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java
new file mode 100644
index 0000000..f2ac308
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class MvccKeyMinVersionBound extends SearchRow {
+    /**
+     * @param cacheId Cache ID.
+     * @param key Key.
+     */
+    public MvccKeyMinVersionBound(int cacheId, KeyCacheObject key) {
+        super(cacheId, key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCoordinatorVersion() {
+        return 1L;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCounter() {
+        return 1L;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccKeyMinVersionBound.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
new file mode 100644
index 0000000..79544e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
+    /** */
+    private Boolean hasPrev;
+
+    /** */
+    private boolean canCleanup;
+
+    /** */
+    private GridLongList activeTxs;
+
+    /** */
+    private List<CacheSearchRow> cleanupRows;
+
+    /** */
+    private final MvccCoordinatorVersion mvccVer;
+
+    /**
+     * @param key Key.
+     * @param val Value.
+     * @param ver Version.
+     * @param mvccVer Mvcc version.
+     * @param part Partition.
+     * @param cacheId Cache ID.
+     */
+    public MvccUpdateRow(
+        KeyCacheObject key,
+        CacheObject val,
+        GridCacheVersion ver,
+        MvccCoordinatorVersion mvccVer,
+        int part,
+        int cacheId) {
+        super(key, val, ver, part, 0L, cacheId);
+
+        this.mvccVer = mvccVer;
+    }
+
+    /**
+     * @return {@code True} if previous value was non-null.
+     */
+    public boolean previousNotNull() {
+        return hasPrev != null && hasPrev;
+    }
+
+    /**
+     * @return Active transactions to wait for.
+     */
+    @Nullable public GridLongList activeTransactions() {
+        return activeTxs;
+    }
+
+    /**
+     * @return Rows which are safe to cleanup.
+     */
+    public List<CacheSearchRow> cleanupRows() {
+        return cleanupRows;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree,
+        BPlusIO<CacheSearchRow> io,
+        long pageAddr,
+        int idx)
+        throws IgniteCheckedException
+    {
+        RowLinkIO rowIo = (RowLinkIO)io;
+
+        // All previous versions should be less than the new one.
+        assert mvccVer.coordinatorVersion() >= rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+        assert mvccVer.coordinatorVersion() > rowIo.getMvccCoordinatorVersion(pageAddr, idx) || mvccVer.counter() > rowIo.getMvccCounter(pageAddr, idx);
+
+        boolean checkActive = mvccVer.activeTransactions().size() > 0;
+
+        boolean txActive = false;
+
+        // Suppose transactions on previous coordinator versions are done.
+        if (checkActive && mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx)) {
+            long rowMvccCntr = rowIo.getMvccCounter(pageAddr, idx);
+
+            if (mvccVer.activeTransactions().contains(rowMvccCntr)) {
+                txActive = true;
+
+                if (activeTxs == null)
+                    activeTxs = new GridLongList();
+
+                activeTxs.add(rowMvccCntr);
+            }
+        }
+
+        if (hasPrev == null)
+            hasPrev = Boolean.TRUE; // TODO IGNITE-3478 support removes.
+
+        if (!txActive) {
+            assert Long.compare(mvccVer.coordinatorVersion(), rowIo.getMvccCoordinatorVersion(pageAddr, idx)) >= 0;
+
+            int cmp;
+
+            if (mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx))
+                cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx));
+            else
+                cmp = 1;
+
+            if (cmp >= 0) {
+                // Do not cleanup oldest version.
+                if (canCleanup) {
+                    CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx);
+
+                    assert row.link() != 0 && row.mvccCoordinatorVersion() > 0 : row;
+
+                    // Should not be possible to cleanup active tx.
+                    assert row.mvccCoordinatorVersion() != mvccVer.coordinatorVersion()
+                        || !mvccVer.activeTransactions().contains(row.mvccCounter());
+
+                    if (cleanupRows == null)
+                        cleanupRows = new ArrayList<>();
+
+                    cleanupRows.add(row);
+                }
+                else
+                    canCleanup = true;
+            }
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCoordinatorVersion() {
+        return mvccVer.coordinatorVersion();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCounter() {
+        return mvccVer.counter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccUpdateRow.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
new file mode 100644
index 0000000..c829afb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Search row which is also a tree row closure: it stops at the first row visible for the given mvcc version.
+ */
+public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
+    /** Mvcc version used for the search, its active transactions list is used to filter out rows. */
+    private final MvccCoordinatorVersion ver;
+
+    /** Row read by {@link #apply} when a visible row is met, otherwise {@code null}. */
+    private CacheDataRow resRow;
+
+    /**
+     * @param cacheId Cache ID.
+     * @param key Key.
+     * @param ver Mvcc version.
+     */
+    public MvccVersionBasedSearchRow(int cacheId, KeyCacheObject key, MvccCoordinatorVersion ver) {
+        super(cacheId, key);
+
+        assert ver != null;
+
+        this.ver = ver;
+    }
+
+    /**
+     * @return Found row or {@code null} if closure did not meet a visible row.
+     */
+    @Nullable public CacheDataRow row() {
+        return resRow;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree,
+        BPlusIO<CacheSearchRow> io,
+        long pageAddr,
+        int idx) throws IgniteCheckedException
+    {
+        boolean visible = true;
+
+        if (ver.activeTransactions().size() > 0) {
+            RowLinkIO rowIo = (RowLinkIO)io;
+
+            // TODO IGNITE-3478 sort active transactions?
+            if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) == ver.coordinatorVersion())
+                visible = !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx));
+        }
+
+        if (visible) {
+            resRow = ((CacheDataTree) tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+
+            return false; // Stop search.
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCoordinatorVersion() {
+        return ver.coordinatorVersion();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCounter() {
+        return ver.counter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccVersionBasedSearchRow.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java
index 8b341cb..111968d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java
@@ -42,7 +42,17 @@ public interface RowLinkIO {
      */
     public int getCacheId(long pageAddr, int idx);
 
-    public long getMvccUpdateTopologyVersion(long pageAddr, int idx);
+    /**
+     * @param pageAddr Page address.
+     * @param idx Index.
+     * @return Mvcc coordinator version.
+     */
+    public long getMvccCoordinatorVersion(long pageAddr, int idx);
 
-    public long getMvccUpdateCounter(long pageAddr, int idx);
+    /**
+     * @param pageAddr Page address.
+     * @param idx Index.
+     * @return Mvcc counter.
+     */
+    public long getMvccCounter(long pageAddr, int idx);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index f28fe2d..115e8a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteTransactions;
@@ -47,16 +48,20 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.TestCacheNodeExcludingFilter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -2534,6 +2539,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param restartCrd If {@code true} dedicated coordinator node is restarted during test.
      * @param srvs Number of server nodes.
      * @param clients Number of client nodes.
      * @param cacheBackups Number of cache backups.
@@ -2680,6 +2686,149 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             stop.set(true);
         }
     }
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSize() throws Exception {
+        Ignite node = startGrid(0);
+
+        IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1));
+
+        assertEquals(cache.size(), 0);
+
+        final int KEYS = 10;
+
+        for (int i = 0; i < KEYS; i++) {
+            final Integer key = i;
+
+            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(key, i);
+
+                tx.commit();
+            }
+
+            assertEquals(i + 1, cache.size());
+        }
+
+        for (int i = 0; i < KEYS; i++) {
+            final Integer key = i;
+
+            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(key, i);
+
+                tx.commit();
+            }
+
+            assertEquals(KEYS, cache.size());
+        }
+
+        // TODO IGNITE-3478: test removes.
+    }
+
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInternalApi() throws Exception {
+        Ignite node = startGrid(0);
+
+        IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1));
+
+        GridCacheContext cctx =
+            ((IgniteKernal)node).context().cache().context().cacheContext(CU.cacheId(cache.getName()));
+
+        CacheCoordinatorsProcessor crd = cctx.kernalContext().coordinators();
+
+        // Start query to prevent cleanup.
+        IgniteInternalFuture<MvccCoordinatorVersion> fut = crd.requestQueryCounter(crd.currentCoordinator());
+
+        fut.get();
+
+        final int KEYS = 1000;
+
+        for (int i = 0; i < 10; i++) {
+            for (int k = 0; k < KEYS; k++) {
+                final Integer key = k;
+
+                try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.put(key, i);
+
+                    tx.commit();
+                }
+            }
+        }
+
+        for (int k = 0; k < KEYS; k++) {
+            final Integer key = k;
+
+            KeyCacheObject key0 = cctx.toCacheKeyObject(key);
+
+            List<T2<Object, MvccCounter>> vers = cctx.offheap().mvccAllVersions(cctx, key0);
+
+            assertEquals(10, vers.size());
+
+            CacheDataRow row = cctx.offheap().read(cctx, key0);
+
+            checkRow(cctx, row, key0, vers.get(0).get1());
+
+            for (T2<Object, MvccCounter> ver : vers) {
+                MvccCounter cntr = ver.get2();
+
+                MvccCoordinatorVersion readVer =
+                    new MvccCoordinatorVersionResponse(cntr.coordinatorVersion(), cntr.counter(), 0);
+
+                row = cctx.offheap().mvccRead(cctx, key0, readVer);
+
+                checkRow(cctx, row, key0, ver.get1());
+            }
+
+            checkRow(cctx,
+                cctx.offheap().mvccRead(cctx, key0, version(vers.get(0).get2().coordinatorVersion() + 1, 1)),
+                key0,
+                vers.get(0).get1());
+
+            checkRow(cctx,
+                cctx.offheap().mvccRead(cctx, key0, version(vers.get(0).get2().coordinatorVersion(), vers.get(0).get2().counter() + 1)),
+                key0,
+                vers.get(0).get1());
+
+            MvccCoordinatorVersionResponse ver = version(crd.currentCoordinator().coordinatorVersion(), 100000);
+
+            for (int v = 0; v < vers.size(); v++) {
+                MvccCounter cntr = vers.get(v).get2();
+
+                ver.addTx(cntr.counter());
+
+                row = cctx.offheap().mvccRead(cctx, key0, ver);
+
+                if (v == vers.size() - 1)
+                    assertNull(row);
+                else
+                    checkRow(cctx, row, key0, vers.get(v + 1).get1());
+            }
+        }
+    }
+
+    /**
+     * @param cctx Context.
+     * @param row Row.
+     * @param expKey Expected row key.
+     * @param expVal Expected row value.
+     */
+    private void checkRow(GridCacheContext cctx, CacheDataRow row, KeyCacheObject expKey, Object expVal) {
+        assertNotNull(row);
+        assertEquals(expKey, row.key());
+        assertEquals(expVal, row.value().value(cctx.cacheObjectContext(), false));
+    }
+
+    /**
+     * @param crdVer Coordinator version.
+     * @param cntr Counter.
+     * @return Version.
+     */
+    private MvccCoordinatorVersionResponse version(long crdVer, long cntr) {
+        return new MvccCoordinatorVersionResponse(crdVer, cntr, 0);
+    }
 
     /**
      * @return Cache configurations.

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index 9c0d791..e2f6b2e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -25,6 +25,7 @@ import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -570,6 +571,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
             assertNoLocks();
 
             assertEquals(x, tree.findOne(x).longValue());
+            checkIterate(tree, x, x, x, true);
 
             assertNoLocks();
 
@@ -584,12 +586,15 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
         assertNull(tree.findOne(-1L));
 
-        for (long x = 0; x < cnt; x++)
+        for (long x = 0; x < cnt; x++) {
             assertEquals(x, tree.findOne(x).longValue());
+            checkIterate(tree, x, x, x, true);
+        }
 
         assertNoLocks();
 
         assertNull(tree.findOne(cnt));
+        checkIterate(tree, cnt, cnt, null, false);
 
         for (long x = RMV_INC > 0 ? 0 : cnt - 1; x >= 0 && x < cnt; x += RMV_INC) {
             X.println(" -- " + x);
@@ -603,6 +608,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
             assertNoLocks();
 
             assertNull(tree.findOne(x));
+            checkIterate(tree, x, x, null, false);
 
             assertNoLocks();
 
@@ -619,6 +625,32 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     }
 
     /**
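+     * @param tree Tree.
+     * @param lower Lower bound passed to {@code iterate}.
+     * @param upper Upper bound passed to {@code iterate}.
+     * @param exp Value to find or {@code null} to find first.
+     * @param expFound Expected value of the closure's {@code found} flag after iteration.
+     * @throws IgniteCheckedException If failed.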
+     */
+    private void checkIterate(TestTree tree, long lower, long upper, Long exp, boolean expFound)
+        throws IgniteCheckedException {
+        TestTreeRowClosure c = new TestTreeRowClosure(exp);
+
+        tree.iterate(lower, upper, c);
+
+        assertEquals(expFound, c.found);
+    }
+
+    private void checkIterateC(TestTree tree, long lower, long upper, TestTreeRowClosure c, boolean expFound)
+        throws IgniteCheckedException {
+        c.found = false;
+
+        tree.iterate(lower, upper, c);
+
+        assertEquals(expFound, c.found);
+    }
+
+    /**
      * @throws IgniteCheckedException If failed.
      */
     public void testRandomInvoke_1_30_1() throws IgniteCheckedException {
@@ -1242,6 +1274,206 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testIterate() throws Exception {
+        MAX_PER_PAGE = 5;
+
+        TestTree tree = createTestTree(true);
+
+        checkIterate(tree, 0L, 100L, null, false);
+
+        for (long idx = 1L; idx <= 10L; ++idx)
+            tree.put(idx);
+
+        for (long idx = 1L; idx <= 10L; ++idx)
+            checkIterate(tree, idx, 100L, idx, true);
+
+        checkIterate(tree, 0L, 100L, 1L, true);
+
+        for (long idx = 1L; idx <= 10L; ++idx)
+            checkIterate(tree, idx, 100L, 10L, true);
+
+        checkIterate(tree, 0L, 100L, 100L, false);
+
+        for (long idx = 1L; idx <= 10L; ++idx)
+            checkIterate(tree, 0L, 100L, idx, true);
+
+        for (long idx = 0L; idx <= 10L; ++idx)
+            checkIterate(tree, idx, 11L, -1L, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIterateConcurrentPutRemove() throws Exception {
+        findOneBoundedConcurrentPutRemove();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIterateConcurrentPutRemove_1() throws Exception {
+        MAX_PER_PAGE = 1;
+
+        findOneBoundedConcurrentPutRemove();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIterateConcurrentPutRemove_5() throws Exception {
+        MAX_PER_PAGE = 5;
+
+        findOneBoundedConcurrentPutRemove();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIteratePutRemove_10() throws Exception {
+        MAX_PER_PAGE = 10;
+
+        findOneBoundedConcurrentPutRemove();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void findOneBoundedConcurrentPutRemove() throws Exception {
+        final TestTree tree = createTestTree(true);
+
+        final int KEYS = 10_000;
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        for (int i = 0; i < 10; i++) {
+            for (long idx = 0L; idx < KEYS; ++idx)
+                tree.put(idx);
+
+            final Long findKey;
+
+            if (MAX_PER_PAGE > 0) {
+                switch (i) {
+                    case 0:
+                        findKey = 1L;
+
+                        break;
+
+                    case 1:
+                        findKey = (long)MAX_PER_PAGE;
+
+                        break;
+
+                    case 2:
+                        findKey = (long)MAX_PER_PAGE - 1;
+
+                        break;
+
+                    case 3:
+                        findKey = (long)MAX_PER_PAGE + 1;
+
+                        break;
+
+                    case 4:
+                        findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE;
+
+                        break;
+
+                    case 5:
+                        findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE - 1;
+
+                        break;
+
+                    case 6:
+                        findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE + 1;
+
+                        break;
+
+                    case 7:
+                        findKey = (long)KEYS - 1;
+
+                        break;
+
+                    default:
+                        findKey = rnd.nextLong(KEYS);
+                }
+            }
+            else
+                findKey = rnd.nextLong(KEYS);
+
+            info("Iteration [iter=" + i + ", key=" + findKey + ']');
+
+            assertEquals(findKey, tree.findOne(findKey));
+            checkIterate(tree, findKey, findKey, findKey, true);
+
+            IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    TestTreeRowClosure p = new TestTreeRowClosure(findKey);
+
+                    TestTreeRowClosure falseP = new TestTreeRowClosure(-1L);
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        int shift = MAX_PER_PAGE > 0 ? rnd.nextInt(MAX_PER_PAGE * 2) : rnd.nextInt(100);
+
+                        checkIterateC(tree, findKey, findKey, p, true);
+
+                        checkIterateC(tree, findKey - shift, findKey, p, true);
+
+                        checkIterateC(tree, findKey - shift, findKey + shift, p, true);
+
+                        checkIterateC(tree, findKey, findKey + shift, p, true);
+
+                        checkIterateC(tree, -100L, KEYS + 100L, falseP, false);
+
+                        cnt++;
+                    }
+
+                    info("Done, read count: " + cnt);
+
+                    return null;
+                }
+            }, 10, "find");
+
+            asyncRunFut = new GridCompoundFuture<>();
+
+            asyncRunFut.add(getFut);
+
+            asyncRunFut.markInitialized();
+
+            try {
+                U.sleep(100);
+
+                for (int j = 0; j < 20; j++) {
+                    for (long idx = 0L; idx < KEYS / 2; ++idx) {
+                        long toRmv = rnd.nextLong(KEYS);
+
+                        if (toRmv != findKey)
+                            tree.remove(toRmv);
+                    }
+
+                    for (long idx = 0L; idx < KEYS / 2; ++idx) {
+                        long put = rnd.nextLong(KEYS);
+
+                        tree.put(put);
+                    }
+                }
+            }
+            finally {
+                stop.set(true);
+            }
+
+            asyncRunFut.get();
+
+            stop.set(false);
+        }
+    }
+
+    /**
      *
      */
     public void testConcurrentGrowDegenerateTreeAndConcurrentRemove() throws Exception {
@@ -1449,6 +1681,17 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
                         last = c.get();
                     }
+
+                    TestTreeFindFirstClosure cl = new TestTreeFindFirstClosure();
+
+                    tree.iterate((long)low, (long)high, cl);
+
+                    last = cl.val;
+
+                    if (last != null) {
+                        assertTrue(low + " <= " + last + " <= " + high, last >= low);
+                        assertTrue(low + " <= " + last + " <= " + high, last <= high);
+                    }
                 }
 
                 return null;
@@ -1853,4 +2096,50 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
             return PageUtils.getLong(pageAddr, offset(idx));
         }
     }
+
+    /**
+     * Closure which stops iteration once a row equal to the expected value is met (or on any row if it is null).
+     */
+    static class TestTreeRowClosure implements BPlusTree.TreeRowClosure<Long, Long> {
+        /** Value to find or {@code null} to find first. */
+        private final Long expVal;
+
+        /** Set to {@code true} when a matching row was passed to the closure. */
+        private boolean found;
+
+        /**
+         * @param expVal Value to find or {@code null} to find first.
+         */
+        TestTreeRowClosure(Long expVal) {
+            this.expVal = expVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
+            throws IgniteCheckedException {
+            assert !found;
+
+            found = expVal == null || io.getLookupRow(tree, pageAddr, idx).equals(expVal);
+
+            return !found;
+        }
+    }
+
+    /**
+     * Closure which remembers the first row passed to it and stops iteration.
+     */
+    static class TestTreeFindFirstClosure implements BPlusTree.TreeRowClosure<Long, Long> {
+        /** Lookup row read on the first call, {@code null} if the closure was not called. */
+        private Long val;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
+            throws IgniteCheckedException {
+            assert val == null;
+
+            val = io.getLookupRow(tree, pageAddr, idx);
+
+            return false;
+        }
+    }
 }


Mime
View raw message