Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9DC9D200D1A for ; Mon, 9 Oct 2017 13:42:59 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9C06F1609E0; Mon, 9 Oct 2017 11:42:59 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E09761609BB for ; Mon, 9 Oct 2017 13:42:57 +0200 (CEST) Received: (qmail 25753 invoked by uid 500); 9 Oct 2017 11:42:57 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 25744 invoked by uid 99); 9 Oct 2017 11:42:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Oct 2017 11:42:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 19ABCF56D1; Mon, 9 Oct 2017 11:42:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: <11a650ebaace4be289b9a9c7fff0b017@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-5937 Added BPlusTree.iterate for more optimal mvcc search Date: Mon, 9 Oct 2017 11:42:55 +0000 (UTC) archived-at: Mon, 09 Oct 2017 11:42:59 -0000 Repository: ignite Updated Branches: refs/heads/ignite-3478 921404a6f -> fd53c1a8f ignite-5937 Added BPlusTree.iterate for more optimal mvcc search Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fd53c1a8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fd53c1a8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fd53c1a8 Branch: refs/heads/ignite-3478 Commit: fd53c1a8f4e905a7aba469eb5decf38c50b7708e Parents: 921404a Author: sboikov Authored: Mon Oct 9 14:42:43 2017 +0300 Committer: sboikov Committed: Mon Oct 9 14:42:43 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManager.java | 11 +- .../cache/IgniteCacheOffheapManagerImpl.java | 244 +++++++---- .../cache/mvcc/CacheCoordinatorsProcessor.java | 12 +- .../cache/persistence/tree/BPlusTree.java | 416 ++++++++++++++----- .../cache/tree/AbstractDataInnerIO.java | 8 +- .../cache/tree/AbstractDataLeafIO.java | 8 +- .../processors/cache/tree/CacheDataTree.java | 10 +- .../cache/tree/CacheIdAwareDataInnerIO.java | 4 +- .../cache/tree/CacheIdAwareDataLeafIO.java | 4 +- .../processors/cache/tree/DataInnerIO.java | 4 +- .../processors/cache/tree/DataLeafIO.java | 4 +- .../processors/cache/tree/MvccDataInnerIO.java | 4 +- .../processors/cache/tree/MvccDataLeafIO.java | 4 +- .../cache/tree/MvccKeyMaxVersionBound.java | 77 ++++ .../cache/tree/MvccKeyMinVersionBound.java | 49 +++ .../processors/cache/tree/MvccUpdateRow.java | 177 ++++++++ .../cache/tree/MvccVersionBasedSearchRow.java | 100 +++++ .../processors/cache/tree/RowLinkIO.java | 14 +- .../cache/mvcc/CacheMvccTransactionsTest.java | 149 +++++++ .../processors/database/BPlusTreeSelfTest.java | 291 ++++++++++++- 20 files changed, 1376 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 9d03e4a..8967ce8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -492,7 +492,16 @@ public interface IgniteCacheOffheapManager { long expireTime, @Nullable CacheDataRow oldRow) throws IgniteCheckedException; - GridLongList mvccUpdate( + /** + * @param cctx Cache context. + * @param key Key. + * @param val Value. + * @param ver Version. + * @param mvccVer Mvcc version. + * @return List of transactions to wait for. + * @throws IgniteCheckedException If failed. + */ + @Nullable GridLongList mvccUpdate( GridCacheContext cctx, KeyCacheObject key, CacheObject val, http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index d8c5eaa..25f36b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -55,7 +55,11 @@ import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore; import org.apache.ignite.internal.processors.cache.tree.CacheDataTree; import org.apache.ignite.internal.processors.cache.tree.DataRow; import org.apache.ignite.internal.processors.cache.tree.MvccDataRow; +import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound; +import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound; import org.apache.ignite.internal.processors.cache.tree.MvccSearchRow; +import org.apache.ignite.internal.processors.cache.tree.MvccUpdateRow; +import org.apache.ignite.internal.processors.cache.tree.MvccVersionBasedSearchRow; import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree; import org.apache.ignite.internal.processors.cache.tree.PendingRow; import org.apache.ignite.internal.processors.cache.tree.SearchRow; @@ -1360,83 +1364,141 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager try { int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; - MvccDataRow dataRow = new MvccDataRow( - key, - val, - ver, - partId, - cacheId, - mvccVer.coordinatorVersion(), - mvccVer.counter()); - CacheObjectContext coCtx = cctx.cacheObjectContext(); // Make sure value bytes initialized. key.valueBytes(coCtx); val.valueBytes(coCtx); - rowStore.addRow(dataRow); + if (true) { + MvccUpdateRow updateRow = new MvccUpdateRow( + key, + val, + ver, + mvccVer, + partId, + cacheId); - assert dataRow.link() != 0 : dataRow; + rowStore.addRow(updateRow); - if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID) - dataRow.cacheId(cctx.cacheId()); + assert updateRow.link() != 0 : updateRow; - boolean old = dataTree.putx(dataRow); + if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID) + updateRow.cacheId(cctx.cacheId()); - assert !old; + GridLongList waitTxs = null; - GridLongList waitTxs = null; + if (mvccVer.initialLoad()) { + boolean old = dataTree.putx(updateRow); - if (!mvccVer.initialLoad()) { - MvccLongList activeTxs = mvccVer.activeTransactions(); + assert !old; - // TODO IGNITE-3484: need special method. - GridCursor cur = dataTree.find( - new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1), - new MvccSearchRow(cacheId, key, 1, 1)); + incrementSize(cctx.cacheId()); + } + else { + dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow); - boolean first = true; + boolean old = dataTree.putx(updateRow); - boolean activeTx = false; + assert !old; - while (cur.next()) { - CacheDataRow oldVal = cur.get(); + if (!updateRow.previousNotNull()) + incrementSize(cctx.cacheId()); - assert oldVal.link() != 0 : oldVal; + waitTxs = updateRow.activeTransactions(); - if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() && - activeTxs.contains(oldVal.mvccCounter())) { - if (waitTxs == null) - waitTxs = new GridLongList(); + List cleanupRows = updateRow.cleanupRows(); - assert oldVal.mvccCounter() != mvccVer.counter(); + if (cleanupRows != null) { + for (int i = 0; i < cleanupRows.size(); i++) { + CacheSearchRow oldRow = cleanupRows.get(i); - waitTxs.add(oldVal.mvccCounter()); + assert oldRow.link() != 0L : oldRow; - activeTx = true; + boolean rmvd = dataTree.removex(oldRow); + + assert rmvd; + + rowStore.removeRow(oldRow.link()); + } } + } - if (!activeTx) { - // Should not delete oldest version which is less than cleanup version. - int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion()); + return waitTxs; + } + else { + MvccDataRow dataRow = new MvccDataRow( + key, + val, + ver, + partId, + cacheId, + mvccVer.coordinatorVersion(), + mvccVer.counter()); - if (cmp <= 0) { - if (first) - first = false; - else { - boolean rmvd = dataTree.removex(oldVal); + rowStore.addRow(dataRow); - assert rmvd; + assert dataRow.link() != 0 : dataRow; + + if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID) + dataRow.cacheId(cctx.cacheId()); + + boolean old = dataTree.putx(dataRow); + + assert !old; + + GridLongList waitTxs = null; + + if (!mvccVer.initialLoad()) { + MvccLongList activeTxs = mvccVer.activeTransactions(); + + // TODO IGNITE-3484: need special method. + GridCursor cur = dataTree.find( + new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1), + new MvccSearchRow(cacheId, key, 1, 1)); + + boolean first = true; + + boolean activeTx = false; + + while (cur.next()) { + CacheDataRow oldVal = cur.get(); + + assert oldVal.link() != 0 : oldVal; + + if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() && + activeTxs.contains(oldVal.mvccCounter())) { + if (waitTxs == null) + waitTxs = new GridLongList(); + + assert oldVal.mvccCounter() != mvccVer.counter(); + + waitTxs.add(oldVal.mvccCounter()); + + activeTx = true; + } - rowStore.removeRow(oldVal.link()); + if (!activeTx) { + // Should not delete oldest version which is less than cleanup version. + int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion()); + + if (cmp <= 0) { + if (first) + first = false; + else { + boolean rmvd = dataTree.removex(oldVal); + + assert rmvd; + + rowStore.removeRow(oldVal.link()); + } } } } } - } - return waitTxs; + return waitTxs; + } } finally { busyLock.leaveBusy(); @@ -1647,14 +1709,26 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager CacheDataRow row; if (grp.mvccEnabled()) { - // TODO IGNITE-3484: need special method. - GridCursor cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE), - new MvccSearchRow(cacheId, key, 1, 1)); + if (true) { + MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key); + + dataTree.iterate( + searchRow, + new MvccKeyMinVersionBound(cacheId, key), + searchRow // Use the same instance as closure to do not create extra object. + ); - if (cur.next()) - row = cur.get(); - else - row = null; + row = searchRow.row(); + } + else { + GridCursor cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE), + new MvccSearchRow(cacheId, key, 1, 1)); + + if (cur.next()) + row = cur.get(); + else + row = null; + } } else row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY); @@ -1672,6 +1746,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager { assert grp.mvccEnabled(); + // Note: this method is intended for testing only. + key.valueBytes(cctx.cacheObjectContext()); int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; @@ -1705,41 +1781,55 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; - // TODO IGNITE-3484: need special method. - GridCursor cur = dataTree.find( - new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()), - new MvccSearchRow(cacheId, key, 1, 1)); + if (true) { + MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver); - CacheDataRow row = null; + dataTree.iterate( + lower, + new MvccKeyMinVersionBound(cacheId, key), + lower // Use the same instance as closure to do not create extra object. + ); - MvccLongList txs = ver.activeTransactions(); + CacheDataRow row = lower.row(); - while (cur.next()) { - CacheDataRow row0 = cur.get(); + afterRowFound(row, key); - assert row0.mvccCoordinatorVersion() > 0 : row0; + return row; + } + else { + GridCursor cur = dataTree.find( + new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()), + new MvccSearchRow(cacheId, key, 1, 1)); - boolean visible; + CacheDataRow row = null; - if (txs != null) { - visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion() - || !txs.contains(row0.mvccCounter()); - } - else - visible = true; + MvccLongList txs = ver.activeTransactions(); - if (visible) { - row = row0; + while (cur.next()) { + CacheDataRow row0 = cur.get(); - break; - } - } + assert row0.mvccCoordinatorVersion() > 0 : row0; - assert row == null || key.equals(row.key()); + boolean visible; - //afterRowFound(row, key); + if (txs != null) { + visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion() + || !txs.contains(row0.mvccCounter()); + } + else + visible = true; - return row; + if (visible) { + row = row0; + + break; + } + } + + assert row == null || key.equals(row.key()); + + return row; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java index 5080c83..b9b8ea1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java @@ -614,8 +614,14 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { // TODO IGNITE-3478 sorted? + change GridLongList.writeTo? MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse(); - for (Long txVer : activeTxs.keySet()) + long minActive = Long.MAX_VALUE; + + for (Long txVer : activeTxs.keySet()) { + if (txVer < minActive) + minActive = txVer; + res.addTx(txVer); + } Object old = activeTxs.put(nextCtr, txId); @@ -624,7 +630,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { long cleanupVer; if (prevCrdQueries.previousQueriesDone()) { - cleanupVer = committedCntr.get() - 1; + cleanupVer = Math.min(minActive, committedCntr.get()); + + cleanupVer--; Long qryVer = activeQueries.minimalQueryCounter(); http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java index c73b4c7..b31a61f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java @@ -907,7 +907,7 @@ public abstract class BPlusTree extends DataStructure implements long pageAddr = readLock(firstPageId, firstPage); // We always merge pages backwards, the first page is never removed. try { - cursor.init(pageAddr, io(pageAddr), 0); + cursor.init(pageAddr, io(pageAddr), -1); } finally { readUnlock(firstPageId, firstPage, pageAddr); @@ -972,6 +972,34 @@ public abstract class BPlusTree extends DataStructure implements } } + /** + * @param lower Lower bound inclusive. + * @param upper Upper bound inclusive. + * @param c Closure applied for all found items, iteration is stopped if closure returns {@code false}. + * @throws IgniteCheckedException If failed. + */ + public void iterate(L lower, L upper, TreeRowClosure c) throws IgniteCheckedException { + checkDestroyed(); + + try { + ClosureCursor cursor = new ClosureCursor(lower, upper, c); + + cursor.iterate(); + } + catch (IgniteCheckedException e) { + throw new IgniteCheckedException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e); + } + catch (RuntimeException e) { + throw new IgniteException("Runtime failure on bounds: [lower=" + lower + ", upper=" + upper + "]", e); + } + catch (AssertionError e) { + throw new AssertionError("Assertion error on bounds: [lower=" + lower + ", upper=" + upper + "]", e); + } + finally { + checkDestroyed(); + } + } + /** {@inheritDoc} */ @Override public T findFirst() throws IgniteCheckedException { checkDestroyed(); @@ -2509,14 +2537,14 @@ public abstract class BPlusTree extends DataStructure implements */ private final class GetCursor extends Get { /** */ - ForwardCursor cursor; + AbstractForwardCursor cursor; /** * @param lower Lower bound. * @param shift Shift. * @param cursor Cursor. */ - GetCursor(L lower, int shift, ForwardCursor cursor) { + GetCursor(L lower, int shift, AbstractForwardCursor cursor) { super(lower, false); assert shift != 0; // Either handle range of equal rows or find a greater row after concurrent merge. @@ -4385,51 +4413,57 @@ public abstract class BPlusTree extends DataStructure implements protected abstract T getRow(BPlusIO io, long pageAddr, int idx, Object x) throws IgniteCheckedException; /** - * Forward cursor. + * */ @SuppressWarnings("unchecked") - private final class ForwardCursor implements GridCursor { - /** */ - private T[] rows = (T[])EMPTY; - - /** */ - private int row = -1; - + private abstract class AbstractForwardCursor { /** */ - private long nextPageId; + long nextPageId; /** */ - private L lowerBound; + L lowerBound; /** */ private int lowerShift = -1; // Initially it is -1 to handle multiple equal rows. /** */ - private final L upperBound; - - /** */ - private final Object x; + final L upperBound; /** * @param lowerBound Lower bound. * @param upperBound Upper bound. */ - ForwardCursor(L lowerBound, L upperBound) { + AbstractForwardCursor(L lowerBound, L upperBound) { this.lowerBound = lowerBound; this.upperBound = upperBound; - this.x = null; } /** - * @param lowerBound Lower bound. - * @param upperBound Upper bound. - * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. + * */ - ForwardCursor(L lowerBound, L upperBound, Object x) { - this.lowerBound = lowerBound; - this.upperBound = upperBound; - this.x = x; - } + abstract void init0(); + + /** + * @param pageAddr Page address. + * @param io IO. + * @param startIdx Start index. + * @param cnt Number of rows in the buffer. + * @return {@code true} If we were able to fetch rows from this page. + * @throws IgniteCheckedException If failed. + */ + abstract boolean fillFromBuffer0(long pageAddr, BPlusIO io, int startIdx, int cnt) + throws IgniteCheckedException; + + /** + * @return {@code True} If we have rows to return after reading the next page. + * @throws IgniteCheckedException If failed. + */ + abstract boolean reinitialize0() throws IgniteCheckedException; + + /** + * @param readDone {@code True} if traversed all rows. + */ + abstract void onNotFound(boolean readDone); /** * @param pageAddr Page address. @@ -4437,9 +4471,10 @@ public abstract class BPlusTree extends DataStructure implements * @param startIdx Start index. * @throws IgniteCheckedException If failed. */ - private void init(long pageAddr, BPlusIO io, int startIdx) throws IgniteCheckedException { + final void init(long pageAddr, BPlusIO io, int startIdx) throws IgniteCheckedException { nextPageId = 0; - row = -1; + + init0(); int cnt = io.getCount(pageAddr); @@ -4447,16 +4482,10 @@ public abstract class BPlusTree extends DataStructure implements if (cnt == 0) { assert io.getForward(pageAddr) == 0L; - rows = null; - } - else if (!fillFromBuffer(pageAddr, io, startIdx, cnt)) { - if (rows != EMPTY) { - assert rows.length > 0; // Otherwise it makes no sense to create an array. - - // Fake clear. - rows[0] = null; - } + onNotFound(true); } + else if (!fillFromBuffer(pageAddr, io, startIdx, cnt)) + onNotFound(false); } /** @@ -4466,7 +4495,7 @@ public abstract class BPlusTree extends DataStructure implements * @return Adjusted to lower bound start index. * @throws IgniteCheckedException If failed. */ - private int findLowerBound(long pageAddr, BPlusIO io, int cnt) throws IgniteCheckedException { + final int findLowerBound(long pageAddr, BPlusIO io, int cnt) throws IgniteCheckedException { assert io.isLeaf(); // Compare with the first row on the page. @@ -4491,7 +4520,7 @@ public abstract class BPlusTree extends DataStructure implements * @return Corrected number of rows with respect to upper bound. * @throws IgniteCheckedException If failed. */ - private int findUpperBound(long pageAddr, BPlusIO io, int low, int cnt) throws IgniteCheckedException { + final int findUpperBound(long pageAddr, BPlusIO io, int low, int cnt) throws IgniteCheckedException { assert io.isLeaf(); // Compare with the last row on the page. @@ -4523,75 +4552,20 @@ public abstract class BPlusTree extends DataStructure implements throws IgniteCheckedException { assert io.isLeaf() : io; assert cnt != 0 : cnt; // We can not see empty pages (empty tree handled in init). - assert startIdx >= 0 : startIdx; + assert startIdx >= 0 || startIdx == -1: startIdx; assert cnt >= startIdx; checkDestroyed(); nextPageId = io.getForward(pageAddr); - if (lowerBound != null && startIdx == 0) - startIdx = findLowerBound(pageAddr, io, cnt); - - if (upperBound != null && cnt != startIdx) - cnt = findUpperBound(pageAddr, io, startIdx, cnt); - - cnt -= startIdx; - - if (cnt == 0) - return false; - - if (rows == EMPTY) - rows = (T[])new Object[cnt]; - - for (int i = 0; i < cnt; i++) { - T r = getRow(io, pageAddr, startIdx + i, x); - - rows = GridArrays.set(rows, i, r); - } - - GridArrays.clearTail(rows, cnt); - - return true; - } - - /** {@inheritDoc} */ - @SuppressWarnings("SimplifiableIfStatement") - @Override public boolean next() throws IgniteCheckedException { - if (rows == null) - return false; - - if (++row < rows.length && rows[row] != null) { - clearLastRow(); // Allow to GC the last returned row. - - return true; - } - - return nextPage(); - } - - /** - * @return Cleared last row. - */ - private T clearLastRow() { - if (row == 0) - return null; - - int last = row - 1; - - T r = rows[last]; - - assert r != null; - - rows[last] = null; - - return r; + return fillFromBuffer0(pageAddr, io, startIdx, cnt); } /** * @throws IgniteCheckedException If failed. */ - private void find() throws IgniteCheckedException { + final void find() throws IgniteCheckedException { assert lowerBound != null; doFind(new GetCursor(lowerBound, lowerShift, this)); @@ -4607,21 +4581,20 @@ public abstract class BPlusTree extends DataStructure implements // to the previous lower bound. find(); - return next(); + return reinitialize0(); } /** + * @param lastRow Last read row (to be used as new lower bound). * @return {@code true} If we have rows to return after reading the next page. * @throws IgniteCheckedException If failed. */ - private boolean nextPage() throws IgniteCheckedException { - updateLowerBound(clearLastRow()); - - row = 0; + final boolean nextPage(L lastRow) throws IgniteCheckedException { + updateLowerBound(lastRow); for (;;) { if (nextPageId == 0) { - rows = null; + onNotFound(true); return false; // Done. } @@ -4638,7 +4611,7 @@ public abstract class BPlusTree extends DataStructure implements try { BPlusIO io = io(pageAddr); - if (fillFromBuffer(pageAddr, io, 0, io.getCount(pageAddr))) + if (fillFromBuffer(pageAddr, io, -1, io.getCount(pageAddr))) return true; // Continue fetching forward. @@ -4659,12 +4632,227 @@ public abstract class BPlusTree extends DataStructure implements /** * @param lower New exact lower bound. */ - private void updateLowerBound(T lower) { + private void updateLowerBound(L lower) { if (lower != null) { lowerShift = 1; // Now we have the full row an need to avoid duplicates. lowerBound = lower; // Move the lower bound forward for further concurrent merge retries. } } + } + + /** + * Closure cursor. + */ + @SuppressWarnings("unchecked") + private final class ClosureCursor extends AbstractForwardCursor { + /** */ + private final TreeRowClosure p; + + /** */ + private L lastRow; + + /** + * @param lowerBound Lower bound. + * @param upperBound Upper bound. + * @param p Row predicate. + */ + ClosureCursor(L lowerBound, L upperBound, TreeRowClosure p) { + super(lowerBound, upperBound); + + assert lowerBound != null; + assert upperBound != null; + assert p != null; + + this.p = p; + } + + /** {@inheritDoc} */ + @Override void init0() { + // No-op. + } + + /** {@inheritDoc} */ + @Override boolean fillFromBuffer0(long pageAddr, BPlusIO io, int startIdx, int cnt) + throws IgniteCheckedException { + if (startIdx == -1) // TODO IGNITE-3478: startIdx == 0? can search twice for first item? + startIdx = findLowerBound(pageAddr, io, cnt); + + if (cnt == startIdx) + return false; + + for (int i = startIdx; i < cnt; i++) { + int cmp = compare(0, io, pageAddr, i, upperBound); + + if (cmp > 0) { + nextPageId = 0; // The End. + + return false; + } + + boolean stop = !p.apply(BPlusTree.this, io, pageAddr, i); + + if (stop) { + nextPageId = 0; // The End. + + return true; + } + } + + if (nextPageId != 0) + lastRow = io.getLookupRow(BPlusTree.this, pageAddr, cnt - 1); // Need save last row. + + return true; + } + + /** {@inheritDoc} */ + @Override boolean reinitialize0() throws IgniteCheckedException { + return true; + } + + /** {@inheritDoc} */ + @Override void onNotFound(boolean readDone) { + nextPageId = 0; + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void iterate() throws IgniteCheckedException { + find(); + + if (nextPageId == 0) { + return; + } + + for (;;) { + L lastRow0 = lastRow; + + lastRow = null; + + nextPage(lastRow0); + + if (nextPageId == 0) + return; + } + } + } + + /** + * Forward cursor. + */ + @SuppressWarnings("unchecked") + private final class ForwardCursor extends AbstractForwardCursor implements GridCursor { + /** */ + final Object x; + + /** */ + private T[] rows = (T[])EMPTY; + + /** */ + private int row = -1; + + /** + * @param lowerBound Lower bound. + * @param upperBound Upper bound. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. + */ + ForwardCursor(L lowerBound, L upperBound, Object x) { + super(lowerBound, upperBound); + + this.x = x; + } + + /** {@inheritDoc} */ + @Override boolean fillFromBuffer0(long pageAddr, BPlusIO io, int startIdx, int cnt) throws IgniteCheckedException { + if (startIdx == -1) { + if (lowerBound != null) + startIdx = findLowerBound(pageAddr, io, cnt); + else + startIdx = 0; + } + + if (upperBound != null && cnt != startIdx) + cnt = findUpperBound(pageAddr, io, startIdx, cnt); + + cnt -= startIdx; + + if (cnt == 0) + return false; + + if (rows == EMPTY) + rows = (T[])new Object[cnt]; + + for (int i = 0; i < cnt; i++) { + T r = getRow(io, pageAddr, startIdx + i, x); + + rows = GridArrays.set(rows, i, r); + } + + GridArrays.clearTail(rows, cnt); + + return true; + } + + /** {@inheritDoc} */ + @Override boolean reinitialize0() throws IgniteCheckedException { + return next(); + } + + /** {@inheritDoc} */ + @Override void onNotFound(boolean readDone) { + if (readDone) + rows = null; + else { + if (rows != EMPTY) { + assert rows.length > 0; // Otherwise it makes no sense to create an array. + + // Fake clear. + rows[0] = null; + } + } + } + + /** {@inheritDoc} */ + @Override void init0() { + row = -1; + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean next() throws IgniteCheckedException { + if (rows == null) + return false; + + if (++row < rows.length && rows[row] != null) { + clearLastRow(); // Allow to GC the last returned row. + + return true; + } + + T lastRow = clearLastRow(); + + row = 0; + + return nextPage(lastRow); + } + + /** + * @return Cleared last row. + */ + private T clearLastRow() { + if (row == 0) + return null; + + int last = row - 1; + + T r = rows[last]; + + assert r != null; + + rows[last] = null; + + return r; + } /** {@inheritDoc} */ @Override public T get() { @@ -4805,4 +4993,20 @@ public abstract class BPlusTree extends DataStructure implements /** */ DONE } + + /** + * + */ + public interface TreeRowClosure { + /** + * @param tree Tree. + * @param io Tree IO. + * @param pageAddr Page address. + * @param idx Item index. + * @return {@code True} if item pass predicate. TODO IGNITE-3478 + * @throws IgniteCheckedException If failed. + */ + public boolean apply(BPlusTree tree, BPlusIO io, long pageAddr, int idx) + throws IgniteCheckedException; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java index 3fc0962..a07d012 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java @@ -76,8 +76,8 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO i long link = getLink(pageAddr, idx); if (storeMvccVersion()) { - long mvccTopVer = getMvccUpdateTopologyVersion(pageAddr, idx); - long mvccCntr = getMvccUpdateCounter(pageAddr, idx); + long mvccTopVer = getMvccCoordinatorVersion(pageAddr, idx); + long mvccCntr = getMvccCounter(pageAddr, idx); return ((CacheDataTree)tree).rowStore().mvccRow(cacheId, hash, @@ -119,8 +119,8 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO i } if (storeMvccVersion()) { - long mvccTopVer = rowIo.getMvccUpdateTopologyVersion(srcPageAddr, srcIdx); - long mvcCntr = rowIo.getMvccUpdateCounter(srcPageAddr, srcIdx); + long mvccTopVer = rowIo.getMvccCoordinatorVersion(srcPageAddr, srcIdx); + long mvcCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx); assert mvccTopVer > 0 : mvccTopVer; assert mvcCntr != CacheCoordinatorsProcessor.COUNTER_NA; http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java index a4eac3e..ef08bec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java @@ -94,8 +94,8 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO imp } if (storeMvccVersion()) { - long mvccUpdateTopVer = ((RowLinkIO)srcIo).getMvccUpdateTopologyVersion(srcPageAddr, srcIdx); - long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccUpdateCounter(srcPageAddr, srcIdx); + long mvccUpdateTopVer = ((RowLinkIO)srcIo).getMvccCoordinatorVersion(srcPageAddr, srcIdx); + long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx); assert mvccUpdateTopVer >=0 : mvccUpdateCntr; assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA; @@ -114,8 +114,8 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO imp long link = getLink(pageAddr, idx); if (storeMvccVersion()) { - long mvccTopVer = getMvccUpdateTopologyVersion(pageAddr, idx); - long mvccCntr = getMvccUpdateCounter(pageAddr, idx); + long mvccTopVer = getMvccCoordinatorVersion(pageAddr, idx); + long mvccCntr = getMvccCounter(pageAddr, idx); return ((CacheDataTree)tree).rowStore().mvccRow(cacheId, hash, http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java index 767c996..eaeefee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java @@ -114,7 +114,7 @@ public class CacheDataTree extends BPlusTree { /** {@inheritDoc} */ @Override protected int compare(BPlusIO iox, long pageAddr, int idx, CacheSearchRow row) throws IgniteCheckedException { - assert !grp.mvccEnabled() || row.mvccCoordinatorVersion() != 0;// || row.getClass() == SearchRow.class; + assert !grp.mvccEnabled() || row.mvccCoordinatorVersion() != 0 : row; RowLinkIO io = (RowLinkIO)iox; @@ -158,14 +158,14 @@ public class CacheDataTree extends BPlusTree { if (cmp != 0 || !grp.mvccEnabled()) return 0; - long mvccCrdVer = io.getMvccUpdateTopologyVersion(pageAddr, idx); + long mvccCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx); cmp = Long.compare(row.mvccCoordinatorVersion(), mvccCrdVer); if (cmp != 0) return cmp; - long mvccCntr = io.getMvccUpdateCounter(pageAddr, idx); + long mvccCntr = io.getMvccCounter(pageAddr, idx); assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA; @@ -188,8 +188,8 @@ public class CacheDataTree extends BPlusTree { CacheDataRowAdapter.RowData.FULL; if (grp.mvccEnabled()) { - long mvccTopVer = rowIo.getMvccUpdateTopologyVersion(pageAddr, idx); - long mvccCntr = rowIo.getMvccUpdateCounter(pageAddr, idx); + long mvccTopVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx); + long mvccCntr = rowIo.getMvccCounter(pageAddr, idx); return rowStore.mvccRow(cacheId, hash, link, x, mvccTopVer, mvccCntr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java index fc9d15d..3d02b27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java @@ -53,12 +53,12 @@ public final class CacheIdAwareDataInnerIO extends AbstractDataInnerIO { } /** {@inheritDoc} */ - @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) { + @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) { return 0; } /** {@inheritDoc} */ - @Override public long getMvccUpdateCounter(long pageAddr, int idx) { + @Override public long getMvccCounter(long pageAddr, int idx) { return CacheCoordinatorsProcessor.COUNTER_NA; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java index b328924..58ae9ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java @@ -53,12 +53,12 @@ public final class CacheIdAwareDataLeafIO extends AbstractDataLeafIO { } /** {@inheritDoc} */ - @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) { + @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) { return 0; } /** {@inheritDoc} */ - @Override public long getMvccUpdateCounter(long pageAddr, int idx) { + @Override public long getMvccCounter(long pageAddr, int idx) { return CacheCoordinatorsProcessor.COUNTER_NA; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java index 0d424b7..19a5c47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java @@ -53,12 +53,12 @@ public final class DataInnerIO extends AbstractDataInnerIO { } /** {@inheritDoc} */ - @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) { + @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) { return 0; } /** {@inheritDoc} */ - @Override public long getMvccUpdateCounter(long pageAddr, int idx) { + @Override public long getMvccCounter(long pageAddr, int idx) { return CacheCoordinatorsProcessor.COUNTER_NA; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java index ff51bc2..ab10b96 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java @@ -53,12 +53,12 @@ public final class DataLeafIO extends AbstractDataLeafIO { } /** {@inheritDoc} */ - @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) { + @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) { return 0; } /** {@inheritDoc} */ - @Override public long getMvccUpdateCounter(long pageAddr, int idx) { + @Override public long getMvccCounter(long pageAddr, int idx) { return CacheCoordinatorsProcessor.COUNTER_NA; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java index 5f4f44c..51a911d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataInnerIO.java @@ -53,12 +53,12 @@ public final class MvccDataInnerIO extends AbstractDataInnerIO { } /** {@inheritDoc} */ - @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) { + @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) { return PageUtils.getLong(pageAddr, offset(idx) + 12); } /** {@inheritDoc} */ - @Override public long getMvccUpdateCounter(long pageAddr, int idx) { + @Override public long getMvccCounter(long pageAddr, int idx) { return PageUtils.getLong(pageAddr, offset(idx) + 20); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java index e7cfca7..84c33a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataLeafIO.java @@ -53,12 +53,12 @@ public final class MvccDataLeafIO extends AbstractDataLeafIO { } /** {@inheritDoc} */ - @Override public long getMvccUpdateTopologyVersion(long pageAddr, int idx) { + @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) { return PageUtils.getLong(pageAddr, offset(idx) + 12); } /** {@inheritDoc} */ - @Override public long getMvccUpdateCounter(long pageAddr, int idx) { + @Override public long getMvccCounter(long pageAddr, int idx) { return PageUtils.getLong(pageAddr, offset(idx) + 20); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java new file mode 100644 index 0000000..aa9422d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.tree; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; +import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class MvccKeyMaxVersionBound extends SearchRow implements BPlusTree.TreeRowClosure { + /** */ + private CacheDataRow resRow; + + /** + * @param cacheId Cache ID. + * @param key Key. + */ + public MvccKeyMaxVersionBound(int cacheId, KeyCacheObject key) { + super(cacheId, key); + } + + /** + * @return Found row. + */ + @Nullable public CacheDataRow row() { + return resRow; + } + + /** {@inheritDoc} */ + @Override public boolean apply(BPlusTree tree, BPlusIO io, + long pageAddr, + int idx) + throws IgniteCheckedException + { + resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY); + + return false; // Stop search. + } + + /** {@inheritDoc} */ + @Override public long mvccCoordinatorVersion() { + return Long.MAX_VALUE; + } + + /** {@inheritDoc} */ + @Override public long mvccCounter() { + return Long.MAX_VALUE; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccKeyMaxVersionBound.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java new file mode 100644 index 0000000..f2ac308 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMinVersionBound.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.tree; + +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +public class MvccKeyMinVersionBound extends SearchRow { + /** + * @param cacheId Cache ID. + * @param key Key. + */ + public MvccKeyMinVersionBound(int cacheId, KeyCacheObject key) { + super(cacheId, key); + } + + /** {@inheritDoc} */ + @Override public long mvccCoordinatorVersion() { + return 1L; + } + + /** {@inheritDoc} */ + @Override public long mvccCounter() { + return 1L; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccKeyMinVersionBound.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java new file mode 100644 index 0000000..79544e6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.tree; + +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure { + /** */ + private Boolean hasPrev; + + /** */ + private boolean canCleanup; + + /** */ + private GridLongList activeTxs; + + /** */ + private List cleanupRows; + + /** */ + private final MvccCoordinatorVersion mvccVer; + + /** + * @param key Key. + * @param val Value. + * @param ver Version. + * @param mvccVer Mvcc version. + * @param part Partition. + * @param cacheId Cache ID. + */ + public MvccUpdateRow( + KeyCacheObject key, + CacheObject val, + GridCacheVersion ver, + MvccCoordinatorVersion mvccVer, + int part, + int cacheId) { + super(key, val, ver, part, 0L, cacheId); + + this.mvccVer = mvccVer; + } + + /** + * @return {@code True} if previous value was non-null. + */ + public boolean previousNotNull() { + return hasPrev != null && hasPrev; + } + + /** + * @return Active transactions to wait for. + */ + @Nullable public GridLongList activeTransactions() { + return activeTxs; + } + + /** + * @return Rows which are safe to cleanup. + */ + public List cleanupRows() { + return cleanupRows; + } + + /** {@inheritDoc} */ + @Override public boolean apply(BPlusTree tree, + BPlusIO io, + long pageAddr, + int idx) + throws IgniteCheckedException + { + RowLinkIO rowIo = (RowLinkIO)io; + + // All previous version should be less then new one. + assert mvccVer.coordinatorVersion() >= rowIo.getMvccCoordinatorVersion(pageAddr, idx); + assert mvccVer.coordinatorVersion() > rowIo.getMvccCoordinatorVersion(pageAddr, idx) || mvccVer.counter() > rowIo.getMvccCounter(pageAddr, idx); + + boolean checkActive = mvccVer.activeTransactions().size() > 0; + + boolean txActive = false; + + // Suppose transactions on previous coordinator versions are done. + if (checkActive && mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx)) { + long rowMvccCntr = rowIo.getMvccCounter(pageAddr, idx); + + if (mvccVer.activeTransactions().contains(rowMvccCntr)) { + txActive = true; + + if (activeTxs == null) + activeTxs = new GridLongList(); + + activeTxs.add(rowMvccCntr); + } + } + + if (hasPrev == null) + hasPrev = Boolean.TRUE; // TODO IGNITE-3478 support removes. + + if (!txActive) { + assert Long.compare(mvccVer.coordinatorVersion(), rowIo.getMvccCoordinatorVersion(pageAddr, idx)) >= 0; + + int cmp; + + if (mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx)) + cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx)); + else + cmp = 1; + + if (cmp >= 0) { + // Do not cleanup oldest version. + if (canCleanup) { + CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx); + + assert row.link() != 0 && row.mvccCoordinatorVersion() > 0 : row; + + // Should not be possible to cleanup active tx. + assert row.mvccCoordinatorVersion() != mvccVer.coordinatorVersion() + || !mvccVer.activeTransactions().contains(row.mvccCounter()); + + if (cleanupRows == null) + cleanupRows = new ArrayList<>(); + + cleanupRows.add(row); + } + else + canCleanup = true; + } + } + + return true; + } + + /** {@inheritDoc} */ + @Override public long mvccCoordinatorVersion() { + return mvccVer.coordinatorVersion(); + } + + /** {@inheritDoc} */ + @Override public long mvccCounter() { + return mvccVer.counter(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccUpdateRow.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java new file mode 100644 index 0000000..c829afb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.tree; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; +import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; +import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.TreeRowClosure { + /** */ + private final MvccCoordinatorVersion ver; + + /** */ + private CacheDataRow resRow; + + /** + * @param cacheId Cache ID. + * @param key Key. + * @param ver Mvcc version. + */ + public MvccVersionBasedSearchRow(int cacheId, KeyCacheObject key, MvccCoordinatorVersion ver) { + super(cacheId, key); + + assert ver != null; + + this.ver = ver; + } + + /** + * @return Found row. + */ + @Nullable public CacheDataRow row() { + return resRow; + } + + /** {@inheritDoc} */ + @Override public boolean apply(BPlusTree tree, + BPlusIO io, + long pageAddr, + int idx) throws IgniteCheckedException + { + boolean visible = true; + + if (ver.activeTransactions().size() > 0) { + RowLinkIO rowIo = (RowLinkIO)io; + + // TODO IGNITE-3478 sort active transactions? + if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) == ver.coordinatorVersion()) + visible = !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx)); + } + + if (visible) { + resRow = ((CacheDataTree) tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY); + + return false; // Stop search. + } + + return true; + } + + /** {@inheritDoc} */ + @Override public long mvccCoordinatorVersion() { + return ver.coordinatorVersion(); + } + + /** {@inheritDoc} */ + @Override public long mvccCounter() { + return ver.counter(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccVersionBasedSearchRow.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java index 8b341cb..111968d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/RowLinkIO.java @@ -42,7 +42,17 @@ public interface RowLinkIO { */ public int getCacheId(long pageAddr, int idx); - public long getMvccUpdateTopologyVersion(long pageAddr, int idx); + /** + * @param pageAddr Page address. + * @param idx Index. + * @return Mvcc coordinator version. + */ + public long getMvccCoordinatorVersion(long pageAddr, int idx); - public long getMvccUpdateCounter(long pageAddr, int idx); + /** + * @param pageAddr Page address. + * @param idx Index. + * @return Mvcc counter. + */ + public long getMvccCounter(long pageAddr, int idx); } http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index f28fe2d..115e8a2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteTransactions; @@ -47,16 +48,20 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.TestCacheNodeExcludingFilter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -2534,6 +2539,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } /** + * @param restartCrd If {@code true} dedicated coordinator node is restarted during test. * @param srvs Number of server nodes. * @param clients Number of client nodes. * @param cacheBackups Number of cache backups. @@ -2680,6 +2686,149 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { stop.set(true); } } + /** + * @throws IgniteCheckedException If failed. + */ + public void testSize() throws Exception { + Ignite node = startGrid(0); + + IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1)); + + assertEquals(cache.size(), 0); + + final int KEYS = 10; + + for (int i = 0; i < KEYS; i++) { + final Integer key = i; + + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, i); + + tx.commit(); + } + + assertEquals(i + 1, cache.size()); + } + + for (int i = 0; i < KEYS; i++) { + final Integer key = i; + + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, i); + + tx.commit(); + } + + assertEquals(KEYS, cache.size()); + } + + // TODO IGNITE-3478: test removes. + } + + + /** + * @throws IgniteCheckedException If failed. + */ + public void testInternalApi() throws Exception { + Ignite node = startGrid(0); + + IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1)); + + GridCacheContext cctx = + ((IgniteKernal)node).context().cache().context().cacheContext(CU.cacheId(cache.getName())); + + CacheCoordinatorsProcessor crd = cctx.kernalContext().coordinators(); + + // Start query to prevent cleanup. + IgniteInternalFuture fut = crd.requestQueryCounter(crd.currentCoordinator()); + + fut.get(); + + final int KEYS = 1000; + + for (int i = 0; i < 10; i++) { + for (int k = 0; k < KEYS; k++) { + final Integer key = k; + + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, i); + + tx.commit(); + } + } + } + + for (int k = 0; k < KEYS; k++) { + final Integer key = k; + + KeyCacheObject key0 = cctx.toCacheKeyObject(key); + + List> vers = cctx.offheap().mvccAllVersions(cctx, key0); + + assertEquals(10, vers.size()); + + CacheDataRow row = cctx.offheap().read(cctx, key0); + + checkRow(cctx, row, key0, vers.get(0).get1()); + + for (T2 ver : vers) { + MvccCounter cntr = ver.get2(); + + MvccCoordinatorVersion readVer = + new MvccCoordinatorVersionResponse(cntr.coordinatorVersion(), cntr.counter(), 0); + + row = cctx.offheap().mvccRead(cctx, key0, readVer); + + checkRow(cctx, row, key0, ver.get1()); + } + + checkRow(cctx, + cctx.offheap().mvccRead(cctx, key0, version(vers.get(0).get2().coordinatorVersion() + 1, 1)), + key0, + vers.get(0).get1()); + + checkRow(cctx, + cctx.offheap().mvccRead(cctx, key0, version(vers.get(0).get2().coordinatorVersion(), vers.get(0).get2().counter() + 1)), + key0, + vers.get(0).get1()); + + MvccCoordinatorVersionResponse ver = version(crd.currentCoordinator().coordinatorVersion(), 100000); + + for (int v = 0; v < vers.size(); v++) { + MvccCounter cntr = vers.get(v).get2(); + + ver.addTx(cntr.counter()); + + row = cctx.offheap().mvccRead(cctx, key0, ver); + + if (v == vers.size() - 1) + assertNull(row); + else + checkRow(cctx, row, key0, vers.get(v + 1).get1()); + } + } + } + + /** + * @param cctx Context. + * @param row Row. + * @param expKey Expected row key. + * @param expVal Expected row value. + */ + private void checkRow(GridCacheContext cctx, CacheDataRow row, KeyCacheObject expKey, Object expVal) { + assertNotNull(row); + assertEquals(expKey, row.key()); + assertEquals(expVal, row.value().value(cctx.cacheObjectContext(), false)); + } + + /** + * @param crdVer Coordinator version. + * @param cntr Counter. + * @return Version. + */ + private MvccCoordinatorVersionResponse version(long crdVer, long cntr) { + return new MvccCoordinatorVersionResponse(crdVer, cntr, 0); + } /** * @return Cache configurations. http://git-wip-us.apache.org/repos/asf/ignite/blob/fd53c1a8/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java index 9c0d791..e2f6b2e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java @@ -25,6 +25,7 @@ import java.util.Random; import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -570,6 +571,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { assertNoLocks(); assertEquals(x, tree.findOne(x).longValue()); + checkIterate(tree, x, x, x, true); assertNoLocks(); @@ -584,12 +586,15 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { assertNull(tree.findOne(-1L)); - for (long x = 0; x < cnt; x++) + for (long x = 0; x < cnt; x++) { assertEquals(x, tree.findOne(x).longValue()); + checkIterate(tree, x, x, x, true); + } assertNoLocks(); assertNull(tree.findOne(cnt)); + checkIterate(tree, cnt, cnt, null, false); for (long x = RMV_INC > 0 ? 0 : cnt - 1; x >= 0 && x < cnt; x += RMV_INC) { X.println(" -- " + x); @@ -603,6 +608,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { assertNoLocks(); assertNull(tree.findOne(x)); + checkIterate(tree, x, x, null, false); assertNoLocks(); @@ -619,6 +625,32 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { } /** + * @param tree + * @param lower + * @param upper + * @param exp + * @param expFound + * @throws IgniteCheckedException + */ + private void checkIterate(TestTree tree, long lower, long upper, Long exp, boolean expFound) + throws IgniteCheckedException { + TestTreeRowClosure c = new TestTreeRowClosure(exp); + + tree.iterate(lower, upper, c); + + assertEquals(expFound, c.found); + } + + private void checkIterateC(TestTree tree, long lower, long upper, TestTreeRowClosure c, boolean expFound) + throws IgniteCheckedException { + c.found = false; + + tree.iterate(lower, upper, c); + + assertEquals(expFound, c.found); + } + + /** * @throws IgniteCheckedException If failed. */ public void testRandomInvoke_1_30_1() throws IgniteCheckedException { @@ -1242,6 +1274,206 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testIterate() throws Exception { + MAX_PER_PAGE = 5; + + TestTree tree = createTestTree(true); + + checkIterate(tree, 0L, 100L, null, false); + + for (long idx = 1L; idx <= 10L; ++idx) + tree.put(idx); + + for (long idx = 1L; idx <= 10L; ++idx) + checkIterate(tree, idx, 100L, idx, true); + + checkIterate(tree, 0L, 100L, 1L, true); + + for (long idx = 1L; idx <= 10L; ++idx) + checkIterate(tree, idx, 100L, 10L, true); + + checkIterate(tree, 0L, 100L, 100L, false); + + for (long idx = 1L; idx <= 10L; ++idx) + checkIterate(tree, 0L, 100L, idx, true); + + for (long idx = 0L; idx <= 10L; ++idx) + checkIterate(tree, idx, 11L, -1L, false); + } + + /** + * @throws Exception If failed. + */ + public void testIterateConcurrentPutRemove() throws Exception { + findOneBoundedConcurrentPutRemove(); + } + + /** + * @throws Exception If failed. + */ + public void testIterateConcurrentPutRemove_1() throws Exception { + MAX_PER_PAGE = 1; + + findOneBoundedConcurrentPutRemove(); + } + + /** + * @throws Exception If failed. + */ + public void testIterateConcurrentPutRemove_5() throws Exception { + MAX_PER_PAGE = 5; + + findOneBoundedConcurrentPutRemove(); + } + + /** + * @throws Exception If failed. + */ + public void testIteratePutRemove_10() throws Exception { + MAX_PER_PAGE = 10; + + findOneBoundedConcurrentPutRemove(); + } + + /** + * @throws Exception If failed. + */ + private void findOneBoundedConcurrentPutRemove() throws Exception { + final TestTree tree = createTestTree(true); + + final int KEYS = 10_000; + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 10; i++) { + for (long idx = 0L; idx < KEYS; ++idx) + tree.put(idx); + + final Long findKey; + + if (MAX_PER_PAGE > 0) { + switch (i) { + case 0: + findKey = 1L; + + break; + + case 1: + findKey = (long)MAX_PER_PAGE; + + break; + + case 2: + findKey = (long)MAX_PER_PAGE - 1; + + break; + + case 3: + findKey = (long)MAX_PER_PAGE + 1; + + break; + + case 4: + findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE; + + break; + + case 5: + findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE - 1; + + break; + + case 6: + findKey = (long)(KEYS / MAX_PER_PAGE / 2) * MAX_PER_PAGE + 1; + + break; + + case 7: + findKey = (long)KEYS - 1; + + break; + + default: + findKey = rnd.nextLong(KEYS); + } + } + else + findKey = rnd.nextLong(KEYS); + + info("Iteration [iter=" + i + ", key=" + findKey + ']'); + + assertEquals(findKey, tree.findOne(findKey)); + checkIterate(tree, findKey, findKey, findKey, true); + + IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + TestTreeRowClosure p = new TestTreeRowClosure(findKey); + + TestTreeRowClosure falseP = new TestTreeRowClosure(-1L); + + int cnt = 0; + + while (!stop.get()) { + int shift = MAX_PER_PAGE > 0 ? rnd.nextInt(MAX_PER_PAGE * 2) : rnd.nextInt(100); + + checkIterateC(tree, findKey, findKey, p, true); + + checkIterateC(tree, findKey - shift, findKey, p, true); + + checkIterateC(tree, findKey - shift, findKey + shift, p, true); + + checkIterateC(tree, findKey, findKey + shift, p, true); + + checkIterateC(tree, -100L, KEYS + 100L, falseP, false); + + cnt++; + } + + info("Done, read count: " + cnt); + + return null; + } + }, 10, "find"); + + asyncRunFut = new GridCompoundFuture<>(); + + asyncRunFut.add(getFut); + + asyncRunFut.markInitialized(); + + try { + U.sleep(100); + + for (int j = 0; j < 20; j++) { + for (long idx = 0L; idx < KEYS / 2; ++idx) { + long toRmv = rnd.nextLong(KEYS); + + if (toRmv != findKey) + tree.remove(toRmv); + } + + for (long idx = 0L; idx < KEYS / 2; ++idx) { + long put = rnd.nextLong(KEYS); + + tree.put(put); + } + } + } + finally { + stop.set(true); + } + + asyncRunFut.get(); + + stop.set(false); + } + } + + /** * */ public void testConcurrentGrowDegenerateTreeAndConcurrentRemove() throws Exception { @@ -1449,6 +1681,17 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { last = c.get(); } + + TestTreeFindFirstClosure cl = new TestTreeFindFirstClosure(); + + tree.iterate((long)low, (long)high, cl); + + last = cl.val; + + if (last != null) { + assertTrue(low + " <= " + last + " <= " + high, last >= low); + assertTrue(low + " <= " + last + " <= " + high, last <= high); + } } return null; @@ -1853,4 +2096,50 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { return PageUtils.getLong(pageAddr, offset(idx)); } } + + /** + * + */ + static class TestTreeRowClosure implements BPlusTree.TreeRowClosure { + /** */ + private final Long expVal; + + /** */ + private boolean found; + + /** + * @param expVal Value to find or {@code null} to find first. + */ + TestTreeRowClosure(Long expVal) { + this.expVal = expVal; + } + + /** {@inheritDoc} */ + @Override public boolean apply(BPlusTree tree, BPlusIO io, long pageAddr, int idx) + throws IgniteCheckedException { + assert !found; + + found = expVal == null || io.getLookupRow(tree, pageAddr, idx).equals(expVal); + + return !found; + } + } + + /** + * + */ + static class TestTreeFindFirstClosure implements BPlusTree.TreeRowClosure { + /** */ + private Long val; + + /** {@inheritDoc} */ + @Override public boolean apply(BPlusTree tree, BPlusIO io, long pageAddr, int idx) + throws IgniteCheckedException { + assert val == null; + + val = io.getLookupRow(tree, pageAddr, idx); + + return false; + } + } }