ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [06/17] ignite git commit: ignite-5937
Date Wed, 11 Oct 2017 10:43:04 GMT
ignite-5937


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6e25b649
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6e25b649
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6e25b649

Branch: refs/heads/ignite-5932
Commit: 6e25b649f758c3aa308118354b08c6899dd50654
Parents: f8be46d
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Oct 4 16:57:59 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Oct 5 17:54:17 2017 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManager.java        |  11 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    | 179 +++++++++++++------
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |  12 +-
 .../cache/persistence/tree/BPlusTree.java       | 106 +++++------
 .../processors/cache/tree/CacheDataTree.java    |   2 +-
 .../cache/tree/MvccKeyMaxVersionBound.java      |  77 ++++++++
 .../cache/tree/MvccKeyMinVersionBound.java      |  49 +++++
 .../processors/cache/tree/MvccUpdateRow.java    | 177 ++++++++++++++++++
 .../cache/tree/MvccVersionBasedSearchRow.java   |  36 +++-
 .../cache/mvcc/CacheMvccTransactionsTest.java   |  40 +++++
 .../processors/database/BPlusTreeSelfTest.java  | 123 +++++++++----
 11 files changed, 662 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 9d03e4a..8967ce8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -492,7 +492,16 @@ public interface IgniteCacheOffheapManager {
             long expireTime,
             @Nullable CacheDataRow oldRow) throws IgniteCheckedException;
 
