ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [15/15] ignite git commit: ignite-5937
Date Tue, 10 Oct 2017 12:44:19 GMT
ignite-5937


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bb969db0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bb969db0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bb969db0

Branch: refs/heads/ignite-5937
Commit: bb969db0457e46fc2db4322927bd3536fdd9fb7b
Parents: de3ed0d
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Oct 10 15:36:01 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Oct 10 15:41:21 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryInfo.java    |   5 -
 .../processors/cache/GridCacheMapEntry.java     |  18 ++-
 .../cache/IgniteCacheOffheapManager.java        |  39 +++++
 .../cache/IgniteCacheOffheapManagerImpl.java    | 160 +++++++++++++------
 .../cache/mvcc/MvccCoordinatorVersion.java      |   5 -
 .../mvcc/MvccCoordinatorVersionResponse.java    |   5 -
 .../persistence/GridCacheOffheapManager.java    |  14 ++
 .../processors/cache/tree/MvccRemoveRow.java    |  11 +-
 .../processors/cache/tree/MvccUpdateRow.java    |  25 ++-
 .../datastreamer/DataStreamerImpl.java          |   6 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   |  34 +++-
 11 files changed, 235 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
index 8a5f0df..e09d33c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
@@ -101,11 +101,6 @@ public class GridCacheEntryInfo implements Message, MvccCoordinatorVersion {
         return 0;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean initialLoad() {
-        return true;
-    }
-
     /**
      * @return Cache ID.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index ded9513..a1535e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2581,6 +2581,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             boolean walEnabled = !cctx.isNear() && cctx.shared().wal() != null;
 
+            // TODO IGNITE-3478: move checks in special initialValue method.
             if (cctx.shared().database().persistenceEnabled()) {
                 unswap(false);
 
@@ -2603,14 +2604,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
 
-                if (val != null) {
-                    if (cctx.mvccEnabled())
-                        cctx.offheap().mvccUpdate(false, this, val, ver, mvccVer);
-                    else
-                        storeValue(val, expTime, ver, null);
+                if (cctx.mvccEnabled()) {
+                    cctx.offheap().mvccInitialValue(this, val, ver, mvccVer);
+
+                    if (val != null)
+                        update(val, expTime, ttl, ver, true);
                 }
+                else {
+                    if (val != null) {
+                        storeValue(val, expTime, ver, null);
 
-                update(val, expTime, ttl, ver, true);
+                        update(val, expTime, ttl, ver, true);
+                    }
+                }
 
                 boolean skipQryNtf = false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index bee2108..9e3d0fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -189,6 +189,7 @@ public interface IgniteCacheOffheapManager {
         throws IgniteCheckedException;
 
     /**
+     * @param primary {@code True} if on primary node.
      * @param entry Entry.
      * @param val Value.
      * @param ver Cache version.
@@ -204,6 +205,13 @@ public interface IgniteCacheOffheapManager {
         MvccCoordinatorVersion mvccVer
     ) throws IgniteCheckedException;
 
+    /**
+     * @param primary {@code True} if on primary node.
+     * @param entry Entry.
+     * @param mvccVer Mvcc update version.
+     * @return Transactions to wait for before finishing current transaction.
+     * @throws IgniteCheckedException If failed.
+     */
     @Nullable public GridLongList mvccRemove(
         boolean primary,
         GridCacheMapEntry entry,
@@ -211,6 +219,21 @@ public interface IgniteCacheOffheapManager {
     ) throws IgniteCheckedException;
 
     /**
+     * @param entry Entry.
+     * @param val Value.
+     * @param ver Version.
+     * @param mvccVer Mvcc update version.
+     * @return {@code True} if value was inserted.
+     * @throws IgniteCheckedException If failed.
+     */
+    public boolean mvccInitialValue(
+        GridCacheMapEntry entry,
+        @Nullable CacheObject val,
+        GridCacheVersion ver,
+        MvccCoordinatorVersion mvccVer
+    ) throws IgniteCheckedException;
+
+    /**
      * @param cctx Cache context.
      * @param key  Key.
      * @param val  Value.
@@ -507,6 +530,22 @@ public interface IgniteCacheOffheapManager {
          * @param val Value.
          * @param ver Version.
          * @param mvccVer Mvcc version.
+         * @return {@code True} if new value was inserted.
+         * @throws IgniteCheckedException If failed.
+         */
+        boolean mvccInitialValue(
+            GridCacheContext cctx,
+            KeyCacheObject key,
+            @Nullable CacheObject val,
+            GridCacheVersion ver,
+            MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
+
+        /**
+         * @param cctx Cache context.
+         * @param key Key.
+         * @param val Value.
+         * @param ver Version.
+         * @param mvccVer Mvcc version.
          * @return List of transactions to wait for.
          * @throws IgniteCheckedException If failed.
          */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 380ec94..4fb5bfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -383,6 +383,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
+    @Override public boolean mvccInitialValue(
+        GridCacheMapEntry entry,
+        CacheObject val,
+        GridCacheVersion ver,
+        MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
+        return dataStore(entry.localPartition()).mvccInitialValue(
+            entry.context(),
+            entry.key(),
+            val,
+            ver,
+            mvccVer);
+    }
+
+    /** {@inheritDoc} */
     @Override public GridLongList mvccUpdate(
         boolean primary,
         GridCacheMapEntry entry,
@@ -1360,9 +1374,76 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public GridLongList mvccRemove(GridCacheContext cctx,
+        @Override public boolean mvccInitialValue(
+            GridCacheContext cctx,
+            KeyCacheObject key,
+            @Nullable CacheObject val,
+            GridCacheVersion ver,
+            MvccCoordinatorVersion mvccVer)
+            throws IgniteCheckedException
+        {
+            assert mvccVer != null;
+
+            if (!busyLock.enterBusy())
+                throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+            try {
+                assert val != null || CacheCoordinatorsProcessor.versionForRemovedValue(mvccVer.coordinatorVersion());
+
+                int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+
+                CacheObjectContext coCtx = cctx.cacheObjectContext();
+
+                // Make sure value bytes initialized.
+                key.valueBytes(coCtx);
+
+                MvccUpdateRow updateRow;
+
+                if (val != null) {
+                    val.valueBytes(coCtx);
+
+                    updateRow = new MvccUpdateRow(
+                        key,
+                        val,
+                        ver,
+                        mvccVer,
+                        partId,
+                        cacheId);
+                }
+                else {
+                    updateRow = new MvccRemoveRow(
+                        key,
+                        mvccVer,
+                        partId,
+                        cacheId);
+                }
+
+                if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+                    updateRow.cacheId(cctx.cacheId());
+
+                rowStore.addRow(updateRow);
+
+                boolean old = dataTree.putx(updateRow);
+
+                assert !old;
+
+                if (val != null)
+                    incrementSize(cctx.cacheId());
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridLongList mvccUpdate(
+            GridCacheContext cctx,
             boolean primary,
             KeyCacheObject key,
+            CacheObject val,
+            GridCacheVersion ver,
             MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
             assert mvccVer != null;
 
@@ -1376,9 +1457,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
                 // Make sure value bytes initialized.
                 key.valueBytes(coCtx);
+                val.valueBytes(coCtx);
 
-                MvccRemoveRow updateRow = new MvccRemoveRow(
+                MvccUpdateRow updateRow = new MvccUpdateRow(
                     key,
+                    val,
+                    ver,
                     mvccVer,
                     partId,
                     cacheId);
@@ -1392,27 +1476,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
                 if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
                     assert !primary : updateRow;
-
-                    cleanup(updateRow.cleanupRows(), false);
                 }
                 else {
-                    if (res == MvccUpdateRow.UpdateResult.PREV_NOT_NULL)
-                        decrementSize(cacheId);
-
-                    CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), true);
-
-                    if (rmvRow == null)
-                        rowStore.addRow(updateRow);
-                    else
-                        updateRow.link(rmvRow.link());
-
-                    assert updateRow.link() != 0L;
+                    rowStore.addRow(updateRow);
 
                     boolean old = dataTree.putx(updateRow);
 
                     assert !old;
+
+                    if (res == MvccUpdateRow.UpdateResult.PREV_NULL)
+                        incrementSize(cctx.cacheId());
                 }
 
+                cleanup(updateRow.cleanupRows(), false);
+
                 return updateRow.activeTransactions();
             }
             finally {
@@ -1421,12 +1498,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public GridLongList mvccUpdate(
-            GridCacheContext cctx,
+        @Override public GridLongList mvccRemove(GridCacheContext cctx,
             boolean primary,
             KeyCacheObject key,
-            CacheObject val,
-            GridCacheVersion ver,
             MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
             assert mvccVer != null;
 
@@ -1440,12 +1514,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
                 // Make sure value bytes initialized.
                 key.valueBytes(coCtx);
-                val.valueBytes(coCtx);
 
-                MvccUpdateRow updateRow = new MvccUpdateRow(
+                MvccRemoveRow updateRow = new MvccRemoveRow(
                     key,
-                    val,
-                    ver,
                     mvccVer,
                     partId,
                     cacheId);
@@ -1453,42 +1524,34 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
                     updateRow.cacheId(cctx.cacheId());
 
-                GridLongList waitTxs = null;
-
-                if (mvccVer.initialLoad()) {
-                    rowStore.addRow(updateRow);
+                dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
 
-                    boolean old = dataTree.putx(updateRow);
+                MvccUpdateRow.UpdateResult res = updateRow.updateResult();
 
-                    assert !old;
+                if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
+                    assert !primary : updateRow;
 
-                    incrementSize(cctx.cacheId());
+                    cleanup(updateRow.cleanupRows(), false);
                 }
                 else {
-                    dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
+                    if (res == MvccUpdateRow.UpdateResult.PREV_NOT_NULL)
+                        decrementSize(cacheId);
 
-                    MvccUpdateRow.UpdateResult res = updateRow.updateResult();
+                    CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), true);
 
-                    if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
-                        assert !primary : updateRow;
-                    }
-                    else {
+                    if (rmvRow == null)
                         rowStore.addRow(updateRow);
+                    else
+                        updateRow.link(rmvRow.link());
 
-                        boolean old = dataTree.putx(updateRow);
-
-                        assert !old;
-
-                        if (res == MvccUpdateRow.UpdateResult.PREV_NULL)
-                            incrementSize(cctx.cacheId());
-                    }
+                    assert updateRow.link() != 0L;
 
-                    cleanup(updateRow.cleanupRows(), false);
+                    boolean old = dataTree.putx(updateRow);
 
-                    waitTxs = updateRow.activeTransactions();
+                    assert !old;
                 }
 
-                return waitTxs;
+                return updateRow.activeTransactions();
             }
             finally {
                 busyLock.leaveBusy();
@@ -1848,7 +1911,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
                         long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
 
-                        if (rowCrdVer > ver.coordinatorVersion() || row.mvccCounter() > ver.counter())
+                        if (rowCrdVer > ver.coordinatorVersion())
+                            continue;
+
+                        if (rowCrdVer == ver.coordinatorVersion() && row.mvccCounter() > ver.counter())
                             continue;
 
                         MvccLongList txs = ver.activeTransactions();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
index 4003b73..d80e43c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
@@ -42,9 +42,4 @@ public interface MvccCoordinatorVersion extends Message {
      * @return Counter.
      */
     public long counter();
-
-    /**
-     * @return {@code True} if version for initial load update.
-     */
-    public boolean initialLoad();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
index 20d23ed..c037226 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
@@ -159,11 +159,6 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
     }
 
     /** {@inheritDoc} */
-    @Override public boolean initialLoad() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index ee651c2..45b78d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1256,6 +1256,20 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
+        @Override public boolean mvccInitialValue(
+            GridCacheContext cctx,
+            KeyCacheObject key,
+            @Nullable CacheObject val,
+            GridCacheVersion ver,
+            MvccCoordinatorVersion mvccVer)
+            throws IgniteCheckedException
+        {
+            CacheDataStore delegate = init0(false);
+
+            return delegate.mvccInitialValue(cctx, key, val, ver, mvccVer);
+        }
+
+        /** {@inheritDoc} */
         @Override public GridLongList mvccUpdate(
             GridCacheContext cctx,
             boolean primary,

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
index 8fd8a6e..af11a9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
@@ -18,10 +18,12 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.createVersionForRemovedValue;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
 /**
  *
  */
@@ -42,7 +44,12 @@ public class MvccRemoveRow extends MvccUpdateRow {
 
     /** {@inheritDoc} */
     @Override public long mvccCoordinatorVersion() {
-        return CacheCoordinatorsProcessor.createVersionForRemovedValue(super.mvccCoordinatorVersion());
+        return createVersionForRemovedValue(super.mvccCoordinatorVersion());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long unmaskedCoordinatorVersion() {
+        return unmaskCoordinatorVersion(super.mvccCoordinatorVersion());
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
index 794661d..137ca28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -105,13 +105,13 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
         long rowCrdVer = unmaskCoordinatorVersion(io.getMvccCoordinatorVersion(pageAddr, idx));
         long rowCntr = io.getMvccCounter(pageAddr, idx);
 
-        int cmp = Long.compare(mvccVer.coordinatorVersion(), rowCrdVer);
+        int cmp = Long.compare(unmaskedCoordinatorVersion(), rowCrdVer);
 
         if (cmp == 0)
             cmp = Long.compare(mvccVer.counter(), rowCntr);
 
         // Can be equals if backup rebalanced value updated on primary.
-        assert cmp >= 0 : "[updCrd=" + mvccVer.coordinatorVersion() +
+        assert cmp >= 0 : "[updCrd=" + unmaskedCoordinatorVersion() +
             ", updCntr=" + mvccVer.counter() +
             ", rowCrd=" + rowCrdVer +
             ", rowCntr=" + rowCntr + ']';
@@ -138,11 +138,13 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
         long rowCrdVerMasked = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
         long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
 
+        long crdVer = unmaskedCoordinatorVersion();
+
         if (res == null) {
-            int cmp = Long.compare(mvccVer.coordinatorVersion(), rowCrdVer);
+            int cmp = Long.compare(crdVer, rowCrdVer);
 
             if (cmp == 0)
-                cmp = Long.compare(mvccVer.coordinatorVersion(), rowIo.getMvccCounter(pageAddr, idx));
+                cmp = Long.compare(mvccVer.counter(), rowIo.getMvccCounter(pageAddr, idx));
 
             if (cmp == 0)
                 res = UpdateResult.VERSION_FOUND;
@@ -152,7 +154,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
         }
 
         // Suppose transactions on previous coordinator versions are done.
-        if (checkActive && mvccVer.coordinatorVersion() == rowCrdVer) {
+        if (checkActive && crdVer == rowCrdVer) {
             long rowMvccCntr = rowIo.getMvccCounter(pageAddr, idx);
 
             if (mvccVer.activeTransactions().contains(rowMvccCntr)) {
@@ -166,11 +168,11 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
         }
 
         if (!txActive) {
-            assert Long.compare(mvccVer.coordinatorVersion(), rowCrdVer) >= 0;
+            assert Long.compare(crdVer, rowCrdVer) >= 0;
 
             int cmp;
 
-            if (mvccVer.coordinatorVersion() == rowCrdVer)
+            if (crdVer == rowCrdVer)
                 cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx));
             else
                 cmp = 1;
@@ -183,7 +185,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
                     assert row.link() != 0 && row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
 
                     // Should not be possible to cleanup active tx.
-                    assert rowCrdVer != mvccVer.coordinatorVersion()
+                    assert rowCrdVer != crdVer
                         || !mvccVer.activeTransactions().contains(row.mvccCounter());
 
                     if (cleanupRows == null)
@@ -199,6 +201,13 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
         return true;
     }
 
+    /**
+     * @return Coordinator version without flags.
+     */
+    protected long unmaskedCoordinatorVersion() {
+        return mvccVer.coordinatorVersion();
+    }
+
     /** {@inheritDoc} */
     @Override public long mvccCoordinatorVersion() {
         return mvccVer.coordinatorVersion();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 6ced2f9..30145ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -134,11 +134,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
     /** Version which is less then any version generated on coordinator. */
     private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER =
-        new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L) {
-            @Override public boolean initialLoad() {
-                return true;
-            }
-        };
+        new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L);
 
     /** Cache receiver. */
     private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bb969db0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index d45afe7..1abc116 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -1786,13 +1786,33 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
 
         final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64));
 
-        final int KEYS = 100;
+        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int k = 0; k < 100; k++)
+                cache.remove(k);
 
-        checkValues(new HashMap<>(), cache);
+            tx.commit();
+        }
+
+        Map<Object, Object> expVals = new HashMap<>();
 
         try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-            for (int k = 0; k < KEYS; k++)
-                cache.remove(k);
+            for (int k = 100; k < 200; k++) {
+                cache.put(k, k);
+
+                expVals.put(k, k);
+            }
+
+            tx.commit();
+        }
+
+        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int k = 100; k < 200; k++) {
+                if (k % 2 == 0) {
+                    cache.remove(k);
+
+                    expVals.remove(k);
+                }
+            }
 
             tx.commit();
         }
@@ -1800,6 +1820,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
         startGrid(1);
 
         awaitPartitionMapExchange();
+
+        checkValues(expVals, jcache(1));
+
+        stopGrid(0);
+
+        checkValues(expVals, jcache(1));
     }
 
     /**


Mime
View raw message