ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [7/7] ignite git commit: ignite-3478 Support for removes
Date Wed, 11 Oct 2017 08:24:55 GMT
ignite-3478 Support for removes


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/970cf47a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/970cf47a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/970cf47a

Branch: refs/heads/ignite-3478
Commit: 970cf47a51dc6e754677b00e85e60effc48083ba
Parents: 69fd367
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Oct 11 11:24:32 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Oct 11 11:24:32 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryInfo.java    |   5 -
 .../processors/cache/GridCacheMapEntry.java     |  36 +-
 .../cache/IgniteCacheOffheapManager.java        |  82 +++-
 .../cache/IgniteCacheOffheapManagerImpl.java    | 401 ++++++++++-------
 .../distributed/dht/GridDhtCacheEntry.java      |   5 +-
 .../dht/preloader/GridDhtPartitionSupplier.java |  24 +-
 .../GridDhtPartitionSupplyMessage.java          |   4 +-
 .../GridDhtPartitionsExchangeFuture.java        |   2 +-
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |  41 +-
 .../cache/mvcc/MvccCoordinatorVersion.java      |   5 -
 .../mvcc/MvccCoordinatorVersionResponse.java    |   5 -
 .../cache/persistence/CacheDataRow.java         |   5 +
 .../cache/persistence/CacheDataRowAdapter.java  |   5 +
 .../cache/persistence/CacheSearchRow.java       |   6 +
 .../persistence/GridCacheOffheapManager.java    |  43 +-
 .../processors/cache/persistence/RowStore.java  |   2 +
 .../persistence/freelist/FreeListImpl.java      |  11 +-
 .../cache/persistence/tree/io/DataPageIO.java   |  22 +-
 .../transactions/IgniteTxLocalAdapter.java      |  26 +-
 .../cache/tree/AbstractDataInnerIO.java         |  10 +-
 .../cache/tree/AbstractDataLeafIO.java          |  10 +-
 .../cache/tree/CacheDataRowStore.java           |  18 +-
 .../processors/cache/tree/CacheDataTree.java    |   8 +-
 .../internal/processors/cache/tree/DataRow.java |   7 +
 .../processors/cache/tree/MvccDataRow.java      |  32 +-
 .../cache/tree/MvccKeyMaxVersionBound.java      |   9 +-
 .../processors/cache/tree/MvccRemoveRow.java    |  64 +++
 .../processors/cache/tree/MvccUpdateRow.java    |  88 +++-
 .../cache/tree/MvccVersionBasedSearchRow.java   |  16 +-
 .../datastreamer/DataStreamerImpl.java          |   6 +-
 .../cache/mvcc/CacheMvccClusterRestartTest.java | 173 ++++++++
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 434 +++++++++++++++++--
 .../database/FreeListImplSelfTest.java          |   5 +
 .../processors/query/h2/opt/GridH2Row.java      |   5 +
 34 files changed, 1313 insertions(+), 302 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
