ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5937
Date Tue, 03 Oct 2017 15:01:00 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5937 [created] 44ad70112


ignite-5937


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/44ad7011
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/44ad7011
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/44ad7011

Branch: refs/heads/ignite-5937
Commit: 44ad70112fb1063b61fcdc20c5fee893381d2e44
Parents: 27b2be4
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Oct 3 16:50:01 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Oct 3 18:00:46 2017 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManagerImpl.java    |  83 ++--
 .../cache/persistence/tree/BPlusTree.java       | 376 ++++++++++++++-----
 .../cache/tree/MvccVersionBasedSearchRow.java   |  80 ++++
 .../cache/mvcc/CacheMvccTransactionsTest.java   |  79 ++++
 .../processors/database/BPlusTreeSelfTest.java  | 232 +++++++++++-
 5 files changed, 718 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/44ad7011/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index d8c5eaa..76d9649 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.cache.tree.DataRow;
 import org.apache.ignite.internal.processors.cache.tree.MvccDataRow;
 import org.apache.ignite.internal.processors.cache.tree.MvccSearchRow;
+import org.apache.ignite.internal.processors.cache.tree.MvccVersionBasedSearchRow;
 import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
 import org.apache.ignite.internal.processors.cache.tree.PendingRow;
 import org.apache.ignite.internal.processors.cache.tree.SearchRow;
@@ -1647,14 +1648,22 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             CacheDataRow row;
 
             if (grp.mvccEnabled()) {
-                // TODO IGNITE-3484: need special method.
-                GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId,
key, Long.MAX_VALUE, Long.MAX_VALUE),
-                    new MvccSearchRow(cacheId, key, 1, 1));
+                if (false) {
+                    row = dataTree.findOneBounded(
+                        new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
+                        new MvccSearchRow(cacheId, key, 1L, 1L),
+                        null,
+                        CacheDataRowAdapter.RowData.NO_KEY);
+                }
+                else {
+                    GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId,
key, Long.MAX_VALUE, Long.MAX_VALUE),
+                        new MvccSearchRow(cacheId, key, 1, 1));
 
-                if (cur.next())
-                    row = cur.get();
-                else
-                    row = null;
+                    if (cur.next())
+                        row = cur.get();
+                    else
+                        row = null;
+                }
             }
             else
                 row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY);
@@ -1705,41 +1714,53 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
             int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
 
-            // TODO IGNITE-3484: need special method.
-            GridCursor<CacheDataRow> cur = dataTree.find(
-                new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()),
-                new MvccSearchRow(cacheId, key, 1, 1));
+            if (false) {
+                MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId,
key, ver);
 
-            CacheDataRow row = null;
+                CacheDataRow row = dataTree.findOneBounded(
+                    lower,
+                    new MvccSearchRow(cacheId, key, 1L, 1L),
+                    lower, // Use the same instance as predicate to do not create extra object.
+                    CacheDataRowAdapter.RowData.NO_KEY);
 
-            MvccLongList txs = ver.activeTransactions();
+                afterRowFound(row, key);
 
-            while (cur.next()) {
-                CacheDataRow row0 = cur.get();
+                return row;
+            }
+            else {
+                GridCursor<CacheDataRow> cur = dataTree.find(
+                    new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()),
+                    new MvccSearchRow(cacheId, key, 1, 1));
 
-                assert row0.mvccCoordinatorVersion() > 0 : row0;
+                CacheDataRow row = null;
 
-                boolean visible;
+                MvccLongList txs = ver.activeTransactions();
 
-                if (txs != null) {
-                    visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion()
-                        || !txs.contains(row0.mvccCounter());
-                }
-                else
-                    visible = true;
+                while (cur.next()) {
+                    CacheDataRow row0 = cur.get();
 
-                if (visible) {
-                    row = row0;
+                    assert row0.mvccCoordinatorVersion() > 0 : row0;
 
-                    break;
-                }
-            }
+                    boolean visible;
 
-            assert row == null || key.equals(row.key());
+                    if (txs != null) {
+                        visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion()
+                            || !txs.contains(row0.mvccCounter());
+                    }
+                    else
+                        visible = true;
 