-        GridLongList mvccUpdate(
+        /**
+         * @param cctx Cache context.
+         * @param key Key.
+         * @param val Value.
+         * @param ver Version.
+         * @param mvccVer Mvcc version.
+         * @return List of transactions to wait for.
+         * @throws IgniteCheckedException If failed.
+         */
+        @Nullable GridLongList mvccUpdate(
             GridCacheContext cctx,
             KeyCacheObject key,
             CacheObject val,

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index eef645d..25f36b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -55,7 +55,10 @@ import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.cache.tree.DataRow;
 import org.apache.ignite.internal.processors.cache.tree.MvccDataRow;
+import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound;
+import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound;
 import org.apache.ignite.internal.processors.cache.tree.MvccSearchRow;
+import org.apache.ignite.internal.processors.cache.tree.MvccUpdateRow;
 import org.apache.ignite.internal.processors.cache.tree.MvccVersionBasedSearchRow;
 import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
 import org.apache.ignite.internal.processors.cache.tree.PendingRow;
@@ -1361,83 +1364,141 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             try {
                 int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
 
-                MvccDataRow dataRow = new MvccDataRow(
-                    key,
-                    val,
-                    ver,
-                    partId,
-                    cacheId,
-                    mvccVer.coordinatorVersion(),
-                    mvccVer.counter());
-
                 CacheObjectContext coCtx = cctx.cacheObjectContext();
 
                 // Make sure value bytes initialized.
                 key.valueBytes(coCtx);
                 val.valueBytes(coCtx);
 
-                rowStore.addRow(dataRow);
+                if (true) {
+                    MvccUpdateRow updateRow = new MvccUpdateRow(
+                        key,
+                        val,
+                        ver,
+                        mvccVer,
+                        partId,
+                        cacheId);
 
-                assert dataRow.link() != 0 : dataRow;
+                    rowStore.addRow(updateRow);
 
-                if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID)
-                    dataRow.cacheId(cctx.cacheId());
+                    assert updateRow.link() != 0 : updateRow;
 
-                boolean old = dataTree.putx(dataRow);
+                    if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+                        updateRow.cacheId(cctx.cacheId());
 
-                assert !old;
+                    GridLongList waitTxs = null;
 
-                GridLongList waitTxs = null;
+                    if (mvccVer.initialLoad()) {
+                        boolean old = dataTree.putx(updateRow);
 
-                if (!mvccVer.initialLoad()) {
-                    MvccLongList activeTxs = mvccVer.activeTransactions();
+                        assert !old;
 
-                    // TODO IGNITE-3484: need special method.
-                    GridCursor<CacheDataRow> cur = dataTree.find(
-                        new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1),
-                        new MvccSearchRow(cacheId, key, 1, 1));
+                        incrementSize(cctx.cacheId());
+                    }
+                    else {
+                        dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
 
-                    boolean first = true;
+                        boolean old = dataTree.putx(updateRow);
 
-                    boolean activeTx = false;
+                        assert !old;
 
-                    while (cur.next()) {
-                        CacheDataRow oldVal = cur.get();
+                        if (!updateRow.previousNotNull())
+                            incrementSize(cctx.cacheId());
+
+                        waitTxs = updateRow.activeTransactions();
+
+                        List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
 
-                        assert oldVal.link() != 0 : oldVal;
+                        if (cleanupRows != null) {
+                            for (int i = 0; i < cleanupRows.size(); i++) {
+                                CacheSearchRow oldRow = cleanupRows.get(i);
 
-                        if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() &&
-                            activeTxs.contains(oldVal.mvccCounter())) {
-                            if (waitTxs == null)
-                                waitTxs = new GridLongList();
+                                assert oldRow.link() != 0L : oldRow;
 
-                            assert oldVal.mvccCounter() != mvccVer.counter();
+                                boolean rmvd = dataTree.removex(oldRow);
 
-                            waitTxs.add(oldVal.mvccCounter());
+                                assert rmvd;
 
-                            activeTx = true;
+                                rowStore.removeRow(oldRow.link());
+                            }
                         }
+                    }
 
-                        if (!activeTx) {
-                            // Should not delete oldest version which is less than cleanup version.
-                            int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
+                    return waitTxs;
+                }
+                else {
+                    MvccDataRow dataRow = new MvccDataRow(
+                        key,
+                        val,
+                        ver,
+                        partId,
+                        cacheId,
+                        mvccVer.coordinatorVersion(),
+                        mvccVer.counter());
 
-                            if (cmp <= 0) {
-                                if (first)
-                                    first = false;
-                                else {
-                                    boolean rmvd = dataTree.removex(oldVal);
+                    rowStore.addRow(dataRow);
+
+                    assert dataRow.link() != 0 : dataRow;
+
+                    if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+                        dataRow.cacheId(cctx.cacheId());
 
-                                    assert rmvd;
+                    boolean old = dataTree.putx(dataRow);
 
-                                    rowStore.removeRow(oldVal.link());
+                    assert !old;
+
+                    GridLongList waitTxs = null;
+
+                    if (!mvccVer.initialLoad()) {
+                        MvccLongList activeTxs = mvccVer.activeTransactions();
+
+                        // TODO IGNITE-3484: need special method.
+                        GridCursor<CacheDataRow> cur = dataTree.find(
+                            new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1),
+                            new MvccSearchRow(cacheId, key, 1, 1));
+
+                        boolean first = true;
+
+                        boolean activeTx = false;
+
+                        while (cur.next()) {
+                            CacheDataRow oldVal = cur.get();
+
+                            assert oldVal.link() != 0 : oldVal;
+
+                            if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() &&
+                                activeTxs.contains(oldVal.mvccCounter())) {
+                                if (waitTxs == null)
+                                    waitTxs = new GridLongList();
+
+                                assert oldVal.mvccCounter() != mvccVer.counter();
+
+                                waitTxs.add(oldVal.mvccCounter());
+
+                                activeTx = true;
+                            }
+
+                            if (!activeTx) {
+                                // Should not delete oldest version which is less than cleanup version.
+                                int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
+
+                                if (cmp <= 0) {
+                                    if (first)
+                                        first = false;
+                                    else {
+                                        boolean rmvd = dataTree.removex(oldVal);
+
+                                        assert rmvd;
+
+                                        rowStore.removeRow(oldVal.link());
+                                    }
                                 }
                             }
                         }
                     }
-                }
 
-                return waitTxs;
+                    return waitTxs;
+                }
             }
             finally {
                 busyLock.leaveBusy();
@@ -1649,11 +1710,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
             if (grp.mvccEnabled()) {
                 if (true) {
-                    row = dataTree.findOneBounded(
-                        new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
-                        new MvccSearchRow(cacheId, key, 1L, 1L),
-                        null,
-                        CacheDataRowAdapter.RowData.NO_KEY);
+                    MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key);
+
+                    dataTree.iterate(
+                        searchRow,
+                        new MvccKeyMinVersionBound(cacheId, key),
+                        searchRow // Use the same instance as closure to do not create extra object.
+                    );
+
+                    row = searchRow.row();
                 }
                 else {
                     GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
@@ -1681,6 +1746,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         {
             assert grp.mvccEnabled();
 
+            // Note: this method is intended for testing only.
+
             key.valueBytes(cctx.cacheObjectContext());
 
             int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
@@ -1717,11 +1784,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             if (true) {
                 MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver);
 
-                CacheDataRow row = dataTree.findOneBounded(
+                dataTree.iterate(
                     lower,
-                    new MvccSearchRow(cacheId, key, 1L, 1L),
-                    lower, // Use the same instance as predicate to do not create extra object.
-                    CacheDataRowAdapter.RowData.NO_KEY);
+                    new MvccKeyMinVersionBound(cacheId, key),
+                    lower // Use the same instance as closure to do not create extra object.
+                );
+
+                CacheDataRow row = lower.row();
 
                 afterRowFound(row, key);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 5080c83..b9b8ea1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -614,8 +614,14 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         // TODO IGNITE-3478 sorted? + change GridLongList.writeTo?
         MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
 
-        for (Long txVer : activeTxs.keySet())
+        long minActive = Long.MAX_VALUE;
+
+        for (Long txVer : activeTxs.keySet()) {
+            if (txVer < minActive)
+                minActive = txVer;
+
             res.addTx(txVer);
+        }
 
         Object old = activeTxs.put(nextCtr, txId);
 
@@ -624,7 +630,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         long cleanupVer;
 
         if (prevCrdQueries.previousQueriesDone()) {
-            cleanupVer = committedCntr.get() - 1;
+            cleanupVer = Math.min(minActive, committedCntr.get());
+
+            cleanupVer--;
 
             Long qryVer = activeQueries.minimalQueryCounter();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index b6c5c96..9752b17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -908,7 +908,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             long pageAddr = readLock(firstPageId, firstPage); // We always merge pages backwards, the first page is never removed.
 
             try {
-                cursor.init(pageAddr, io(pageAddr), 0);
+                cursor.init(pageAddr, io(pageAddr), -1);
             }
             finally {
                 readUnlock(firstPageId, firstPage, pageAddr);
@@ -976,17 +976,16 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
     /**
      * @param lower Lower bound inclusive.
      * @param upper Upper bound inclusive.
-     * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
-     * @return First found item which meets bounds and pass predicate.
+     * @param c Closure applied for all found items, iteration is stopped if closure returns {@code false}.
      * @throws IgniteCheckedException If failed.
      */
-    public T findOneBounded(L lower, L upper, RowPredicate<L, T> p, Object x) throws IgniteCheckedException {
+    public void iterate(L lower, L upper, TreeRowClosure<L, T> c) throws IgniteCheckedException {
         checkDestroyed();
 
         try {
-            FindOneCursor cursor = new FindOneCursor(lower, upper, p, x);
+            ClosureCursor cursor = new ClosureCursor(lower, upper, c);
 
-            return cursor.findOne();
+            cursor.iterate();
         }
         catch (IgniteCheckedException e) {
             throw new IgniteCheckedException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e);
@@ -4431,18 +4430,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         /** */
         final L upperBound;
 
-        /** */
-        final Object x;
-
         /**
          * @param lowerBound Lower bound.
          * @param upperBound Upper bound.
-         * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
          */
-        AbstractForwardCursor(L lowerBound, L upperBound, Object x) {
+        AbstractForwardCursor(L lowerBound, L upperBound) {
             this.lowerBound = lowerBound;
             this.upperBound = upperBound;
-            this.x = x;
         }
 
         /**
@@ -4559,7 +4553,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             throws IgniteCheckedException {
             assert io.isLeaf() : io;
             assert cnt != 0 : cnt; // We can not see empty pages (empty tree handled in init).
-            assert startIdx >= 0 : startIdx;
+            assert startIdx >= 0 || startIdx == -1: startIdx;
             assert cnt >= startIdx;
 
             checkDestroyed();
@@ -4596,7 +4590,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
          * @return {@code true} If we have rows to return after reading the next page.
          * @throws IgniteCheckedException If failed.
          */
-        final boolean nextPage(T lastRow) throws IgniteCheckedException {
+        final boolean nextPage(L lastRow) throws IgniteCheckedException {
             updateLowerBound(lastRow);
 
             for (;;) {
@@ -4618,7 +4612,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
                     try {
                         BPlusIO<L> io = io(pageAddr);
 
-                        if (fillFromBuffer(pageAddr, io, 0, io.getCount(pageAddr)))
+                        if (fillFromBuffer(pageAddr, io, -1, io.getCount(pageAddr)))
                             return true;
 
                         // Continue fetching forward.
@@ -4639,7 +4633,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         /**
          * @param lower New exact lower bound.
          */
-        private void updateLowerBound(T lower) {
+        private void updateLowerBound(L lower) {
             if (lower != null) {
                 lowerShift = 1; // Now we have the full row an need to avoid duplicates.
                 lowerBound = lower; // Move the lower bound forward for further concurrent merge retries.
@@ -4648,30 +4642,27 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
     }
 
     /**
-     * Forward cursor.
+     * Closure cursor.
      */
     @SuppressWarnings("unchecked")
-    private final class FindOneCursor extends AbstractForwardCursor {
-        /** */
-        private Object resRow;
-
+    private final class ClosureCursor extends AbstractForwardCursor {
         /** */
-        private T lastRow;
+        private final TreeRowClosure<L, T> p;
 
         /** */
-        private final RowPredicate<L, T> p;
+        private L lastRow;
 
         /**
          * @param lowerBound Lower bound.
          * @param upperBound Upper bound.
          * @param p Row predicate.
-         * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
          */
-        FindOneCursor(L lowerBound, L upperBound, @Nullable RowPredicate<L, T> p, Object x) {
-            super(lowerBound, upperBound, x);
+        ClosureCursor(L lowerBound, L upperBound, TreeRowClosure<L, T> p) {
+            super(lowerBound, upperBound);
 
             assert lowerBound != null;
             assert upperBound != null;
+            assert p != null;
 
             this.p = p;
         }
@@ -4682,8 +4673,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         }
 
         /** {@inheritDoc} */
-        @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException {
-            if (startIdx == 0) // TODO IGNITE-3478: startIdx == 0? can search twice for first item?
+        @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt)
+            throws IgniteCheckedException {
+            if (startIdx == -1) // TODO IGNITE-3478: startIdx == 0? can search twice for first item?
                 startIdx = findLowerBound(pageAddr, io, cnt);
 
             if (cnt == startIdx)
@@ -4698,15 +4690,17 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
                     return false;
                 }
 
-                if (p == null || p.apply(BPlusTree.this, io, pageAddr, i)) {
-                    resRow = getRow(io, pageAddr, i, x);
+                boolean stop = !p.apply(BPlusTree.this, io, pageAddr, i);
+
+                if (stop) {
+                    nextPageId = 0; // The End.
 
                     return true;
                 }
             }
 
             if (nextPageId != 0)
-                lastRow = getRow(io, pageAddr, cnt - 1, null); // Need save last row.
+                lastRow = io.getLookupRow(BPlusTree.this, pageAddr, cnt - 1); // Need save last row.
 
             return true;
         }
@@ -4718,36 +4712,28 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
         /** {@inheritDoc} */
         @Override void onNotFound(boolean readDone) {
-            resRow = EMPTY;
+            nextPageId = 0;
         }
 
         /**
          * @throws IgniteCheckedException If failed.
-         * @return Found row.
          */
-        private T findOne() throws IgniteCheckedException {
+        private void iterate() throws IgniteCheckedException {
             find();
 
-            if (resRow != null) {
-                if (resRow == EMPTY)
-                    return null;
-
-                return (T)resRow;
+            if (nextPageId == 0) {
+                return;
             }
 
             for (;;) {
-                T lastRow0 = lastRow;
+                L lastRow0 = lastRow;
 
                 lastRow = null;
 
                 nextPage(lastRow0);
 
-                if (resRow != null) {
-                    if (resRow == EMPTY)
-                        return null;
-
-                    return (T)resRow;
-                }
+                if (nextPageId == 0)
+                    return;
             }
         }
     }
@@ -4758,6 +4744,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
     @SuppressWarnings("unchecked")
     private final class ForwardCursor extends AbstractForwardCursor implements GridCursor<T> {
         /** */
+        final Object x;
+
+        /** */
         private T[] rows = (T[])EMPTY;
 
         /** */
@@ -4769,13 +4758,19 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
          * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
          */
         ForwardCursor(L lowerBound, L upperBound, Object x) {
-            super(lowerBound, upperBound, x);
+            super(lowerBound, upperBound);
+
+            this.x = x;
         }
 
         /** {@inheritDoc} */
         @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx, int cnt) throws IgniteCheckedException {
-            if (lowerBound != null && startIdx == 0)
-                startIdx = findLowerBound(pageAddr, io, cnt);
+            if (startIdx == -1) {
+                if (lowerBound != null)
+                    startIdx = findLowerBound(pageAddr, io, cnt);
+                else
+                    startIdx = 0;
+            }
 
             if (upperBound != null && cnt != startIdx)
                 cnt = findUpperBound(pageAddr, io, startIdx, cnt);
@@ -5003,7 +4998,16 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
     /**
      *
      */
-    public interface RowPredicate<L, T extends L> {
-        public boolean apply(BPlusTree<L, T> tree, BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException;
+    public interface TreeRowClosure<L, T extends L> {
+        /**
+         * @param tree Tree.
+         * @param io Tree IO.
+         * @param pageAddr Page address.
+         * @param idx Item index.
+         * @return {@code True} if item pass predicate. TODO IGNITE-3478
+         * @throws IgniteCheckedException If failed.
+         */
+        public boolean apply(BPlusTree<L, T> tree, BPlusIO<L> io, long pageAddr, int idx)
+            throws IgniteCheckedException;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index a1bfc9b..eaeefee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -114,7 +114,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
     /** {@inheritDoc} */
     @Override protected int compare(BPlusIO<CacheSearchRow> iox, long pageAddr, int idx, CacheSearchRow row)
         throws IgniteCheckedException {
-        assert !grp.mvccEnabled() || row.mvccCoordinatorVersion() != 0;// || row.getClass() == SearchRow.class;
+        assert !grp.mvccEnabled() || row.mvccCoordinatorVersion() != 0 : row;
 
         RowLinkIO io = (RowLinkIO)iox;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
new file mode 100644
index 0000000..aa9422d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class MvccKeyMaxVersionBound extends SearchRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
+    /** */
+    private CacheDataRow resRow;
+
+    /**
+     * @param cacheId Cache ID.
+     * @param key Key.
+     */
+    public MvccKeyMaxVersionBound(int cacheId, KeyCacheObject key) {
+        super(cacheId, key);
+    }
+
+    /**
+     * @return Found row.
+     */
+    @Nullable public CacheDataRow row() {
+        return resRow;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree, BPlusIO<CacheSearchRow> io,
+        long pageAddr,
+        int idx)
+        throws IgniteCheckedException
+    {
+        resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+
+        return false;  // Stop search.
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCoordinatorVersion() {
+        return Long.MAX_VALUE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCounter() {
+        return Long.MAX_VALUE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccKeyMaxVersionBound.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java
new file mode 100644
index 0000000..f2ac308
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class MvccKeyMinVersionBound extends SearchRow {
+    /**
+     * @param cacheId Cache ID.
+     * @param key Key.
+     */
+    public MvccKeyMinVersionBound(int cacheId, KeyCacheObject key) {
+        super(cacheId, key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCoordinatorVersion() {
+        return 1L;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCounter() {
+        return 1L;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccKeyMinVersionBound.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
new file mode 100644
index 0000000..79544e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
+    /** */
+    private Boolean hasPrev;
+
+    /** */
+    private boolean canCleanup;
+
+    /** */
+    private GridLongList activeTxs;
+
+    /** */
+    private List<CacheSearchRow> cleanupRows;
+
+    /** */
+    private final MvccCoordinatorVersion mvccVer;
+
+    /**
+     * @param key Key.
+     * @param val Value.
+     * @param ver Version.
+     * @param mvccVer Mvcc version.
+     * @param part Partition.
+     * @param cacheId Cache ID.
+     */
+    public MvccUpdateRow(
+        KeyCacheObject key,
+        CacheObject val,
+        GridCacheVersion ver,
+        MvccCoordinatorVersion mvccVer,
+        int part,
+        int cacheId) {
+        super(key, val, ver, part, 0L, cacheId);
+
+        this.mvccVer = mvccVer;
+    }
+
+    /**
+     * @return {@code True} if previous value was non-null.
+     */
+    public boolean previousNotNull() {
+        return hasPrev != null && hasPrev;
+    }
+
+    /**
+     * @return Active transactions to wait for.
+     */
+    @Nullable public GridLongList activeTransactions() {
+        return activeTxs;
+    }
+
+    /**
+     * @return Rows which are safe to cleanup.
+     */
+    public List<CacheSearchRow> cleanupRows() {
+        return cleanupRows;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree,
+        BPlusIO<CacheSearchRow> io,
+        long pageAddr,
+        int idx)
+        throws IgniteCheckedException
+    {
+        RowLinkIO rowIo = (RowLinkIO)io;
+
+        // All previous version should be less then new one.
+        assert mvccVer.coordinatorVersion() >= rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+        assert mvccVer.coordinatorVersion() > rowIo.getMvccCoordinatorVersion(pageAddr, idx) || mvccVer.counter() > rowIo.getMvccCounter(pageAddr, idx);
+
+        boolean checkActive = mvccVer.activeTransactions().size() > 0;
+
+        boolean txActive = false;
+
+        // Suppose transactions on previous coordinator versions are done.
+        if (checkActive && mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx)) {
+            long rowMvccCntr = rowIo.getMvccCounter(pageAddr, idx);
+
+            if (mvccVer.activeTransactions().contains(rowMvccCntr)) {
+                txActive = true;
+
+                if (activeTxs == null)
+                    activeTxs = new GridLongList();
+
+                activeTxs.add(rowMvccCntr);
+            }
+        }
+
+        if (hasPrev == null)
+            hasPrev = Boolean.TRUE; // TODO IGNITE-3478 support removes.
+
+        if (!txActive) {
+            assert Long.compare(mvccVer.coordinatorVersion(), rowIo.getMvccCoordinatorVersion(pageAddr, idx)) >= 0;
+
+            int cmp;
+
+            if (mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx))
+                cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx));
+            else
+                cmp = 1;
+
+            if (cmp >= 0) {
+                // Do not cleanup oldest version.
+                if (canCleanup) {
+                    CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx);
+
+                    assert row.link() != 0 && row.mvccCoordinatorVersion() > 0 : row;
+
+                    // Should not be possible to cleanup active tx.
+                    assert row.mvccCoordinatorVersion() != mvccVer.coordinatorVersion()
+                        || !mvccVer.activeTransactions().contains(row.mvccCounter());
+
+                    if (cleanupRows == null)
+                        cleanupRows = new ArrayList<>();
+
+                    cleanupRows.add(row);
+                }
+                else
+                    canCleanup = true;
+            }
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCoordinatorVersion() {
+        return mvccVer.coordinatorVersion();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCounter() {
+        return mvccVer.counter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccUpdateRow.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
index 6af2c4c..c829afb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
@@ -11,7 +11,7 @@
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+  See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
@@ -21,18 +21,23 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
-public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.RowPredicate<CacheSearchRow, CacheDataRow> {
+public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
     /** */
     private final MvccCoordinatorVersion ver;
 
+    /** */
+    private CacheDataRow resRow;
+
     /**
      * @param cacheId Cache ID.
      * @param key Key.
@@ -46,21 +51,36 @@ public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.Ro
         this.ver = ver;
     }
 
+    /**
+     * @return Found row.
+     */
+    @Nullable public CacheDataRow row() {
+        return resRow;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree,
         BPlusIO<CacheSearchRow> io,
         long pageAddr,
         int idx) throws IgniteCheckedException
     {
-        if (ver.activeTransactions() == null)
-            return true;
+        boolean visible = true;
+
+        if (ver.activeTransactions().size() > 0) {
+            RowLinkIO rowIo = (RowLinkIO)io;
+
+            // TODO IGNITE-3478 sort active transactions?
+            if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) == ver.coordinatorVersion())
+                visible = !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx));
+        }
 
-        RowLinkIO rowIo = (RowLinkIO)io;
+        if (visible) {
+            resRow = ((CacheDataTree) tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
 
-        if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) != ver.coordinatorVersion())
-            return true;
+            return false; // Stop search.
+        }
 
-        return !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx)); // TODO IGNITE-3478 sort active transactions?
+        return true;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 89b3df2..115e8a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -2539,6 +2539,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param restartCrd If {@code true} dedicated coordinator node is restarted during test.
      * @param srvs Number of server nodes.
      * @param clients Number of client nodes.
      * @param cacheBackups Number of cache backups.
@@ -2685,6 +2686,45 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             stop.set(true);
         }
     }
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testSize() throws Exception {
+        Ignite node = startGrid(0);
+
+        IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1));
+
+        assertEquals(cache.size(), 0);
+
+        final int KEYS = 10;
+
+        for (int i = 0; i < KEYS; i++) {
+            final Integer key = i;
+
+            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(key, i);
+
+                tx.commit();
+            }
+
+            assertEquals(i + 1, cache.size());
+        }
+
+        for (int i = 0; i < KEYS; i++) {
+            final Integer key = i;
+
+            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(key, i);
+
+                tx.commit();
+            }
+
+            assertEquals(KEYS, cache.size());
+        }
+
+        // TODO IGNITE-3478: test removes.
+    }
+
 
     /**
      * @throws IgniteCheckedException If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e25b649/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index e7ab34f..e2f6b2e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -571,7 +571,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
             assertNoLocks();
 
             assertEquals(x, tree.findOne(x).longValue());
-            assertEquals(x, tree.findOneBounded(x, x, null, null).longValue());
+            checkIterate(tree, x, x, x, true);
 
             assertNoLocks();
 
@@ -588,13 +588,13 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
         for (long x = 0; x < cnt; x++) {
             assertEquals(x, tree.findOne(x).longValue());
-            assertEquals(x, tree.findOneBounded(x, x, null, null).longValue());
+            checkIterate(tree, x, x, x, true);
         }
 
         assertNoLocks();
 
         assertNull(tree.findOne(cnt));
-        assertNull(tree.findOneBounded(cnt, cnt, null, null));
+        checkIterate(tree, cnt, cnt, null, false);
 
         for (long x = RMV_INC > 0 ? 0 : cnt - 1; x >= 0 && x < cnt; x += RMV_INC) {
             X.println(" -- " + x);
@@ -608,7 +608,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
             assertNoLocks();
 
             assertNull(tree.findOne(x));
-            assertNull(tree.findOneBounded(x, x, null, null));
+            checkIterate(tree, x, x, null, false);
 
             assertNoLocks();
 
@@ -625,6 +625,32 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param tree
+     * @param lower
+     * @param upper
+     * @param exp
+     * @param expFound
+     * @throws IgniteCheckedException
+     */
+    private void checkIterate(TestTree tree, long lower, long upper, Long exp, boolean expFound)
+        throws IgniteCheckedException {
+        TestTreeRowClosure c = new TestTreeRowClosure(exp);
+
+        tree.iterate(lower, upper, c);
+
+        assertEquals(expFound, c.found);
+    }
+
+    private void checkIterateC(TestTree tree, long lower, long upper, TestTreeRowClosure c, boolean expFound)
+        throws IgniteCheckedException {
+        c.found = false;
+
+        tree.iterate(lower, upper, c);
+
+        assertEquals(expFound, c.found);
+    }
+
+    /**
      * @throws IgniteCheckedException If failed.
      */
     public void testRandomInvoke_1_30_1() throws IgniteCheckedException {
@@ -1250,44 +1276,53 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testFindOneBounded() throws Exception {
+    public void testIterate() throws Exception {
         MAX_PER_PAGE = 5;
 
         TestTree tree = createTestTree(true);
 
-        assertNull(tree.findOneBounded(0L, 100L, null, null));
+        checkIterate(tree, 0L, 100L, null, false);
 
         for (long idx = 1L; idx <= 10L; ++idx)
             tree.put(idx);
 
         for (long idx = 1L; idx <= 10L; ++idx)
-            assertEquals(idx, (Object)tree.findOneBounded(idx, 100L, null, null));
+            checkIterate(tree, idx, 100L, idx, true);
 
-        assertEquals(1L, (Object)tree.findOneBounded(0L, 100L, null, null));
+        checkIterate(tree, 0L, 100L, 1L, true);
 
         for (long idx = 1L; idx <= 10L; ++idx)
-            assertEquals(10L, (Object)tree.findOneBounded(idx, 100L, new TestRowPredicate(10L), null));
+            checkIterate(tree, idx, 100L, 10L, true);
 
-        assertNull(tree.findOneBounded(0L, 100L, new TestRowPredicate(100L), null));
+        checkIterate(tree, 0L, 100L, 100L, false);
 
         for (long idx = 1L; idx <= 10L; ++idx)
-            assertEquals(idx, (Object)tree.findOneBounded(0L, 100L, new TestRowPredicate(idx), null));
+            checkIterate(tree, 0L, 100L, idx, true);
 
         for (long idx = 0L; idx <= 10L; ++idx)
-            assertNull(tree.findOneBounded(idx, 11L, new TestRowPredicate(-1L), null));
+            checkIterate(tree, idx, 11L, -1L, false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testFindOneBoundedConcurrentPutRemove() throws Exception {
+    public void testIterateConcurrentPutRemove() throws Exception {
+        findOneBoundedConcurrentPutRemove();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIterateConcurrentPutRemove_1() throws Exception {
+        MAX_PER_PAGE = 1;
+
         findOneBoundedConcurrentPutRemove();
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testFindOneBoundedConcurrentPutRemove_5() throws Exception {
+    public void testIterateConcurrentPutRemove_5() throws Exception {
         MAX_PER_PAGE = 5;
 
         findOneBoundedConcurrentPutRemove();
@@ -1296,7 +1331,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testFindOneBoundedConcurrentPutRemove_10() throws Exception {
+    public void testIteratePutRemove_10() throws Exception {
         MAX_PER_PAGE = 10;
 
         findOneBoundedConcurrentPutRemove();
@@ -1370,33 +1405,30 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
             info("Iteration [iter=" + i + ", key=" + findKey + ']');
 
             assertEquals(findKey, tree.findOne(findKey));
-            assertEquals(findKey, tree.findOneBounded(findKey, findKey, null, null));
+            checkIterate(tree, findKey, findKey, findKey, true);
 
             IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
                 @Override public Void call() throws Exception {
                     ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                    TestRowPredicate p = new TestRowPredicate(findKey);
+                    TestTreeRowClosure p = new TestTreeRowClosure(findKey);
 
-                    TestRowPredicate falseP = new TestRowPredicate(-1L);
+                    TestTreeRowClosure falseP = new TestTreeRowClosure(-1L);
 
                     int cnt = 0;
 
                     while (!stop.get()) {
                         int shift = MAX_PER_PAGE > 0 ? rnd.nextInt(MAX_PER_PAGE * 2) : rnd.nextInt(100);
 
-                        assertEquals(findKey, tree.findOneBounded(findKey, findKey, null, null));
+                        checkIterateC(tree, findKey, findKey, p, true);
 
-                        assertEquals(findKey,
-                            tree.findOneBounded(findKey - shift, findKey, p, null));
+                        checkIterateC(tree, findKey - shift, findKey, p, true);
 
-                        assertEquals(findKey,
-                            tree.findOneBounded(findKey - shift, findKey + shift, p, null));
+                        checkIterateC(tree, findKey - shift, findKey + shift, p, true);
 
-                        assertEquals(findKey,
-                            tree.findOneBounded(findKey, findKey + shift, p, null));
+                        checkIterateC(tree, findKey, findKey + shift, p, true);
 
-                        assertNull(tree.findOneBounded(-100L, KEYS + 100L, falseP, null));
+                        checkIterateC(tree, -100L, KEYS + 100L, falseP, false);
 
                         cnt++;
                     }
@@ -1650,7 +1682,11 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
                         last = c.get();
                     }
 
-                    last = tree.findOneBounded((long)low, (long)high, null, null);
+                    TestTreeFindFirstClosure cl = new TestTreeFindFirstClosure();
+
+                    tree.iterate((long)low, (long)high, cl);
+
+                    last = cl.val;
 
                     if (last != null) {
                         assertTrue(low + " <= " + last + " <= " + high, last >= low);
@@ -2064,23 +2100,46 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     /**
      *
      */
-    static class TestRowPredicate implements TestTree.RowPredicate<Long, Long> {
+    static class TestTreeRowClosure implements BPlusTree.TreeRowClosure<Long, Long> {
         /** */
         private final Long expVal;
 
+        /** */
+        private boolean found;
+
         /**
-         * @param expVal Expected value.
+         * @param expVal Value to find or {@code null} to find first.
          */
-        TestRowPredicate(Long expVal) {
+        TestTreeRowClosure(Long expVal) {
             this.expVal = expVal;
         }
 
         /** {@inheritDoc} */
         @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
             throws IgniteCheckedException {
-            Long row = io.getLookupRow(tree, pageAddr, idx);
+            assert !found;
+
+            found = expVal == null || io.getLookupRow(tree, pageAddr, idx).equals(expVal);
+
+            return !found;
+        }
+    }
+
+    /**
+     *
+     */
+    static class TestTreeFindFirstClosure implements BPlusTree.TreeRowClosure<Long, Long> {
+        /** */
+        private Long val;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
+            throws IgniteCheckedException {
+            assert val == null;
+
+            val = io.getLookupRow(tree, pageAddr, idx);
 
-            return row.equals(expVal);
+            return false;
         }
     }
 }


Mime
View raw message