index 8a5f0df..e09d33c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
@@ -101,11 +101,6 @@ public class GridCacheEntryInfo implements Message, MvccCoordinatorVersion {
         return 0;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean initialLoad() {
-        return true;
-    }
-
     /**
      * @return Cache ID.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 8432a77..a1535e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1012,7 +1012,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (cctx.mvccEnabled() && !((IgniteCacheOffheapManagerImpl)cctx.offheap()).IGNITE_FAKE_MVCC_STORAGE) {
                 assert mvccVer != null;
 
-                mvccWaitTxs = cctx.offheap().mvccUpdate(this, val, newVer, mvccVer);
+                mvccWaitTxs = cctx.offheap().mvccUpdate(tx.local(),
+                    this,
+                    val,
+                    newVer,
+                    mvccVer);
             }
             else
                 storeValue(val, expireTime, newVer, null);
@@ -1141,6 +1145,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         boolean marked = false;
 
+        GridLongList mvccWaitTxs = null;
+
         synchronized (this) {
             checkObsolete();
 
@@ -1181,7 +1187,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 }
             }
 
-            removeValue();
+            if (cctx.mvccEnabled() && !((IgniteCacheOffheapManagerImpl)cctx.offheap()).IGNITE_FAKE_MVCC_STORAGE) {
+                assert mvccVer != null;
+
+                mvccWaitTxs = cctx.offheap().mvccRemove(tx.local(), this, mvccVer);
+            }
+            else
+                removeValue();
 
             update(null, 0, 0, newVer, true);
 
@@ -1292,7 +1304,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             cctx.config().getInterceptor().onAfterRemove(entry0);
 
         if (valid)
-            return new GridCacheUpdateTxResult(true, updateCntr0, null);
+            return new GridCacheUpdateTxResult(true, updateCntr0, mvccWaitTxs);
         else
             return new GridCacheUpdateTxResult(false);
     }
@@ -2569,6 +2581,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             boolean walEnabled = !cctx.isNear() && cctx.shared().wal() != null;
 
+            // TODO IGNITE-3478: move checks in special initialValue method.
             if (cctx.shared().database().persistenceEnabled()) {
                 unswap(false);
 
@@ -2591,14 +2604,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
 
-                if (val != null) {
-                    if (cctx.mvccEnabled())
-                        cctx.offheap().mvccUpdate(this, val, ver, mvccVer);
-                    else
-                        storeValue(val, expTime, ver, null);
+                if (cctx.mvccEnabled()) {
+                    cctx.offheap().mvccInitialValue(this, val, ver, mvccVer);
+
+                    if (val != null)
+                        update(val, expTime, ttl, ver, true);
                 }
+                else {
+                    if (val != null) {
+                        storeValue(val, expTime, ver, null);
 
-                update(val, expTime, ttl, ver, true);
+                        update(val, expTime, ttl, ver, true);
+                    }
+                }
 
                 boolean skipQryNtf = false;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 8967ce8..2c070fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -191,15 +191,54 @@ public interface IgniteCacheOffheapManager {
     /**
      * @param entry Entry.
      * @param val Value.
+     * @param ver Version.
+     * @param mvccVer Mvcc update version.
+     * @return {@code True} if value was inserted.
+     * @throws IgniteCheckedException If failed.
+     */
+    public boolean mvccInitialValue(
+        GridCacheMapEntry entry,
+        @Nullable CacheObject val,
+        GridCacheVersion ver,
+        MvccCoordinatorVersion mvccVer
+    ) throws IgniteCheckedException;
+
+    /**
+     * @param primary {@code True} if on primary node.
+     * @param entry Entry.
+     * @param val Value.
      * @param ver Cache version.
      * @param mvccVer Mvcc update version.
      * @return Transactions to wait for before finishing current transaction.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public GridLongList mvccUpdate(GridCacheMapEntry entry,
+    @Nullable public GridLongList mvccUpdate(
+        boolean primary,
+        GridCacheMapEntry entry,
         CacheObject val,
         GridCacheVersion ver,
-        MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
+        MvccCoordinatorVersion mvccVer
+    ) throws IgniteCheckedException;
+
+    /**
+     * @param primary {@code True} if on primary node.
+     * @param entry Entry.
+     * @param mvccVer Mvcc update version.
+     * @return Transactions to wait for before finishing current transaction.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public GridLongList mvccRemove(
+        boolean primary,
+        GridCacheMapEntry entry,
+        MvccCoordinatorVersion mvccVer
+    ) throws IgniteCheckedException;
+
+    /**
+     * @param entry Entry.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void mvccRemoveAll(GridCacheMapEntry entry)
+        throws IgniteCheckedException;
 
     /**
      * @param cctx Cache context.
@@ -498,11 +537,29 @@ public interface IgniteCacheOffheapManager {
          * @param val Value.
          * @param ver Version.
          * @param mvccVer Mvcc version.
+         * @return {@code True} if new value was inserted.
+         * @throws IgniteCheckedException If failed.
+         */
+        boolean mvccInitialValue(
+            GridCacheContext cctx,
+            KeyCacheObject key,
+            @Nullable CacheObject val,
+            GridCacheVersion ver,
+            MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
+
+        /**
+         * @param cctx Cache context.
+         * @param primary {@code True} if update is executed on primary node.
+         * @param key Key.
+         * @param val Value.
+         * @param ver Version.
+         * @param mvccVer Mvcc version.
          * @return List of transactions to wait for.
          * @throws IgniteCheckedException If failed.
          */
         @Nullable GridLongList mvccUpdate(
             GridCacheContext cctx,
+            boolean primary,
             KeyCacheObject key,
             CacheObject val,
             GridCacheVersion ver,
@@ -510,6 +567,27 @@ public interface IgniteCacheOffheapManager {
 
         /**
          * @param cctx Cache context.
+         * @param primary {@code True} if update is executed on primary node.
+         * @param key Key.
+         * @param mvccVer Mvcc version.
+         * @return List of transactions to wait for.
+         * @throws IgniteCheckedException If failed.
+         */
+        @Nullable GridLongList mvccRemove(
+            GridCacheContext cctx,
+            boolean primary,
+            KeyCacheObject key,
+            MvccCoordinatorVersion mvccVer) throws IgniteCheckedException;
+
+        /**
+         * @param cctx Cache context.
+         * @param key Key.
+         * @throws IgniteCheckedException If failed.
+         */
+        void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException;
+
+        /**
+         * @param cctx Cache context.
          * @param key Key.
          * @throws IgniteCheckedException If failed.
          */

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 25f36b2..2bff203 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
@@ -54,9 +55,9 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.cache.tree.DataRow;
-import org.apache.ignite.internal.processors.cache.tree.MvccDataRow;
 import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound;
 import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound;
+import org.apache.ignite.internal.processors.cache.tree.MvccRemoveRow;
 import org.apache.ignite.internal.processors.cache.tree.MvccSearchRow;
 import org.apache.ignite.internal.processors.cache.tree.MvccUpdateRow;
 import org.apache.ignite.internal.processors.cache.tree.MvccVersionBasedSearchRow;
@@ -85,6 +86,8 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
 
 /**
  *
@@ -380,11 +383,28 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
-    @Override public GridLongList mvccUpdate(GridCacheMapEntry entry,
+    @Override public boolean mvccInitialValue(
+        GridCacheMapEntry entry,
+        CacheObject val,
+        GridCacheVersion ver,
+        MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
+        return dataStore(entry.localPartition()).mvccInitialValue(
+            entry.context(),
+            entry.key(),
+            val,
+            ver,
+            mvccVer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridLongList mvccUpdate(
+        boolean primary,
+        GridCacheMapEntry entry,
         CacheObject val,
         GridCacheVersion ver,
         MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
         return dataStore(entry.localPartition()).mvccUpdate(entry.context(),
+            primary,
             entry.key(),
             val,
             ver,
@@ -392,6 +412,23 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     }
 
     /** {@inheritDoc} */
+    @Override public GridLongList mvccRemove(
+        boolean primary,
+        GridCacheMapEntry entry,
+        MvccCoordinatorVersion mvccVer
+    ) throws IgniteCheckedException {
+        return dataStore(entry.localPartition()).mvccRemove(entry.context(),
+            primary,
+            entry.key(),
+            mvccVer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mvccRemoveAll(GridCacheMapEntry entry) throws IgniteCheckedException {
+        dataStore(entry.localPartition()).mvccRemoveAll(entry.context(), entry.key());
+    }
+
+    /** {@inheritDoc} */
     @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key, GridDhtLocalPartition part)
         throws IgniteCheckedException {
         dataStore(part).updateIndexes(cctx, key);
@@ -1340,18 +1377,74 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             return dataRow;
         }
 
-        private int compare(CacheDataRow row, long crdVer, long mvccCntr) {
-            int cmp = Long.compare(row.mvccCoordinatorVersion(), crdVer);
+        /** {@inheritDoc} */
+        @Override public boolean mvccInitialValue(
+            GridCacheContext cctx,
+            KeyCacheObject key,
+            @Nullable CacheObject val,
+            GridCacheVersion ver,
+            MvccCoordinatorVersion mvccVer)
+            throws IgniteCheckedException
+        {
+            assert mvccVer != null;
+
+            if (!busyLock.enterBusy())
+                throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+            try {
+                assert val != null || CacheCoordinatorsProcessor.versionForRemovedValue(mvccVer.coordinatorVersion());
+
+                int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+
+                CacheObjectContext coCtx = cctx.cacheObjectContext();
+
+                // Make sure value bytes initialized.
+                key.valueBytes(coCtx);
+
+                MvccUpdateRow updateRow;
+
+                if (val != null) {
+                    val.valueBytes(coCtx);
+
+                    updateRow = new MvccUpdateRow(
+                        key,
+                        val,
+                        ver,
+                        mvccVer,
+                        partId,
+                        cacheId);
+                }
+                else {
+                    updateRow = new MvccRemoveRow(
+                        key,
+                        mvccVer,
+                        partId,
+                        cacheId);
+                }
+
+                if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+                    updateRow.cacheId(cctx.cacheId());
+
+                rowStore.addRow(updateRow);
 
-            if (cmp != 0)
-                return cmp;
+                boolean old = dataTree.putx(updateRow);
 
-            return Long.compare(row.mvccCounter(), mvccCntr);
+                assert !old;
+
+                if (val != null)
+                    incrementSize(cctx.cacheId());
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+
+            return true;
         }
 
         /** {@inheritDoc} */
         @Override public GridLongList mvccUpdate(
             GridCacheContext cctx,
+            boolean primary,
             KeyCacheObject key,
             CacheObject val,
             GridCacheVersion ver,
@@ -1370,139 +1463,160 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 key.valueBytes(coCtx);
                 val.valueBytes(coCtx);
 
-                if (true) {
-                    MvccUpdateRow updateRow = new MvccUpdateRow(
-                        key,
-                        val,
-                        ver,
-                        mvccVer,
-                        partId,
-                        cacheId);
+                MvccUpdateRow updateRow = new MvccUpdateRow(
+                    key,
+                    val,
+                    ver,
+                    mvccVer,
+                    partId,
+                    cacheId);
 
-                    rowStore.addRow(updateRow);
+                if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+                    updateRow.cacheId(cctx.cacheId());
 
-                    assert updateRow.link() != 0 : updateRow;
+                dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
 
-                    if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
-                        updateRow.cacheId(cctx.cacheId());
+                MvccUpdateRow.UpdateResult res = updateRow.updateResult();
 
-                    GridLongList waitTxs = null;
+                if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
+                    assert !primary : updateRow;
+                }
+                else {
+                    rowStore.addRow(updateRow);
 
-                    if (mvccVer.initialLoad()) {
-                        boolean old = dataTree.putx(updateRow);
+                    boolean old = dataTree.putx(updateRow);
 
-                        assert !old;
+                    assert !old;
 
+                    if (res == MvccUpdateRow.UpdateResult.PREV_NULL)
                         incrementSize(cctx.cacheId());
-                    }
-                    else {
-                        dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
+                }
 
-                        boolean old = dataTree.putx(updateRow);
+                cleanup(updateRow.cleanupRows(), false);
+
+                return updateRow.activeTransactions();
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridLongList mvccRemove(GridCacheContext cctx,
+            boolean primary,
+            KeyCacheObject key,
+            MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
+            assert mvccVer != null;
 
-                        assert !old;
+            if (!busyLock.enterBusy())
+                throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
 
-                        if (!updateRow.previousNotNull())
-                            incrementSize(cctx.cacheId());
+            try {
+                int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
 
-                        waitTxs = updateRow.activeTransactions();
+                CacheObjectContext coCtx = cctx.cacheObjectContext();
 
-                        List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
+                // Make sure value bytes initialized.
+                key.valueBytes(coCtx);
 
-                        if (cleanupRows != null) {
-                            for (int i = 0; i < cleanupRows.size(); i++) {
-                                CacheSearchRow oldRow = cleanupRows.get(i);
+                MvccRemoveRow updateRow = new MvccRemoveRow(
+                    key,
+                    mvccVer,
+                    partId,
+                    cacheId);
 
-                                assert oldRow.link() != 0L : oldRow;
+                if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
+                    updateRow.cacheId(cctx.cacheId());
 
-                                boolean rmvd = dataTree.removex(oldRow);
+                dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
 
-                                assert rmvd;
+                MvccUpdateRow.UpdateResult res = updateRow.updateResult();
 
-                                rowStore.removeRow(oldRow.link());
-                            }
-                        }
-                    }
+                if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
+                    assert !primary : updateRow;
 
-                    return waitTxs;
+                    cleanup(updateRow.cleanupRows(), false);
                 }
                 else {
-                    MvccDataRow dataRow = new MvccDataRow(
-                        key,
-                        val,
-                        ver,
-                        partId,
-                        cacheId,
-                        mvccVer.coordinatorVersion(),
-                        mvccVer.counter());
+                    if (res == MvccUpdateRow.UpdateResult.PREV_NOT_NULL)
+                        decrementSize(cacheId);
 
-                    rowStore.addRow(dataRow);
+                    CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), true);
 
-                    assert dataRow.link() != 0 : dataRow;
+                    if (rmvRow == null)
+                        rowStore.addRow(updateRow);
+                    else
+                        updateRow.link(rmvRow.link());
 
-                    if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID)
-                        dataRow.cacheId(cctx.cacheId());
+                    assert updateRow.link() != 0L;
 
-                    boolean old = dataTree.putx(dataRow);
+                    boolean old = dataTree.putx(updateRow);
 
                     assert !old;
+                }
 
-                    GridLongList waitTxs = null;
+                return updateRow.activeTransactions();
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
 
-                    if (!mvccVer.initialLoad()) {
-                        MvccLongList activeTxs = mvccVer.activeTransactions();
+        /** {@inheritDoc} */
+        @Override public void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
+            key.valueBytes(cctx.cacheObjectContext());
 
-                        // TODO IGNITE-3484: need special method.
-                        GridCursor<CacheDataRow> cur = dataTree.find(
-                            new MvccSearchRow(cacheId, key, mvccVer.coordinatorVersion(), mvccVer.counter() - 1),
-                            new MvccSearchRow(cacheId, key, 1, 1));
+            int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
 
-                        boolean first = true;
+            GridCursor<CacheDataRow> cur = dataTree.find(
+                new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
+                new MvccSearchRow(cacheId, key, 1, 1),
+                CacheDataRowAdapter.RowData.KEY_ONLY);
 
-                        boolean activeTx = false;
+            while (cur.next()) {
+                CacheDataRow row = cur.get();
 
-                        while (cur.next()) {
-                            CacheDataRow oldVal = cur.get();
+                assert row.link() != 0;
 
-                            assert oldVal.link() != 0 : oldVal;
+                boolean rmvd = dataTree.removex(row);
 
-                            if (activeTxs != null && oldVal.mvccCoordinatorVersion() == mvccVer.coordinatorVersion() &&
-                                activeTxs.contains(oldVal.mvccCounter())) {
-                                if (waitTxs == null)
-                                    waitTxs = new GridLongList();
+                assert rmvd;
 
-                                assert oldVal.mvccCounter() != mvccVer.counter();
+                rowStore.removeRow(row.link());
+            }
+        }
 
-                                waitTxs.add(oldVal.mvccCounter());
+        /**
+         * @param cleanupRows Rows to cleanup.
+         * @param findRmv {@code True} if need keep removed row entry.
+         * @return Removed row entry if found.
+         * @throws IgniteCheckedException If failed.
+         */
+        @Nullable private CacheSearchRow cleanup(@Nullable List<CacheSearchRow> cleanupRows, boolean findRmv)
+            throws IgniteCheckedException {
+            CacheSearchRow rmvRow = null;
 
-                                activeTx = true;
-                            }
+            if (cleanupRows != null) {
+                for (int i = 0; i < cleanupRows.size(); i++) {
+                    CacheSearchRow oldRow = cleanupRows.get(i);
 
-                            if (!activeTx) {
-                                // Should not delete oldest version which is less than cleanup version.
-                                int cmp = compare(oldVal, mvccVer.coordinatorVersion(), mvccVer.cleanupVersion());
+                    assert oldRow.link() != 0L : oldRow;
 
-                                if (cmp <= 0) {
-                                    if (first)
-                                        first = false;
-                                    else {
-                                        boolean rmvd = dataTree.removex(oldVal);
+                    boolean rmvd = dataTree.removex(oldRow);
 
-                                        assert rmvd;
+                    assert rmvd;
 
-                                        rowStore.removeRow(oldVal.link());
-                                    }
-                                }
-                            }
-                        }
+                    if (findRmv &&
+                        rmvRow == null &&
+                        versionForRemovedValue(oldRow.mvccCoordinatorVersion())) {
+                        rmvRow = oldRow;
                     }
-
-                    return waitTxs;
+                    else
+                        rowStore.removeRow(oldRow.link());
                 }
             }
-            finally {
-                busyLock.leaveBusy();
-            }
+
+            return rmvRow;
         }
 
         /** {@inheritDoc} */
@@ -1709,26 +1823,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             CacheDataRow row;
 
             if (grp.mvccEnabled()) {
-                if (true) {
-                    MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key);
-
-                    dataTree.iterate(
-                        searchRow,
-                        new MvccKeyMinVersionBound(cacheId, key),
-                        searchRow // Use the same instance as closure to do not create extra object.
-                    );
+                MvccKeyMaxVersionBound searchRow = new MvccKeyMaxVersionBound(cacheId, key);
 
-                    row = searchRow.row();
-                }
-                else {
-                    GridCursor<CacheDataRow> cur = dataTree.find(new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
-                        new MvccSearchRow(cacheId, key, 1, 1));
+                dataTree.iterate(
+                    searchRow,
+                    new MvccKeyMinVersionBound(cacheId, key),
+                    searchRow // Use the same instance as closure to do not create extra object.
+                );
 
-                    if (cur.next())
-                        row = cur.get();
-                    else
-                        row = null;
-                }
+                row = searchRow.row();
             }
             else
                 row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY);
@@ -1781,55 +1884,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
             int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
 
-            if (true) {
-                MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver);
-
-                dataTree.iterate(
-                    lower,
-                    new MvccKeyMinVersionBound(cacheId, key),
-                    lower // Use the same instance as closure to do not create extra object.
-                );
-
-                CacheDataRow row = lower.row();
-
-                afterRowFound(row, key);
-
-                return row;
-            }
-            else {
-                GridCursor<CacheDataRow> cur = dataTree.find(
-                    new MvccSearchRow(cacheId, key, ver.coordinatorVersion(), ver.counter()),
-                    new MvccSearchRow(cacheId, key, 1, 1));
-
-                CacheDataRow row = null;
-
-                MvccLongList txs = ver.activeTransactions();
-
-                while (cur.next()) {
-                    CacheDataRow row0 = cur.get();
+            MvccVersionBasedSearchRow lower = new MvccVersionBasedSearchRow(cacheId, key, ver);
 
-                    assert row0.mvccCoordinatorVersion() > 0 : row0;
+            dataTree.iterate(
+                lower,
+                new MvccKeyMinVersionBound(cacheId, key),
+                lower // Use the same instance as closure to do not create extra object.
+            );
 
-                    boolean visible;
-
-                    if (txs != null) {
-                        visible = row0.mvccCoordinatorVersion() != ver.coordinatorVersion()
-                            || !txs.contains(row0.mvccCounter());
-                    }
-                    else
-                        visible = true;
-
-                    if (visible) {
-                        row = row0;
-
-                        break;
-                    }
-                }
+            CacheDataRow row = lower.row();
 
-                assert row == null || key.equals(row.key());
+            afterRowFound(row, key);
 
-                return row;
-            }
+            return row;
         }
 
         /**
@@ -1868,18 +1935,30 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                     while (cur.next()) {
                         CacheDataRow row = cur.get();
 
-                        if (row.mvccCoordinatorVersion() > ver.coordinatorVersion()
-                            || row.mvccCounter() > ver.counter())
+                        long rowCrdVerMasked = row.mvccCoordinatorVersion();
+
+                        long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
+
+                        if (rowCrdVer > ver.coordinatorVersion())
+                            continue;
+
+                        if (rowCrdVer == ver.coordinatorVersion() && row.mvccCounter() > ver.counter())
                             continue;
 
                         MvccLongList txs = ver.activeTransactions();
 
-                        if (txs != null && row.mvccCoordinatorVersion() == ver.coordinatorVersion() && txs.contains(row.mvccCounter()))
+                        if (txs != null && rowCrdVer == ver.coordinatorVersion() && txs.contains(row.mvccCounter()))
                             continue;
 
                         if (curKey != null && row.key().equals(curKey))
                             continue;
 
+                        if (CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked)) {
+                            curKey = row.key();
+
+                            continue;
+                        }
+
                         curRow = row;
 
                         break;

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 77cc642..a3309a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -585,7 +585,10 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
                         ']');
                 }
 
-                removeValue();
+                if (cctx.mvccEnabled())
+                    cctx.offheap().mvccRemoveAll(this);
+                else
+                    removeValue();
 
                 // Give to GC.
                 update(null, 0L, 0L, ver, true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 0905917..357fef8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -31,9 +31,9 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
 import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.T3;
@@ -43,6 +43,7 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.IgniteSpiException;
 
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
 
 /**
  * Thread pool for supplying partitions to demanding nodes.
@@ -375,13 +376,24 @@ class GridDhtPartitionSupplier {
                             GridCacheEntryInfo info = grp.mvccEnabled() ?
                                 new GridCacheMvccEntryInfo() : new GridCacheEntryInfo();
 
+
                             info.key(row.key());
-                            info.expireTime(row.expireTime());
-                            info.version(row.version());
-                            info.value(row.value());
                             info.cacheId(row.cacheId());
-                            info.mvccCoordinatorVersion(row.mvccCoordinatorVersion());
-                            info.mvccCounter(row.mvccCounter());
+
+                            boolean rmvd = false;
+
+                            if (grp.mvccEnabled()) {
+                                info.mvccCoordinatorVersion(row.mvccCoordinatorVersion());
+                                info.mvccCounter(row.mvccCounter());
+
+                                rmvd = versionForRemovedValue(row.mvccCoordinatorVersion());
+                            }
+
+                            if (!rmvd) {
+                                info.value(row.value());
+                                info.version(row.version());
+                                info.expireTime(row.expireTime());
+                            }
 
                             if (preloadPred == null || preloadPred.apply(info))
                                 s.addEntry0(part, info, grp.shared(), grp.cacheObjectContext());

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 90d11f5..6675f8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -42,6 +42,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
 /**
  * Partition supply message.
  */
@@ -217,7 +219,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
     void addEntry0(int p, GridCacheEntryInfo info, GridCacheSharedContext ctx, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
         assert info != null;
         assert info.key() != null : info;
-        assert info.value() != null : info;
+        assert info.value() != null || versionForRemovedValue(info.coordinatorVersion()): info;
 
         // Need to call this method to initialize info properly.
         marshalInfo(info, ctx, cacheObjCtx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 830d50b..88095ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -560,7 +560,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             MvccCoordinator mvccCrd = firstEvtDiscoCache.mvccCoordinator();
 
             boolean mvccCrdChange = mvccCrd != null &&
-                initialVersion().equals(mvccCrd.topologyVersion());
+                (initialVersion().equals(mvccCrd.topologyVersion()) || activateCluster());
 
             cctx.kernalContext().coordinators().currentCoordinator(mvccCrd);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index b9b8ea1..9f9a7a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -66,6 +66,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
 import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 
 /**
@@ -86,7 +87,13 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
 
     /** */
     private static final byte MSG_POLICY = SYSTEM_POOL;
-    
+
+    /** */
+    private static final long CRD_VER_MASK = 0x3F_FF_FF_FF_FF_FF_FF_FFL;
+
+    /** */
+    private static final long RMVD_VAL_VER_MASK = 0x80_00_00_00_00_00_00_00L;
+
     /** */
     private volatile MvccCoordinator curCrd;
 
@@ -139,6 +146,30 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         super(ctx);
     }
 
+    /**
+     * @param crdVer Coordinator version.
+     * @return Coordinator version with removed value flag.
+     */
+    public static long createVersionForRemovedValue(long crdVer) {
+        return crdVer | RMVD_VAL_VER_MASK;
+    }
+
+    /**
+     * @param crdVer Coordinator version with flags.
+     * @return {@code True} if removed value flag is set.
+     */
+    public static boolean versionForRemovedValue(long crdVer) {
+        return (crdVer & RMVD_VAL_VER_MASK) != 0;
+    }
+
+    /**
+     * @param crdVer Coordinator version with flags.
+     * @return Coordinator version.
+     */
+    public static long unmaskCoordinatorVersion(long crdVer) {
+        return crdVer & CRD_VER_MASK;
+    }
+
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
         statCntrs = new StatCounter[7];
@@ -199,7 +230,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
      * @param topVer Topology version.
      */
     public void onDiscoveryEvent(int evtType, Collection<ClusterNode> nodes, long topVer) {
-        if (evtType == EVT_NODE_METRICS_UPDATED)
+        if (evtType == EVT_NODE_METRICS_UPDATED || evtType == EVT_DISCOVERY_CUSTOM_EVT)
             return;
 
         MvccCoordinator crd;
@@ -778,7 +809,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     private MvccCoordinatorVersionResponse assignQueryCounter(UUID qryNodeId, long futId) {
         assert crdVer != 0;
 
-        return activeQueries.assignQueryCounter(qryNodeId, futId);
+        MvccCoordinatorVersionResponse res = activeQueries.assignQueryCounter(qryNodeId, futId);
+
+        return res;
 
 //        MvccCoordinatorVersionResponse res = new MvccCoordinatorVersionResponse();
 //
@@ -989,7 +1022,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         log.info("Initialize local node as mvcc coordinator [node=" + ctx.localNodeId() +
             ", topVer=" + topVer + ']');
 
-        crdVer = topVer.topologyVersion();
+        crdVer = topVer.topologyVersion() + ctx.discovery().gridStartTime();
 
         prevCrdQueries.init(activeQueries, discoCache, ctx.discovery());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
index a0fd5ee..d80e43c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
@@ -42,9 +42,4 @@ public interface MvccCoordinatorVersion extends Message {
      * @return Counter.
      */
     public long counter();
-
-    /**
-     *
-     */
-    public boolean initialLoad();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
index 20d23ed..c037226 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersionResponse.java
@@ -159,11 +159,6 @@ public class MvccCoordinatorVersionResponse implements MvccCoordinatorMessage, M
     }
 
     /** {@inheritDoc} */
-    @Override public boolean initialLoad() {
-        return false;
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
index 57aeaef..b76826f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
@@ -54,4 +54,9 @@ public interface CacheDataRow extends CacheSearchRow {
      * @param key Key.
      */
     public void key(KeyCacheObject key);
+
+    /**
+     * @return {@code True} if this is row for cache remove operation (used only with mvcc).
+     */
+    public boolean removed();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index 925431f..d0f2dab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -582,6 +582,11 @@ public class CacheDataRowAdapter implements CacheDataRow {
         return 0;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean removed() {
+        return false;
+    }
+
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
index 5bf53d8..efdc08f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
@@ -43,7 +43,13 @@ public interface CacheSearchRow {
      */
     public int cacheId();
 
+    /**
+     * @return Mvcc coordinator version.
+     */
     public long mvccCoordinatorVersion();
 
+    /**
+     * @return Mvcc counter.
+     */
     public long mvccCounter();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index cb01b6c..e5a9736 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -837,6 +837,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         @Override public long mvccCoordinatorVersion() {
             return 0; // TODO IGNITE-3478.
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean removed() {
+            return false;  // TODO IGNITE-3478.
+        }
     }
 
     /**
@@ -1251,14 +1256,48 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public GridLongList mvccUpdate(GridCacheContext cctx,
+        @Override public boolean mvccInitialValue(
+            GridCacheContext cctx,
+            KeyCacheObject key,
+            @Nullable CacheObject val,
+            GridCacheVersion ver,
+            MvccCoordinatorVersion mvccVer)
+            throws IgniteCheckedException
+        {
+            CacheDataStore delegate = init0(false);
+
+            return delegate.mvccInitialValue(cctx, key, val, ver, mvccVer);
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridLongList mvccUpdate(
+            GridCacheContext cctx,
+            boolean primary,
             KeyCacheObject key,
             CacheObject val,
             GridCacheVersion ver,
             MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
             CacheDataStore delegate = init0(false);
 
-            return delegate.mvccUpdate(cctx, key, val, ver, mvccVer);
+            return delegate.mvccUpdate(cctx, primary, key, val, ver, mvccVer);
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridLongList mvccRemove(
+            GridCacheContext cctx,
+            boolean primary,
+            KeyCacheObject key,
+            MvccCoordinatorVersion mvccVer) throws IgniteCheckedException {
+            CacheDataStore delegate = init0(false);
+
+            return delegate.mvccRemove(cctx, primary, key, mvccVer);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
+            CacheDataStore delegate = init0(false);
+
+            delegate.mvccRemoveAll(cctx, key);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
index 9cc5c62..41d2c4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
@@ -82,6 +82,8 @@ public class RowStore {
 
         try {
             freeList.insertDataRow(row);
+
+            assert row.link() != 0L;
         }
         finally {
             ctx.database().checkpointReadUnlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
index 3eb62ae..9bd27b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
@@ -590,12 +590,19 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
      */
     public static int getRowSize(CacheDataRow row, boolean withCacheId) throws IgniteCheckedException {
         KeyCacheObject key = row.key();
-        CacheObject val = row.value();
 
         int keyLen = key.valueBytesLength(null);
+
+        int len = keyLen + (withCacheId ? 4 : 0);
+
+        if (row.removed())
+            return len;
+
+        CacheObject val = row.value();
+
         int valLen = val.valueBytesLength(null);
 
-        return keyLen + valLen + CacheVersionIO.size(row.version(), false) + 8 + (withCacheId ? 4 : 0);
+        return len + valLen + CacheVersionIO.size(row.version(), false) + 8;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
index 628ff38..da012e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
@@ -1040,13 +1040,19 @@ public class DataPageIO extends PageIO {
         final int payloadSize
     ) throws IgniteCheckedException {
         final int keySize = row.key().valueBytesLength(null);
-        final int valSize = row.value().valueBytesLength(null);
+
+        boolean rmvd = row.removed();
+
+        final int valSize = rmvd ? 0 : row.value().valueBytesLength(null);
 
         int written = writeFragment(row, buf, rowOff, payloadSize, EntryPart.CACHE_ID, keySize, valSize);
         written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.KEY, keySize, valSize);
-        written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.EXPIRE_TIME, keySize, valSize);
-        written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VALUE, keySize, valSize);
-        written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VERSION, keySize, valSize);
+
+        if (!rmvd) {
+            written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.EXPIRE_TIME, keySize, valSize);
+            written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VALUE, keySize, valSize);
+            written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VERSION, keySize, valSize);
+        }
 
         assert written == payloadSize;
     }
@@ -1414,9 +1420,15 @@ public class DataPageIO extends PageIO {
             }
 
             addr += row.key().putValue(addr);
+
+            if (row.removed())
+                return;
         }
-        else
+        else {
+            assert !row.removed() : row;
+
             addr += (2 + cacheIdSize + row.key().valueBytesLength(null));
+        }
 
         addr += row.value().putValue(addr);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 92e6785..d8f911c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -707,12 +707,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
                                             GridLongList waitTxs = updRes.mvccWaitTransactions();
 
-                                            if (waitTxs != null) {
-                                                if (this.mvccWaitTxs == null)
-                                                    this.mvccWaitTxs = waitTxs;
-                                                else
-                                                    this.mvccWaitTxs.addAll(waitTxs);
-                                            }
+                                            updateWaitTxs(waitTxs);
                                         }
 
                                         if (nearCached != null && updRes.success()) {
@@ -762,9 +757,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             null,
                                             mvccInfo != null ? mvccInfo.version() : null);
 
-                                        if (updRes.success())
+                                        if (updRes.success()) {
                                             txEntry.updateCounter(updRes.updatePartitionCounter());
 
+                                            GridLongList waitTxs = updRes.mvccWaitTransactions();
+
+                                            updateWaitTxs(waitTxs);
+                                        }
+
                                         if (nearCached != null && updRes.success()) {
                                             nearCached.innerRemove(
                                                 null,
@@ -924,6 +924,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
     }
 
     /**
+     * @param waitTxs Tx ids to wait for.
+     */
+    private void updateWaitTxs(@Nullable GridLongList waitTxs) {
+        if (waitTxs != null) {
+            if (this.mvccWaitTxs == null)
+                this.mvccWaitTxs = waitTxs;
+            else
+                this.mvccWaitTxs.addAll(waitTxs);
+        }
+    }
+
+    /**
      * Commits transaction to transaction manager. Used for one-phase commit transactions only.
      *
      * @param commit If {@code true} commits transaction, otherwise rollbacks.

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index a07d012..fc82cbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -27,6 +26,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInne
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgniteInClosure;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
 /**
  *
  */
@@ -59,8 +61,8 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
         }
 
         if (storeMvccVersion()) {
-            assert row.mvccCoordinatorVersion() > 0 : row;
-            assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
+            assert unmaskCoordinatorVersion(row.mvccCoordinatorVersion()) > 0 : row;
+            assert row.mvccCounter() != COUNTER_NA : row;
 
             PageUtils.putLong(pageAddr, off, row.mvccCoordinatorVersion());
             off += 8;
@@ -123,7 +125,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
             long mvcCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
 
             assert mvccTopVer > 0 : mvccTopVer;
-            assert mvcCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+            assert mvcCntr != COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccTopVer);
             off += 8;

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index ef08bec..c956d22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -27,6 +26,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeaf
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgniteInClosure;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
 /**
  *
  */
@@ -61,8 +63,8 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
             long mvccCrdVer = row.mvccCoordinatorVersion();
             long mvccUpdateCntr = row.mvccCounter();
 
-            assert mvccCrdVer > 0 : mvccCrdVer;
-            assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+            assert unmaskCoordinatorVersion(mvccCrdVer) > 0 : mvccCrdVer;
+            assert mvccUpdateCntr != COUNTER_NA;
 
             PageUtils.putLong(pageAddr, off, mvccCrdVer);
             off += 8;
@@ -98,7 +100,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
             long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx);
 
             assert mvccUpdateTopVer >=0 : mvccUpdateCntr;