-            //afterRowFound(row, key);
+                    if (visible) {
+                        row = row0;
 
-            return row;
+                        break;
+                    }
+                }
+
+                assert row == null || key.equals(row.key());
+
+                return row;
+            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/44ad7011/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index c73b4c7..d570f1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -2509,14 +2509,14 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure
implements
      */
     private final class GetCursor extends Get {
         /** */
-        ForwardCursor cursor;
+        AbstractForwardCursor cursor;
 
         /**
          * @param lower Lower bound.
          * @param shift Shift.
          * @param cursor Cursor.
          */
-        GetCursor(L lower, int shift, ForwardCursor cursor) {
+        GetCursor(L lower, int shift, AbstractForwardCursor cursor) {
             super(lower, false);
 
             assert shift != 0; // Either handle range of equal rows or find a greater row
after concurrent merge.
@@ -4384,52 +4384,85 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure
implements
      */
     protected abstract T getRow(BPlusIO<L> io, long pageAddr, int idx, Object x) throws
IgniteCheckedException;
 
+    public interface RowPredicate<L, T extends L> {
+        public boolean apply(BPlusTree<L, T> tree, BPlusIO<L> io, long pageAddr,
int idx) throws IgniteCheckedException;
+    }
+
+    public T findOneBounded(L lower, L upper, RowPredicate<L, T> p, Object x) throws
IgniteCheckedException {
+        checkDestroyed();
+
+        try {
+            FindOneCursor cursor = new FindOneCursor(lower, upper, p, x);
+
+            return cursor.findOne();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteCheckedException("Runtime failure on bounds: [lower=" + lower
+ ", upper=" + upper + "]", e);
+        }
+        catch (RuntimeException e) {
+            throw new IgniteException("Runtime failure on bounds: [lower=" + lower + ", upper="
+ upper + "]", e);
+        }
+        catch (AssertionError e) {
+            throw new AssertionError("Assertion error on bounds: [lower=" + lower + ", upper="
+ upper + "]", e);
+        }
+        finally {
+            checkDestroyed();
+        }
+    }
+
     /**
-     * Forward cursor.
+     *
      */
     @SuppressWarnings("unchecked")
-    private final class ForwardCursor implements GridCursor<T> {
+    private abstract class AbstractForwardCursor {
         /** */
-        private T[] rows = (T[])EMPTY;
-
-        /** */
-        private int row = -1;
+        long nextPageId;
 
         /** */
-        private long nextPageId;
-
-        /** */
-        private L lowerBound;
+        L lowerBound;
 
         /** */
         private int lowerShift = -1; // Initially it is -1 to handle multiple equal rows.
 
         /** */
-        private final L upperBound;
+        final L upperBound;
 
         /** */
-        private final Object x;
+        final Object x;
 
         /**
          * @param lowerBound Lower bound.
          * @param upperBound Upper bound.
+         * @param x Implementation specific argument, {@code null} always means that we need
to return full detached data row.
          */
-        ForwardCursor(L lowerBound, L upperBound) {
+        AbstractForwardCursor(L lowerBound, L upperBound, Object x) {
             this.lowerBound = lowerBound;
             this.upperBound = upperBound;
-            this.x = null;
+            this.x = x;
         }
 
         /**
-         * @param lowerBound Lower bound.
-         * @param upperBound Upper bound.
-         * @param x Implementation specific argument, {@code null} always means that we need
to return full detached data row.
+         *
          */
-        ForwardCursor(L lowerBound, L upperBound, Object x) {
-            this.lowerBound = lowerBound;
-            this.upperBound = upperBound;
-            this.x = x;
-        }
+        abstract void init0();
+
+        /**
+         * @param pageAddr
+         * @param io
+         * @param startIdx
+         * @param cnt
+         * @return
+         * @throws IgniteCheckedException If failed.
+         */
+        abstract boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx,
int cnt) throws IgniteCheckedException;
+
+        /**
+         * @return
+         * @throws IgniteCheckedException If failed.
+         */
+        abstract boolean reinitialize0() throws IgniteCheckedException;
+
+        abstract void onNotFound(boolean readDone);
 
         /**
          * @param pageAddr Page address.
@@ -4437,9 +4470,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure
implements
          * @param startIdx Start index.
          * @throws IgniteCheckedException If failed.
          */
-        private void init(long pageAddr, BPlusIO<L> io, int startIdx) throws IgniteCheckedException
{
+        final void init(long pageAddr, BPlusIO<L> io, int startIdx) throws IgniteCheckedException
{
             nextPageId = 0;
-            row = -1;
+
+            init0();
 
             int cnt = io.getCount(pageAddr);
 
@@ -4447,16 +4481,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure
implements
             if (cnt == 0) {
                 assert io.getForward(pageAddr) == 0L;
 
-                rows = null;
-            }
-            else if (!fillFromBuffer(pageAddr, io, startIdx, cnt)) {
-                if (rows != EMPTY) {
-                    assert rows.length > 0; // Otherwise it makes no sense to create an
array.
-
-                    // Fake clear.
-                    rows[0] = null;
-                }
+                onNotFound(true);
             }
+            else if (!fillFromBuffer(pageAddr, io, startIdx, cnt))
+                onNotFound(false);
         }
 
         /**
@@ -4466,7 +4494,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure
implements
          * @return Adjusted to lower bound start index.
          * @throws IgniteCheckedException If failed.
          */
-        private int findLowerBound(long pageAddr, BPlusIO<L> io, int cnt) throws IgniteCheckedException
{
+        final int findLowerBound(long pageAddr, BPlusIO<L> io, int cnt) throws IgniteCheckedException
{
             assert io.isLeaf();
 
             // Compare with the first row on the page.
@@ -4491,7 +4519,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure
implements
          * @return Corrected number of rows with respect to upper bound.
          * @throws IgniteCheckedException If failed.
          */
-        private int findUpperBound(long pageAddr, BPlusIO<L> io, int low, int cnt)
throws IgniteCheckedException {
+        final int findUpperBound(long pageAddr, BPlusIO<L> io, int low, int cnt) throws
IgniteCheckedException {
             assert io.isLeaf();
 
             // Compare with the last row on the page.
@@ -4530,68 +4558,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure
implements
 
             nextPageId = io.getForward(pageAddr);
 
-            if (lowerBound != null && startIdx == 0)
-                startIdx = findLowerBound(pageAddr, io, cnt);
-
-            if (upperBound != null && cnt != startIdx)
-                cnt = findUpperBound(pageAddr, io, startIdx, cnt);
-
-            cnt -= startIdx;
-
-            if (cnt == 0)
-                return false;
-
-            if (rows == EMPTY)
-                rows = (T[])new Object[cnt];
-
-            for (int i = 0; i < cnt; i++) {
-                T r = getRow(io, pageAddr, startIdx + i, x);
-
-                rows = GridArrays.set(rows, i, r);
-            }
-
-            GridArrays.clearTail(rows, cnt);
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("SimplifiableIfStatement")
-        @Override public boolean next() throws IgniteCheckedException {
-            if (rows == null)
-                return false;
-
-            if (++row < rows.length && rows[row] != null) {
-                clearLastRow(); // Allow to GC the last returned row.
-
-                return true;
-            }
-
-            return nextPage();
-        }
-
-        /**
-         * @return Cleared last row.
-         */
-        private T clearLastRow() {
-            if (row == 0)
-                return null;
-
-            int last = row - 1;
-
-            T r = rows[last];
-
-            assert r != null;
-
-            rows[last] = null;
-
-            return r;
+            return fillFromBuffer0(pageAddr, io, startIdx, cnt);
         }
 
         /**
          * @throws IgniteCheckedException If failed.
          */
-        private void find() throws IgniteCheckedException {
+        final void find() throws IgniteCheckedException {
             assert lowerBound != null;
 
             doFind(new GetCursor(lowerBound, lowerShift, this));
@@ -4607,21 +4580,19 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure
implements
             // to the previous lower bound.
             find();
 
-            return next();
+            return reinitialize0();
         }
 
         /**
          * @return {@code true} If we have rows to return after reading the next page.
          * @throws IgniteCheckedException If failed.
          */
-        private boolean nextPage() throws IgniteCheckedException {
-            updateLowerBound(clearLastRow());
-
-            row = 0;
+        final boolean nextPage(T lastRow) throws IgniteCheckedException {
+            updateLowerBound(lastRow);
 
             for (;;) {
                 if (nextPageId == 0) {
-                    rows = null;
+                    onNotFound(true);
 
                     return false; // Done.
                 }
@@ -4665,6 +4636,211 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure
implements
                 lowerBound = lower; // Move the lower bound forward for further concurrent
merge retries.
             }
         }
+    }
+
+    /**
+     * Forward cursor.
+     */
+    @SuppressWarnings("unchecked")
+    private final class FindOneCursor extends AbstractForwardCursor {
+        /** */
+        private Object resRow;
+
+        /** */
+        private T lastRow;
+
+        /** */
+        private final RowPredicate<L, T> p;
+
+        /**
+         * @param lowerBound Lower bound.
+         * @param upperBound Upper bound.
+         * @param x Implementation specific argument, {@code null} always means that we need
to return full detached data row.
+         */
+        FindOneCursor(L lowerBound, L upperBound, RowPredicate<L, T> p, Object x) {
+            super(lowerBound, upperBound, x);
+
+            assert lowerBound != null;
+            assert upperBound != null;
+
+            this.p = p;
+        }
+
+        @Override void init0() {
+            // No-op.
+        }
+
+        @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx,
int cnt) throws IgniteCheckedException {
+            if (startIdx == 0) // TODO IGNITE-3478: startIdx == 0? can search twice for first
item?
+                startIdx = findLowerBound(pageAddr, io, cnt);
+
+            if (cnt == startIdx)
+                return false;
+
+            for (int i = startIdx; i < cnt; i++) {
+                int cmp = compare(0, io, pageAddr, i, upperBound);
+
+                if (cmp > 0) {
+                    nextPageId = 0; // The End.
+
+                    return false;
+                }
+
+                if (p == null || p.apply(BPlusTree.this, io, pageAddr, i)) {
+                    resRow = getRow(io, pageAddr, i, x);
+
+                    return true;
+                }
+            }
+
+            if (nextPageId != 0)
+                lastRow = getRow(io, pageAddr, cnt - 1, x); // Need save last row.
+
+            return true;
+        }
+
+        @Override boolean reinitialize0() throws IgniteCheckedException {
+            return true;
+        }
+
+        @Override void onNotFound(boolean readDone) {
+            resRow = EMPTY;
+        }
+
+        /**
+         * @throws IgniteCheckedException If failed.
+         * @return Found row.
+         */
+        private T findOne() throws IgniteCheckedException {
+            find();
+
+            if (resRow != null) {
+                if (resRow == EMPTY)
+                    return null;
+
+                return (T)resRow;
+            }
+
+            for (;;) {
+                T lastRow0 = lastRow;
+
+                lastRow = null;
+
+                nextPage(lastRow0);
+
+                if (resRow != null) {
+                    if (resRow == EMPTY)
+                        return null;
+
+                    return (T)resRow;
+                }
+            }
+        }
+    }
+
+    /**
+     * Forward cursor.
+     */
+    @SuppressWarnings("unchecked")
+    private final class ForwardCursor extends AbstractForwardCursor implements GridCursor<T>
{
+        /** */
+        private T[] rows = (T[])EMPTY;
+
+        /** */
+        private int row = -1;
+
+        /**
+         * @param lowerBound Lower bound.
+         * @param upperBound Upper bound.
+         * @param x Implementation specific argument, {@code null} always means that we need
to return full detached data row.
+         */
+        ForwardCursor(L lowerBound, L upperBound, Object x) {
+            super(lowerBound, upperBound, x);
+        }
+
+        @Override boolean fillFromBuffer0(long pageAddr, BPlusIO<L> io, int startIdx,
int cnt) throws IgniteCheckedException {
+            if (lowerBound != null && startIdx == 0)
+                startIdx = findLowerBound(pageAddr, io, cnt);
+
+            if (upperBound != null && cnt != startIdx)
+                cnt = findUpperBound(pageAddr, io, startIdx, cnt);
+
+            cnt -= startIdx;
+
+            if (cnt == 0)
+                return false;
+
+            if (rows == EMPTY)
+                rows = (T[])new Object[cnt];
+
+            for (int i = 0; i < cnt; i++) {
+                T r = getRow(io, pageAddr, startIdx + i, x);
+
+                rows = GridArrays.set(rows, i, r);
+            }
+
+            GridArrays.clearTail(rows, cnt);
+
+            return true;
+        }
+
+        @Override boolean reinitialize0() throws IgniteCheckedException {
+            return next();
+        }
+
+        @Override void onNotFound(boolean readDone) {
+            if (readDone)
+                rows = null;
+            else {
+                if (rows != EMPTY) {
+                    assert rows.length > 0; // Otherwise it makes no sense to create an
array.
+
+                    // Fake clear.
+                    rows[0] = null;
+                }
+            }
+        }
+
+        @Override void init0() {
+            row = -1;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("SimplifiableIfStatement")
+        @Override public boolean next() throws IgniteCheckedException {
+            if (rows == null)
+                return false;
+
+            if (++row < rows.length && rows[row] != null) {
+                clearLastRow(); // Allow to GC the last returned row.
+
+                return true;
+            }
+
+            T lastRow = clearLastRow();
+
+            row = 0;
+
+            return nextPage(lastRow);
+        }
+
+        /**
+         * @return Cleared last row.
+         */
+        private T clearLastRow() {
+            if (row == 0)
+                return null;
+
+            int last = row - 1;
+
+            T r = rows[last];
+
+            assert r != null;
+
+            rows[last] = null;
+
+            return r;
+        }
 
         /** {@inheritDoc} */
         @Override public T get() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/44ad7011/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
new file mode 100644
index 0000000..f708ffd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.RowPredicate<CacheSearchRow,
CacheDataRow> {
+    /** */
+    private final MvccCoordinatorVersion ver;
+
+    /**
+     * @param cacheId Cache ID.
+     * @param key Key.
+     * @param ver Mvcc version.
+     */
+    public MvccVersionBasedSearchRow(int cacheId, KeyCacheObject key, MvccCoordinatorVersion
ver) {
+        super(cacheId, key);
+
+        assert ver != null;
+
+        this.ver = ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree,
+        BPlusIO<CacheSearchRow> io,
+        long pageAddr,
+        int idx) throws IgniteCheckedException
+    {
+        if (ver.activeTransactions() == null)
+            return true;
+
+        RowLinkIO rowIo = (RowLinkIO)io;
+
+        if (rowIo.getMvccUpdateTopologyVersion(pageAddr, idx) != ver.coordinatorVersion())
+            return true;
+
+        return !ver.activeTransactions().contains(ver.counter()); // TODO IGNITE-3478 sort
active transactions?
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCoordinatorVersion() {
+        return ver.coordinatorVersion();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCounter() {
+        return ver.counter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccVersionBasedSearchRow.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44ad7011/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index f28fe2d..7936340 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteTransactions;
@@ -47,16 +48,20 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.TestCacheNodeExcludingFilter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -2682,6 +2687,80 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest
{
     }
 
     /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testInternalApi() throws Exception {
+        Ignite node = startGrid(0);
+
+        IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0,
1));
+
+        GridCacheContext cctx =
+            ((IgniteKernal)node).context().cache().context().cacheContext(CU.cacheId(cache.getName()));
+
+        CacheCoordinatorsProcessor crd = cctx.kernalContext().coordinators();
+
+        // Start query to prevent cleanup.
+        IgniteInternalFuture<MvccCoordinatorVersion> fut = crd.requestQueryCounter(crd.currentCoordinator());
+
+        fut.get();
+
+        final Integer key = 0;
+
+        for (int i = 0; i < 10; i++) {
+            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ))
{
+                cache.put(key + 1, i);
+
+                tx.commit();
+            }
+        }
+
+        for (int i = 0; i < 10; i++) {
+            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ))
{
+                cache.put(key, i);
+
+                tx.commit();
+            }
+        }
+
+        KeyCacheObject key0 = cctx.toCacheKeyObject(key);
+
+        List<T2<Object, MvccCounter>> vers = cctx.offheap().mvccAllVersions(cctx,
key0);
+
+        assertEquals(10, vers.size());
+
+        CacheDataRow row = cctx.offheap().read(cctx, key0);
+
+        checkRow(cctx, row, key0, vers.get(0).get1());
+
+        for (T2<Object, MvccCounter> ver : vers) {
+            MvccCounter cntr = ver.get2();
+
+            MvccCoordinatorVersion readVer =
+                new MvccCoordinatorVersionResponse(cntr.coordinatorVersion(), cntr.counter(),
0);
+
+            row = cctx.offheap().mvccRead(cctx, key0, readVer);
+
+            checkRow(cctx, row, key0, ver.get1());
+        }
+
+        checkRow(cctx,
+            cctx.offheap().mvccRead(cctx, key0, new MvccCoordinatorVersionResponse(vers.get(0).get2().coordinatorVersion()
+ 1, 1, 0)),
+            key0,
+            vers.get(0).get1());
+
+        checkRow(cctx,
+            cctx.offheap().mvccRead(cctx, key0, new MvccCoordinatorVersionResponse(vers.get(0).get2().coordinatorVersion(),
vers.get(0).get2().counter() + 1, 0)),
+            key0,
+            vers.get(0).get1());
+    }
+
+    private void checkRow(GridCacheContext cctx, CacheDataRow row, KeyCacheObject expKey,
Object expVal) {
+        assertNotNull(row);
+        assertEquals(expKey, row.key());
+        assertEquals(expVal, row.value().value(cctx.cacheObjectContext(), false));
+    }
+
+    /**
      * @return Cache configurations.
      */
     private List<CacheConfiguration<Object, Object>> cacheConfigurations() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/44ad7011/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index 9c0d791..e7ab34f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -25,6 +25,7 @@ import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -570,6 +571,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
             assertNoLocks();
 
             assertEquals(x, tree.findOne(x).longValue());
+            assertEquals(x, tree.findOneBounded(x, x, null, null).longValue());
 
             assertNoLocks();
 
@@ -584,12 +586,15 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
         assertNull(tree.findOne(-1L));
 
-        for (long x = 0; x < cnt; x++)
+        for (long x = 0; x < cnt; x++) {
             assertEquals(x, tree.findOne(x).longValue());
+            assertEquals(x, tree.findOneBounded(x, x, null, null).longValue());
+        }
 
         assertNoLocks();
 
         assertNull(tree.findOne(cnt));
+        assertNull(tree.findOneBounded(cnt, cnt, null, null));
 
         for (long x = RMV_INC > 0 ? 0 : cnt - 1; x >= 0 && x < cnt; x +=
RMV_INC) {
             X.println(" -- " + x);
@@ -603,6 +608,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
             assertNoLocks();
 
             assertNull(tree.findOne(x));
+            assertNull(tree.findOneBounded(x, x, null, null));
 
             assertNoLocks();
 
@@ -1242,6 +1248,200 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testFindOneBounded() throws Exception {
+        MAX_PER_PAGE = 5;
+
+        TestTree tree = createTestTree(true);
+
+        assertNull(tree.findOneBounded(0L, 100L, null, null));
+
+        for (long idx = 1L; idx <= 10L; ++idx)
+            tree.put(idx);
+
+        for (long idx = 1L; idx <= 10L; ++idx)
+            assertEquals(idx, (Object)tree.findOneBounded(idx, 100L, null, null));
+
+        assertEquals(1L, (Object)tree.findOneBounded(0L, 100L, null, null));
+
+        for (long idx = 1L; idx <= 10L; ++idx)
+            assertEquals(10L, (Object)tree.findOneBounded(idx, 100L, new TestRowPredicate(10L),
null));
+
+        assertNull(tree.findOneBounded(0L, 100L, new TestRowPredicate(100L), null));
+
+        for (long idx = 1L; idx <= 10L; ++idx)
+            assertEquals(idx, (Object)tree.findOneBounded(0L, 100L, new TestRowPredicate(idx),
null));
+
+        for (long idx = 0L; idx <= 10L; ++idx)
+            assertNull(tree.findOneBounded(idx, 11L, new TestRowPredicate(-1L), null));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFindOneBoundedConcurrentPutRemove() throws Exception {
+        findOneBoundedConcurrentPutRemove();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFindOneBoundedConcurrentPutRemove_5() throws Exception {
+        MAX_PER_PAGE = 5;
+
+        findOneBoundedConcurrentPutRemove();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFindOneBoundedConcurrentPutRemove_10() throws Exception {
+        MAX_PER_PAGE = 10;
+
+        findOneBoundedConcurrentPutRemove();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void findOneBoundedConcurrentPutRemove() throws Exception {
+        final TestTree tree = createTestTree(true);
+
+        final int KEYS = 10_000;
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        for (int i = 0; i < 10; i++) {
+            for (long idx = 0L; idx < KEYS; ++idx)
+                tree.put(idx);
+
+            final Long findKey;
+
+            if (MAX_PER_PAGE > 0) {
+                switch (i) {
+                    case 0:
+                        findKey = 1L;
+
+                        break;
+
+                    case 1:
+                        findKey = (long)MAX_PER_PAGE;
+
+                        break;
+
+                    case 2:
+                        findKey = (long)MAX_PER_PAGE - 1;
+
+                        break;
+
+                    case 3:
+                        findKey = (long)MAX_PER_PAGE + 1;
+
+                        break;
+
+                    case 4:
+                        findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE;
+
+                        break;
+
+                    case 5:
+                        findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE - 1;
+
+                        break;
+
+                    case 6:
+                        findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE + 1;
+
+                        break;
+
+                    case 7:
+                        findKey = (long)KEYS - 1;
+
+                        break;
+
+                    default:
+                        findKey = rnd.nextLong(KEYS);
+                }
+            }
+            else
+                findKey = rnd.nextLong(KEYS);
+
+            info("Iteration [iter=" + i + ", key=" + findKey + ']');
+
+            assertEquals(findKey, tree.findOne(findKey));
+            assertEquals(findKey, tree.findOneBounded(findKey, findKey, null, null));
+
+            IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>()
{
+                @Override public Void call() throws Exception {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    TestRowPredicate p = new TestRowPredicate(findKey);
+
+                    TestRowPredicate falseP = new TestRowPredicate(-1L);
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        int shift = MAX_PER_PAGE > 0 ? rnd.nextInt(MAX_PER_PAGE * 2) :
rnd.nextInt(100);
+
+                        assertEquals(findKey, tree.findOneBounded(findKey, findKey, null,
null));
+
+                        assertEquals(findKey,
+                            tree.findOneBounded(findKey - shift, findKey, p, null));
+
+                        assertEquals(findKey,
+                            tree.findOneBounded(findKey - shift, findKey + shift, p, null));
+
+                        assertEquals(findKey,
+                            tree.findOneBounded(findKey, findKey + shift, p, null));
+
+                        assertNull(tree.findOneBounded(-100L, KEYS + 100L, falseP, null));
+
+                        cnt++;
+                    }
+
+                    info("Done, read count: " + cnt);
+
+                    return null;
+                }
+            }, 10, "find");
+
+            asyncRunFut = new GridCompoundFuture<>();
+
+            asyncRunFut.add(getFut);
+
+            asyncRunFut.markInitialized();
+
+            try {
+                U.sleep(100);
+
+                for (int j = 0; j < 20; j++) {
+                    for (long idx = 0L; idx < KEYS / 2; ++idx) {
+                        long toRmv = rnd.nextLong(KEYS);
+
+                        if (toRmv != findKey)
+                            tree.remove(toRmv);
+                    }
+
+                    for (long idx = 0L; idx < KEYS / 2; ++idx) {
+                        long put = rnd.nextLong(KEYS);
+
+                        tree.put(put);
+                    }
+                }
+            }
+            finally {
+                stop.set(true);
+            }
+
+            asyncRunFut.get();
+
+            stop.set(false);
+        }
+    }
+
+    /**
      *
      */
     public void testConcurrentGrowDegenerateTreeAndConcurrentRemove() throws Exception {
@@ -1449,6 +1649,13 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
                         last = c.get();
                     }
+
+                    last = tree.findOneBounded((long)low, (long)high, null, null);
+
+                    if (last != null) {
+                        assertTrue(low + " <= " + last + " <= " + high, last >=
low);
+                        assertTrue(low + " <= " + last + " <= " + high, last <=
high);
+                    }
                 }
 
                 return null;
@@ -1853,4 +2060,27 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
             return PageUtils.getLong(pageAddr, offset(idx));
         }
     }
+
+    /**
+     *
+     */
+    static class TestRowPredicate implements TestTree.RowPredicate<Long, Long> {
+        /** */
+        private final Long expVal;
+
+        /**
+         * @param expVal Expected value.
+         */
+        TestRowPredicate(Long expVal) {
+            this.expVal = expVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long>
io, long pageAddr, int idx)
+            throws IgniteCheckedException {
+            Long row = io.getLookupRow(tree, pageAddr, idx);
+
+            return row.equals(expVal);
+        }
+    }
 }


Mime
View raw message