-            assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+            assert mvccUpdateCntr != COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer);
             off += 8;

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
index f9e1eb3..85624d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
@@ -25,6 +25,8 @@ import org.apache.ignite.internal.processors.cache.persistence.RowStore;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
 /**
  *
  */
@@ -65,17 +67,25 @@ public class CacheDataRowStore extends RowStore {
      * @param cacheId Cache ID.
      * @param hash Hash code.
      * @param link Link.
-     * @param mvccTopVer
-     * @param mvccCntr
+     * @param rowData Required row data.
+     * @param crdVer Mvcc coordinator version.
+     * @param mvccCntr Mvcc counter.
      * @return Search row.
      */
-    MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long mvccTopVer, long mvccCntr) {
+    MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long crdVer, long mvccCntr) {
+        if (rowData != CacheDataRowAdapter.RowData.KEY_ONLY && versionForRemovedValue(crdVer)) {
+            if (rowData == CacheDataRowAdapter.RowData.NO_KEY)
+                return MvccDataRow.removedRowNoKey(partId, cacheId, crdVer, mvccCntr);
+            else
+                rowData = CacheDataRowAdapter.RowData.KEY_ONLY;
+        }
+
         MvccDataRow dataRow = new MvccDataRow(grp,
             hash,
             link,
             partId,
             rowData,
-            mvccTopVer,
+            crdVer,
             mvccCntr);
 
         initDataRow(dataRow, cacheId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index eaeefee..6309153 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 
 import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId;
 import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
 
 /**
  *
@@ -160,7 +161,8 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
 
         long mvccCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
 
-        cmp = Long.compare(row.mvccCoordinatorVersion(), mvccCrdVer);
+        cmp = Long.compare(unmaskCoordinatorVersion(row.mvccCoordinatorVersion()),
+            unmaskCoordinatorVersion(mvccCrdVer));
 
         if (cmp != 0)
             return cmp;
@@ -188,10 +190,10 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
             CacheDataRowAdapter.RowData.FULL;
 
         if (grp.mvccEnabled()) {
-            long mvccTopVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+            long mvccCrdVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
             long mvccCntr = rowIo.getMvccCounter(pageAddr, idx);
 
-            return rowStore.mvccRow(cacheId, hash, link, x, mvccTopVer, mvccCntr);
+            return rowStore.mvccRow(cacheId, hash, link, x, mvccCrdVer, mvccCntr);
         }
         else
             return rowStore.dataRow(cacheId, hash, link, x);

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
index 29bbaaf..d1e90d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
@@ -81,6 +81,13 @@ public class DataRow extends CacheDataRowAdapter {
         this.cacheId = cacheId;
     }
 
+    /**
+     *
+     */
+    protected DataRow() {
+        super(0);
+    }
+
     /** {@inheritDoc} */
     @Override public int partition() {
         return part;

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
index eb1ee10..916ea93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
@@ -18,12 +18,11 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
 /**
  *
  */
@@ -46,7 +45,7 @@ public class MvccDataRow extends DataRow {
     MvccDataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData, long crdVer, long mvccCntr) {
         super(grp, hash, link, part, rowData);
 
-        assert crdVer > 0 : crdVer;
+        assert unmaskCoordinatorVersion(crdVer) > 0 : crdVer;
         assert mvccCntr != CacheCoordinatorsProcessor.COUNTER_NA;
 
         this.crdVer = crdVer;
@@ -54,25 +53,32 @@ public class MvccDataRow extends DataRow {
     }
 
     /**
-     * @param key Key.
-     * @param val Value.
-     * @param ver Version.
+     *
+     */
+    private MvccDataRow() {
+        // No-op.
+    }
+
+    /**
      * @param part Partition.
      * @param cacheId Cache ID.
      * @param crdVer Mvcc coordinator version.
      * @param mvccCntr Mvcc counter.
+     * @return Row.
      */
-    public MvccDataRow(KeyCacheObject key,
-        CacheObject val,
-        GridCacheVersion ver,
+    static MvccDataRow removedRowNoKey(
         int part,
         int cacheId,
         long crdVer,
         long mvccCntr) {
-        super(key, val, ver, part, 0L, cacheId);
+        MvccDataRow row = new MvccDataRow();
 
-        this.mvccCntr = mvccCntr;
-        this.crdVer = crdVer;
+        row.cacheId = cacheId;
+        row.part = part;
+        row.crdVer = crdVer;
+        row.mvccCntr = mvccCntr;
+
+        return row;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
index aa9422d..007ac09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
@@ -27,6 +27,8 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
 /**
  *
  */
@@ -55,7 +57,12 @@ public class MvccKeyMaxVersionBound extends SearchRow implements BPlusTree.TreeR
         int idx)
         throws IgniteCheckedException
     {
-        resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+        RowLinkIO rowIo = (RowLinkIO)io;
+
+        if (versionForRemovedValue(rowIo.getMvccCoordinatorVersion(pageAddr, idx)))
+            resRow = null;
+        else
+            resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
 
         return false;  // Stop search.
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
new file mode 100644
index 0000000..af11a9d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.createVersionForRemovedValue;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
+/**
+ *
+ */
+public class MvccRemoveRow extends MvccUpdateRow {
+    /**
+     * @param key Key.
+     * @param mvccVer Mvcc version.
+     * @param part Partition.
+     * @param cacheId Cache ID.
+     */
+    public MvccRemoveRow(
+        KeyCacheObject key,
+        MvccCoordinatorVersion mvccVer,
+        int part,
+        int cacheId) {
+        super(key, null, null, mvccVer, part, cacheId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCoordinatorVersion() {
+        return createVersionForRemovedValue(super.mvccCoordinatorVersion());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long unmaskedCoordinatorVersion() {
+        return unmaskCoordinatorVersion(super.mvccCoordinatorVersion());
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean removed() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccRemoveRow.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
index 79544e6..137ca28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
@@ -32,12 +33,14 @@ import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
 /**
  *
  */
 public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
     /** */
-    private Boolean hasPrev;
+    private UpdateResult res;
 
     /** */
     private boolean canCleanup;
@@ -74,8 +77,8 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
     /**
      * @return {@code True} if previous value was non-null.
      */
-    public boolean previousNotNull() {
-        return hasPrev != null && hasPrev;
+    public UpdateResult updateResult() {
+        return res == null ? UpdateResult.PREV_NULL : res;
     }
 
     /**
@@ -92,6 +95,30 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
         return cleanupRows;
     }
 
+    /**
+     * @param io IO.
+     * @param pageAddr Page address.
+     * @param idx Item index.
+     * @return Always {@code true}.
+     */
+    private boolean assertVersion(RowLinkIO io, long pageAddr, int idx) {
+        long rowCrdVer = unmaskCoordinatorVersion(io.getMvccCoordinatorVersion(pageAddr, idx));
+        long rowCntr = io.getMvccCounter(pageAddr, idx);
+
+        int cmp = Long.compare(unmaskedCoordinatorVersion(), rowCrdVer);
+
+        if (cmp == 0)
+            cmp = Long.compare(mvccVer.counter(), rowCntr);
+
+        // Can be equals if backup rebalanced value updated on primary.
+        assert cmp >= 0 : "[updCrd=" + unmaskedCoordinatorVersion() +
+            ", updCntr=" + mvccVer.counter() +
+            ", rowCrd=" + rowCrdVer +
+            ", rowCntr=" + rowCntr + ']';
+
+        return true;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean apply(BPlusTree<CacheSearchRow, CacheDataRow> tree,
         BPlusIO<CacheSearchRow> io,
@@ -101,16 +128,33 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
     {
         RowLinkIO rowIo = (RowLinkIO)io;
 
-        // All previous version should be less then new one.
-        assert mvccVer.coordinatorVersion() >= rowIo.getMvccCoordinatorVersion(pageAddr, idx);
-        assert mvccVer.coordinatorVersion() > rowIo.getMvccCoordinatorVersion(pageAddr, idx) || mvccVer.counter() > rowIo.getMvccCounter(pageAddr, idx);
+        // Assert version grows.
+        assert assertVersion(rowIo, pageAddr, idx);
 
         boolean checkActive = mvccVer.activeTransactions().size() > 0;
 
         boolean txActive = false;
 
+        long rowCrdVerMasked = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+        long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
+
+        long crdVer = unmaskedCoordinatorVersion();
+
+        if (res == null) {
+            int cmp = Long.compare(crdVer, rowCrdVer);
+
+            if (cmp == 0)
+                cmp = Long.compare(mvccVer.counter(), rowIo.getMvccCounter(pageAddr, idx));
+
+            if (cmp == 0)
+                res = UpdateResult.VERSION_FOUND;
+            else
+                res = CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked) ?
+                    UpdateResult.PREV_NULL : UpdateResult.PREV_NOT_NULL;
+        }
+
         // Suppose transactions on previous coordinator versions are done.
-        if (checkActive && mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx)) {
+        if (checkActive && crdVer == rowCrdVer) {
             long rowMvccCntr = rowIo.getMvccCounter(pageAddr, idx);
 
             if (mvccVer.activeTransactions().contains(rowMvccCntr)) {
@@ -123,15 +167,12 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
             }
         }
 
-        if (hasPrev == null)
-            hasPrev = Boolean.TRUE; // TODO IGNITE-3478 support removes.
-
         if (!txActive) {
-            assert Long.compare(mvccVer.coordinatorVersion(), rowIo.getMvccCoordinatorVersion(pageAddr, idx)) >= 0;
+            assert Long.compare(crdVer, rowCrdVer) >= 0;
 
             int cmp;
 
-            if (mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx))
+            if (crdVer == rowCrdVer)
                 cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx));
             else
                 cmp = 1;
@@ -141,10 +182,10 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
                 if (canCleanup) {
                     CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx);
 
-                    assert row.link() != 0 && row.mvccCoordinatorVersion() > 0 : row;
+                    assert row.link() != 0 && row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
 
                     // Should not be possible to cleanup active tx.
-                    assert row.mvccCoordinatorVersion() != mvccVer.coordinatorVersion()
+                    assert rowCrdVer != crdVer
                         || !mvccVer.activeTransactions().contains(row.mvccCounter());
 
                     if (cleanupRows == null)
@@ -160,6 +201,13 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
         return true;
     }
 
+    /**
+     * @return Coordinator version without flags.
+     */
+    protected long unmaskedCoordinatorVersion() {
+        return mvccVer.coordinatorVersion();
+    }
+
     /** {@inheritDoc} */
     @Override public long mvccCoordinatorVersion() {
         return mvccVer.coordinatorVersion();
@@ -174,4 +222,16 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
     @Override public String toString() {
         return S.toString(MvccUpdateRow.class, this, "super", super.toString());
     }
+
+    /**
+     *
+     */
+    public enum UpdateResult {
+        /** */
+        VERSION_FOUND,
+        /** */
+        PREV_NULL,
+        /** */
+        PREV_NOT_NULL
+    }
 }


Mime
View raw message