ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [3/3] ignite git commit: IGNITE-9538: MVCC: update partition counters on prepare phase. This closes #4795.
Date Tue, 25 Sep 2018 13:56:00 GMT
IGNITE-9538: MVCC: update partition counters on prepare phase. This closes #4795.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a82d4aa
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a82d4aa
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a82d4aa

Branch: refs/heads/master
Commit: 3a82d4aa2cae8aa88feeb8bf9505fec41b122417
Parents: eff5751
Author: rkondakov <kondakov87@mail.ru>
Authored: Tue Sep 25 16:55:43 2018 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Tue Sep 25 16:55:43 2018 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   5 +-
 .../processors/cache/GridCacheEntryEx.java      |   6 -
 .../processors/cache/GridCacheMapEntry.java     | 132 ++---
 .../cache/GridCacheMvccEntryInfo.java           |  14 +-
 .../cache/GridCacheSharedContext.java           |   1 -
 .../cache/IgniteCacheOffheapManager.java        |  18 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    |  70 +--
 .../cache/PartitionUpdateCounter.java           | 209 +++++++
 .../GridDistributedTxRemoteAdapter.java         |  81 +--
 .../distributed/dht/GridDhtCacheEntry.java      |   5 -
 .../distributed/dht/GridDhtLocalPartition.java  |  38 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   2 +-
 .../dht/GridDhtTxAbstractEnlistFuture.java      |  62 +--
 .../distributed/dht/GridDhtTxFinishFuture.java  |  13 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |  31 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  39 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   8 +-
 .../dht/GridDhtTxPrepareRequest.java            |  37 +-
 .../dht/GridDhtTxQueryEnlistRequest.java        |  32 +-
 .../dht/GridDhtTxQueryFirstEnlistRequest.java   |  45 +-
 .../cache/distributed/dht/GridDhtTxRemote.java  |   8 +-
 .../dht/NearTxQueryEnlistResultHandler.java     |  25 +-
 .../dht/PartitionUpdateCounters.java            | 123 -----
 .../dht/PartitionUpdateCountersMessage.java     | 259 +++++++++
 .../near/GridNearTxFinishFuture.java            |   1 +
 .../cache/distributed/near/GridNearTxLocal.java |   9 +-
 .../near/GridNearTxQueryEnlistResponse.java     |  35 +-
 .../GridNearTxQueryResultsEnlistFuture.java     |  10 +-
 .../GridNearTxQueryResultsEnlistResponse.java   |  41 +-
 .../cache/mvcc/MvccProcessorImpl.java           |   8 +-
 .../cache/mvcc/MvccUpdateVersionAware.java      |   5 +
 .../processors/cache/mvcc/MvccUtils.java        |  30 +-
 .../cache/persistence/CacheDataRowAdapter.java  |   5 +
 .../persistence/GridCacheOffheapManager.java    |  30 +-
 .../cache/persistence/tree/io/DataPageIO.java   |  15 +-
 .../cache/persistence/tree/io/PageIO.java       |   6 -
 .../cache/transactions/IgniteTxAdapter.java     |   4 +-
 .../cache/transactions/IgniteTxHandler.java     |  16 +-
 .../transactions/IgniteTxLocalAdapter.java      | 120 ++--
 .../cache/transactions/TxCounters.java          | 111 ++--
 .../cache/tree/mvcc/data/MvccDataRow.java       |  22 +-
 .../cache/tree/mvcc/data/MvccUpdateDataRow.java |  37 +-
 .../cache/tree/mvcc/data/MvccUpdateResult.java  |  10 +
 .../processors/cache/GridCacheTestEntryEx.java  |   5 +-
 .../CacheMvccConfigurationValidationTest.java   |   4 +-
 .../database/CacheFreeListImplSelfTest.java     |   5 +
 .../processors/query/h2/opt/GridH2Row.java      |   4 +
 .../mvcc/CacheMvccSqlUpdateCountersTest.java    | 549 +++++++++++++++++++
 .../testsuites/IgniteCacheMvccSqlTestSuite.java |   2 +
 49 files changed, 1698 insertions(+), 649 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 04f6b5f..41c75be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -79,7 +79,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQuer
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryFirstEnlistRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCounters;
+import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
@@ -103,7 +103,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
 import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
@@ -1058,7 +1057,7 @@ public class GridIoMessageFactory implements MessageFactory {
                 break;
 
             case 157:
-                msg = new PartitionUpdateCounters();
+                msg = new PartitionUpdateCountersMessage();
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 3a2af5d..1f38eb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -348,7 +348,6 @@ public interface GridCacheEntryEx {
      * @param val Value to set.
      * @param ttl0 TTL.
      * @param topVer Topology version.
-     * @param updateCntr Update counter.
      * @param mvccVer Mvcc version.
      * @param op Cache operation.
      * @param needHistory Whether to collect rows created or affected by the current tx.
@@ -364,7 +363,6 @@ public interface GridCacheEntryEx {
         CacheObject val,
         long ttl0,
         AffinityTopologyVersion topVer,
-        @Nullable Long updateCntr,
         MvccSnapshot mvccVer,
         GridCacheOperation op,
         boolean needHistory,
@@ -374,7 +372,6 @@ public interface GridCacheEntryEx {
      * @param tx Cache transaction.
      * @param affNodeId Partitioned node iD.
      * @param topVer Topology version.
-     * @param updateCntr Update counter.
      * @param mvccVer Mvcc version.
      * @param needHistory Whether to collect rows created or affected by the current tx.
      * @return Tuple containing success flag and old value. If success is {@code false},
@@ -386,7 +383,6 @@ public interface GridCacheEntryEx {
         @Nullable IgniteInternalTx tx,
         UUID affNodeId,
         AffinityTopologyVersion topVer,
-        @Nullable Long updateCntr,
         MvccSnapshot mvccVer,
         boolean needHistory) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
@@ -1157,7 +1153,6 @@ public interface GridCacheEntryEx {
      * @param tx Transaction.
      * @param affNodeId Affinity node id.
      * @param topVer Topology version.
-     * @param updateCntr Update counter.
      * @param op Cache operation.
      * @param mvccVer Mvcc version.  @return Update result.
      * @throws IgniteCheckedException, If failed.
@@ -1167,7 +1162,6 @@ public interface GridCacheEntryEx {
         IgniteInternalTx tx,
         UUID affNodeId,
         AffinityTopologyVersion topVer,
-        Long updateCntr,
         List<GridCacheEntryInfo> entries,
         GridCacheOperation op,
         MvccSnapshot mvccVer)

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index fd8b2cd..cdb5656 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -39,7 +39,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.UnregisteredBinaryTypeException;
 import org.apache.ignite.internal.UnregisteredClassException;
-import org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
@@ -61,11 +60,13 @@ import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
+import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
 import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult;
 import org.apache.ignite.internal.processors.cache.tree.mvcc.data.ResultType;
 import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
@@ -111,11 +112,11 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DEL
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+import static org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome.INVOKE_NO_OP;
+import static org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome.REMOVE_NO_VAL;
 import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.NO_KEY;
 import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.CONCURRENT_UPDATE;
 import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.DUPLICATE_KEY;
-import static org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome.INVOKE_NO_OP;
-import static org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome.REMOVE_NO_VAL;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_BACKUP;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY;
@@ -1042,7 +1043,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         CacheObject val,
         long ttl0,
         AffinityTopologyVersion topVer,
-        @Nullable Long updateCntr,
         MvccSnapshot mvccVer,
         GridCacheOperation op,
         boolean needHistory,
@@ -1111,7 +1111,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer);
 
-                lockFut.listen(new MvccUpdateLockListener(tx, this, affNodeId, topVer, val, ttl0, updateCntr, mvccVer,
+                lockFut.listen(new MvccUpdateLockListener(tx, this, affNodeId, topVer, val, ttl0, mvccVer,
                     op, needHistory, noCreate, resFut));
 
                 return new GridCacheUpdateTxResult(false, resFut);
@@ -1123,13 +1123,23 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (cctx.deferredDelete() && deletedUnlocked() && !detached())
                 deletedUnlocked(false);
 
-            assert tx.local() && updateCntr == null || !tx.local() && updateCntr != null && updateCntr > 0;
+            if (res.resultType() == ResultType.PREV_NULL) {
+                TxCounters counters = tx.txCounters(true);
 
-            if (tx.local())
-                updateCntr = nextMvccPartitionCounter();
+                if (res.isOwnValueOverridden()) {
+                    if (res.isKeyAbsentBefore())
+                        counters.incrementUpdateCounter(cctx.cacheId(), partition());
+                }
+                else
+                    counters.incrementUpdateCounter(cctx.cacheId(), partition());
+
+                counters.accumulateSizeDelta(cctx.cacheId(), partition(), 1);
+            }
+            else if (res.resultType() == ResultType.PREV_NOT_NULL && !res.isOwnValueOverridden()) {
+                TxCounters counters = tx.txCounters(true);
 
-            if (res.resultType() == ResultType.PREV_NULL)
-                tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), partition(), 1);
+                counters.incrementUpdateCounter(cctx.cacheId(), partition());
+            }
 
             if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
                 logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
@@ -1141,7 +1151,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         newVer,
                         expireTime,
                         key.partition(),
-                        updateCntr)));
+                        0L)));
 
             update(val, expireTime, ttl, newVer, true);
 
@@ -1157,9 +1167,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
         }
 
-        onUpdateFinished(updateCntr);
+        onUpdateFinished(0L);
 
-        GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, updateCntr, logPtr) :
+        GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, logPtr) :
             new GridCacheUpdateTxResult(false, logPtr);
 
         updRes.mvccHistory(res.history());
@@ -1172,7 +1182,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         IgniteInternalTx tx,
         UUID affNodeId,
         AffinityTopologyVersion topVer,
-        @Nullable Long updateCntr,
         MvccSnapshot mvccVer,
         boolean needHistory) throws IgniteCheckedException, GridCacheEntryRemovedException {
         assert tx != null;
@@ -1212,7 +1221,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 IgniteInternalFuture<?> lockFut = cctx.kernalContext().coordinators().waitFor(cctx, lockVer);
 
-                lockFut.listen(new MvccRemoveLockListener(tx, this, affNodeId, topVer, updateCntr, mvccVer, needHistory,
+                lockFut.listen(new MvccRemoveLockListener(tx, this, affNodeId, topVer, mvccVer, needHistory,
                     resFut));
 
                 return new GridCacheUpdateTxResult(false, resFut);
@@ -1221,16 +1230,21 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (cctx.deferredDelete() && deletedUnlocked() && !detached())
                 deletedUnlocked(false);
 
-            assert tx.local() && updateCntr == null || !tx.local() && updateCntr != null && updateCntr > 0;
+            if (res.resultType() == ResultType.PREV_NOT_NULL) {
+                TxCounters counters = tx.txCounters(true);
 
-            if (tx.local())
-                updateCntr = nextMvccPartitionCounter();
+                if (res.isOwnValueOverridden()) {
+                    if (res.isKeyAbsentBefore()) // Do not count own update removal.
+                        counters.decrementUpdateCounter(cctx.cacheId(), partition());
+                }
+                else
+                    counters.incrementUpdateCounter(cctx.cacheId(), partition());
 
-            if (res.resultType() == ResultType.PREV_NOT_NULL)
-                tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), partition(), -1);
+                counters.accumulateSizeDelta(cctx.cacheId(), partition(), -1);
+            }
 
             if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
-                logPtr = logTxUpdate(tx, null, 0, updateCntr);
+                logPtr = logTxUpdate(tx, null, 0, 0L);
 
             update(null, 0, 0, newVer, true);
 
@@ -1246,9 +1260,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
         }
 
-        onUpdateFinished(updateCntr);
+        onUpdateFinished(0L);
 
-        GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, updateCntr, logPtr) :
+        GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, logPtr) :
             new GridCacheUpdateTxResult(false, logPtr);
 
         updRes.mvccHistory(res.history());
@@ -3420,14 +3434,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         return 0;
     }
 
-    /**
-     * @return Next mvcc update counter.
-     */
-    protected long nextMvccPartitionCounter() {
-        return 0;
-    }
-
-
     /** {@inheritDoc} */
     @Override public GridCacheVersionedEntryEx versionedEntry(final boolean keepBinary)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
@@ -4929,9 +4935,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         private final MvccSnapshot mvccVer;
 
         /** */
-        private final Long updateCntr;
-
-        /** */
         private final boolean needHistory;
 
         /** */
@@ -4945,7 +4948,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             GridCacheMapEntry entry,
             UUID affNodeId,
             AffinityTopologyVersion topVer,
-            Long updateCntr,
             MvccSnapshot mvccVer,
             boolean needHistory,
             GridFutureAdapter<GridCacheUpdateTxResult> resFut) {
@@ -4954,7 +4956,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             this.topVer = topVer;
             this.affNodeId = affNodeId;
             this.mvccVer = mvccVer;
-            this.updateCntr = updateCntr;
             this.needHistory = needHistory;
             this.resFut = resFut;
         }
@@ -4962,7 +4963,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         /** {@inheritDoc} */
         @Override public void apply(IgniteInternalFuture lockFut) {
             WALPointer logPtr = null;
-            long updateCntr0;
             boolean valid;
 
             GridCacheContext cctx = entry.context();
@@ -5019,15 +5019,18 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 if (cctx.deferredDelete() && entry.deletedUnlocked() && !entry.detached())
                     entry.deletedUnlocked(false);
 
-                assert tx.local() && updateCntr == null || !tx.local() && updateCntr != null && updateCntr > 0;
+                if (res.resultType() == ResultType.PREV_NOT_NULL) {
+                    TxCounters counters = tx.txCounters(true);
 
-                updateCntr0 = tx.local() ? entry.nextMvccPartitionCounter() : updateCntr;
-
-                if (updateCntr != null && updateCntr != 0)
-                    updateCntr0 = updateCntr;
+                    if (res.isOwnValueOverridden()) {
+                        if (res.isKeyAbsentBefore()) // Do not count own update removal.
+                            counters.decrementUpdateCounter(cctx.cacheId(), entry.partition());
+                    }
+                    else
+                        counters.incrementUpdateCounter(cctx.cacheId(), entry.partition());
 
-                if (res.resultType() == ResultType.PREV_NOT_NULL)
-                    tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), entry.partition(), -1);
+                    counters.accumulateSizeDelta(cctx.cacheId(), entry.partition(), -1);
+                }
 
                 if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
                     logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
@@ -5039,7 +5042,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             tx.writeVersion(),
                             0,
                             entry.key().partition(),
-                            updateCntr0)));
+                            0)));
 
                 entry.update(null, 0, 0, newVer, true);
 
@@ -5060,9 +5063,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 }
             }
 
-            entry.onUpdateFinished(updateCntr0);
+            entry.onUpdateFinished(0L);
 
-            GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, updateCntr0, logPtr)
+            GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, logPtr)
                 : new GridCacheUpdateTxResult(false, logPtr);
 
             updRes.mvccHistory(res.history());
@@ -5189,9 +5192,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         private final long ttl;
 
         /** */
-        private final Long updateCntr;
-
-        /** */
         private final MvccSnapshot mvccVer;
 
         /** */
@@ -5216,7 +5216,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             AffinityTopologyVersion topVer,
             CacheObject val,
             long ttl,
-            Long updateCntr,
             MvccSnapshot mvccVer,
             GridCacheOperation op,
             boolean needHistory,
@@ -5228,7 +5227,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             this.topVer = topVer;
             this.val = val;
             this.ttl = ttl;
-            this.updateCntr = updateCntr;
             this.mvccVer = mvccVer;
             this.op = op;
             this.needHistory = needHistory;
@@ -5239,7 +5237,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         /** {@inheritDoc} */
         @Override public void apply(IgniteInternalFuture lockFut) {
             WALPointer logPtr = null;
-            long updateCntr0;
             boolean valid;
 
             GridCacheContext cctx = entry.context();
@@ -5312,12 +5309,23 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 if (cctx.deferredDelete() && entry.deletedUnlocked() && !entry.detached())
                     entry.deletedUnlocked(false);
 
-                assert tx.local() && updateCntr == null || !tx.local() && updateCntr != null && updateCntr > 0;
+                if (res.resultType() == ResultType.PREV_NULL) {
+                    TxCounters counters = tx.txCounters(true);
+
+                    if (res.isOwnValueOverridden()) {
+                        if (res.isKeyAbsentBefore())
+                            counters.incrementUpdateCounter(cctx.cacheId(), entry.partition());
+                    }
+                    else
+                        counters.incrementUpdateCounter(cctx.cacheId(), entry.partition());
 
-                updateCntr0 = tx.local() ? entry.nextMvccPartitionCounter() : updateCntr;
+                    counters.accumulateSizeDelta(cctx.cacheId(), entry.partition(), 1);
+                }
+                else if (res.resultType() == ResultType.PREV_NOT_NULL && !res.isOwnValueOverridden()) {
+                    TxCounters counters = tx.txCounters(true);
 
-                if (res.resultType() == ResultType.PREV_NULL)
-                    tx.txCounters(true).accumulateSizeDelta(cctx.cacheId(), entry.partition(), 1);
+                    counters.incrementUpdateCounter(cctx.cacheId(), entry.partition());
+                }
 
                 if (cctx.group().persistenceEnabled() && cctx.group().walEnabled())
                     logPtr = cctx.shared().wal().log(new DataRecord(new DataEntry(
@@ -5329,7 +5337,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             newVer,
                             expireTime,
                             entry.key().partition(),
-                            updateCntr0)));
+                            0L)));
 
                 entry.update(val, expireTime, ttl, newVer, true);
 
@@ -5350,9 +5358,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 }
             }
 
-            entry.onUpdateFinished(updateCntr0);
+            entry.onUpdateFinished(0L);
 
-            GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, updateCntr0, logPtr)
+            GridCacheUpdateTxResult updRes = valid ? new GridCacheUpdateTxResult(true, 0L, logPtr)
                 : new GridCacheUpdateTxResult(false, logPtr);
 
             updRes.mvccHistory(res.history());
@@ -6451,12 +6459,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         IgniteInternalTx tx,
         UUID affNodeId,
         AffinityTopologyVersion topVer,
-        Long updateCntr,
         List<GridCacheEntryInfo> entries,
         GridCacheOperation op,
         MvccSnapshot mvccVer)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
-        assert updateCntr != null && updateCntr > 0 && !tx.local();
 
         WALPointer logPtr = null;
 
@@ -6506,7 +6512,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     ver,
                     CU.EXPIRE_TIME_ETERNAL,
                     key.partition(),
-                    updateCntr)));
+                    0L)));
 
             update(val, expireTime, ttl, ver, true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java
index a80ddc0..bd4146e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java
@@ -24,8 +24,9 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
-import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.MVCC_HINTS_BIT_OFF;
-import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.MVCC_HINTS_MASK;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_HINTS_BIT_OFF;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_HINTS_MASK;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_OP_COUNTER_MASK;
 
 /**
  *
@@ -64,7 +65,7 @@ public class GridCacheMvccEntryInfo extends GridCacheEntryInfo implements MvccVe
 
     /** {@inheritDoc} */
     @Override public int newMvccOperationCounter() {
-        return newMvccOpCntr & ~MVCC_HINTS_MASK;
+        return newMvccOpCntr & ~MVCC_OP_COUNTER_MASK;
     }
 
     /** {@inheritDoc} */
@@ -84,7 +85,7 @@ public class GridCacheMvccEntryInfo extends GridCacheEntryInfo implements MvccVe
 
     /** {@inheritDoc} */
     @Override public int mvccOperationCounter() {
-        return mvccOpCntr & ~MVCC_HINTS_MASK;
+        return mvccOpCntr & ~MVCC_OP_COUNTER_MASK;
     }
 
     /** {@inheritDoc} */
@@ -121,6 +122,11 @@ public class GridCacheMvccEntryInfo extends GridCacheEntryInfo implements MvccVe
     }
 
     /** {@inheritDoc} */
+    @Override public boolean isKeyAbsentBefore() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index bfe0001..a4dddc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -43,7 +43,6 @@ import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionsEvictManager;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index fdf42fe..c30f9e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -638,19 +638,25 @@ public interface IgniteCacheOffheapManager {
         void updateCounter(long val);
 
         /**
-         * @return Next update counter.
+         * Updates counters from start value by delta value.
+         *
+         * @param start Start.
+         * @param delta Delta.
          */
-        public long nextUpdateCounter();
+        void updateCounter(long start, long delta);
 
         /**
-         * @return Next mvcc update counter.
+         * @return Next update counter.
          */
-        long nextMvccUpdateCounter();
+        public long nextUpdateCounter();
 
         /**
-         * @return Current mvcc update counter value.
+         * Returns current value and updates counter by delta.
+         *
+         * @param delta Delta.
+         * @return Current value.
          */
-        long mvccUpdateCounter();
+        public long getAndIncrementUpdateCounter(long delta);
 
         /**
          * @return Initial update counter.

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 9919240..dd4b452 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -110,6 +110,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.INITIAL_VERSION;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_HINTS_BIT_OFF;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_OP_COUNTER_MASK;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_OP_COUNTER_NA;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.compare;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.compareNewVersion;
@@ -119,8 +121,6 @@ import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.state;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.unexpectedStateException;
 import static org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager.EMPTY_CURSOR;
 import static org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO.MVCC_INFO_SIZE;
-import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.MVCC_HINTS_BIT_OFF;
-import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.MVCC_HINTS_MASK;
 import static org.apache.ignite.internal.util.IgniteTree.OperationType.NOOP;
 import static org.apache.ignite.internal.util.IgniteTree.OperationType.PUT;
 
@@ -1401,14 +1401,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         private final CacheDataTree dataTree;
 
         /** Update counter. */
-        protected final AtomicLong cntr = new AtomicLong();
-
-        /**
-         * Mvcc update counter. This counter is used for an mvcc-style entries updates where this counter is
-         * incremented on each entry write (which happens before commit), but main update counter is updated
-         * on commit phase only.
-         */
-        protected final AtomicLong mvccUpdCntr = new AtomicLong();
+        protected final PartitionUpdateCounter pCntr = new PartitionUpdateCounter(log);
 
         /** Partition size. */
         private final AtomicLong storageSize = new AtomicLong();
@@ -1416,9 +1409,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         /** */
         private final ConcurrentMap<Integer, AtomicLong> cacheSizes = new ConcurrentHashMap<>();
 
-        /** Initial update counter. */
-        protected long initCntr;
-
         /** Mvcc remove handler. */
         private final PageHandler<MvccVersion, Boolean> mvccUpdateMarker = new MvccMarkUpdatedHandler();
 
@@ -1514,35 +1504,37 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
         /** {@inheritDoc} */
         @Override public long nextUpdateCounter() {
-            return cntr.incrementAndGet();
+            return pCntr.next();
         }
 
         /** {@inheritDoc} */
-        @Override public long updateCounter() {
-            return cntr.get();
+        @Override public long initialUpdateCounter() {
+            return pCntr.initial();
         }
 
         /** {@inheritDoc} */
-        @Override public void updateCounter(long val) {
-            while (true) {
-                long val0 = cntr.get();
+        @Override public void updateInitialCounter(long cntr) {
+            pCntr.updateInitial(cntr);
+        }
 
-                if (val0 >= val)
-                    break;
+        /** {@inheritDoc} */
+        @Override public long getAndIncrementUpdateCounter(long delta) {
+            return pCntr.getAndAdd(delta);
+        }
 
-                if (cntr.compareAndSet(val0, val))
-                    break;
-            }
+        /** {@inheritDoc} */
+        @Override public long updateCounter() {
+            return pCntr.get();
         }
 
         /** {@inheritDoc} */
-        @Override public long nextMvccUpdateCounter() {
-            return mvccUpdCntr.incrementAndGet();
+        @Override public void updateCounter(long val) {
+            pCntr.update(val);
         }
 
         /** {@inheritDoc} */
-        @Override public long mvccUpdateCounter() {
-            return mvccUpdCntr.get();
+        @Override public void updateCounter(long start, long delta) {
+            pCntr.update(start, delta);
         }
 
         /** {@inheritDoc} */
@@ -2771,29 +2763,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public long initialUpdateCounter() {
-            return initCntr;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void updateInitialCounter(long cntr) {
-            if (updateCounter() < cntr)
-                updateCounter(cntr);
-
-            initCntr = cntr;
-        }
-
-        /** {@inheritDoc} */
         @Override public void setRowCacheCleaner(GridQueryRowCacheCleaner rowCacheCleaner) {
             rowStore().setRowCacheCleaner(rowCacheCleaner);
         }
 
         /** {@inheritDoc} */
         @Override public void init(long size, long updCntr, @Nullable Map<Integer, Long> cacheSizes) {
-            initCntr = updCntr;
-            storageSize.set(size);
+            pCntr.init(updCntr);
 
-            cntr.set(updCntr);
+            storageSize.set(size);
 
             if (cacheSizes != null) {
                 for (Map.Entry<Integer, Long> e : cacheSizes.entrySet())
@@ -3143,13 +3121,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             long crd = iox.mvccCoordinator(pageAddr, offset);
             long cntr = iox.mvccCounter(pageAddr, offset);
             int opCntrAndHint = iox.mvccOperationCounter(pageAddr, offset);
-            int opCntr = opCntrAndHint & ~MVCC_HINTS_MASK;
+            int opCntr = opCntrAndHint & ~MVCC_OP_COUNTER_MASK;
             byte txState = (byte)(opCntrAndHint >>> MVCC_HINTS_BIT_OFF);
 
             long newCrd = iox.newMvccCoordinator(pageAddr, offset);
             long newCntr = iox.newMvccCounter(pageAddr, offset);
             int newOpCntrAndHint = iox.newMvccOperationCounter(pageAddr, offset);
-            int newOpCntr = newOpCntrAndHint & ~MVCC_HINTS_MASK;
+            int newOpCntr = newOpCntrAndHint & ~MVCC_OP_COUNTER_MASK;
             byte newTxState = (byte)(newOpCntrAndHint >>> MVCC_HINTS_BIT_OFF);
 
             assert crd == newRow.mvccCoordinatorVersion();

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
new file mode 100644
index 0000000..b5960ab
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteLogger;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Partition update counter with MVCC delta updates capabilities.
+ */
+public class PartitionUpdateCounter {
+    /** Logger; used to warn about stale update tasks. */
+    private IgniteLogger log;
+
+    /** Queue of postponed counter update tasks, ordered by start value. */
+    private final Queue<Item> queue = new PriorityQueue<>();
+
+    /** Current update counter value. */
+    private final AtomicLong cntr = new AtomicLong();
+
+    /** Initial counter. */
+    private long initCntr;
+
+    /**
+     * @param log Logger.
+     */
+    PartitionUpdateCounter(IgniteLogger log) {
+        this.log = log;
+    }
+
+    /**
+     * Sets both the initial and the current counter to the given value.
+     *
+     * @param updateCntr Initial counter value.
+     */
+    public void init(long updateCntr) {
+        initCntr = updateCntr;
+
+        cntr.set(updateCntr);
+    }
+
+    /**
+     * @return Initial counter value.
+     */
+    public long initial() {
+        return initCntr;
+    }
+
+    /**
+     * @return Current update counter value.
+     */
+    public long get() {
+        return cntr.get();
+    }
+
+    /**
+     * Adds delta to current counter value.
+     *
+     * @param delta Delta.
+     * @return Value before add.
+     */
+    public long getAndAdd(long delta) {
+        return cntr.getAndAdd(delta);
+    }
+
+    /**
+     * @return Counter value after incrementing it by one.
+     */
+    public long next() {
+        return cntr.incrementAndGet();
+    }
+
+    /**
+     * Raises the update counter to the given value; does nothing if the counter is already at or above it.
+     *
+     * @param val New counter value.
+     */
+    public void update(long val) {
+        while (true) {
+            long val0 = cntr.get();
+
+            if (val0 >= val)
+                break;
+
+            if (cntr.compareAndSet(val0, val))
+                break;
+        }
+    }
+
+    /**
+     * Updates counter by delta from start position; a task ahead of the counter is queued, a stale one is logged and skipped.
+     *
+     * @param start Counter value the update starts from.
+     * @param delta Value to add to start.
+     */
+    public synchronized void update(long start, long delta) {
+        long cur = cntr.get(), next;
+
+        if (cur > start) {
+            log.warning("Stale update counter task [cur=" + cur + ", start=" + start + ", delta=" + delta + ']');
+
+            return;
+        }
+
+        if (cur < start) {
+            // Counter is behind start (gap, e.g. on a backup node): postpone the task.
+            offer(new Item(start, delta));
+
+            return;
+        }
+
+        while (true) {
+            boolean res = cntr.compareAndSet(cur, next = start + delta);
+
+            assert res;
+
+            Item peek = peek();
+
+            if (peek == null || peek.start != next)
+                return;
+
+            Item item = poll();
+
+            assert peek == item;
+
+            start = item.start;
+            delta = item.delta;
+            cur = next;
+        }
+    }
+
+    /**
+     * @param cntr New initial counter value; the current counter is raised to it if lower.
+     */
+    public void updateInitial(long cntr) {
+        if (get() < cntr)
+            update(cntr);
+
+        initCntr = cntr;
+    }
+
+    /**
+     * @return Update task with the smallest start value, removed from the queue, or {@code null} if the queue is empty.
+     */
+    private Item poll() {
+        return queue.poll();
+    }
+
+    /**
+     * @return Update task with the smallest start value, left in the queue, or {@code null} if the queue is empty.
+     */
+    private Item peek() {
+        return queue.peek();
+    }
+
+    /**
+     * @param item Update task to add to the priority queue.
+     */
+    private void offer(Item item) {
+        queue.offer(item);
+    }
+
+    /**
+     * Update counter task. Update from start value by delta value.
+     */
+    private static class Item implements Comparable<Item> {
+        /** Counter value this update starts from. */
+        private final long start;
+
+        /** Delta to add to the start value. */
+        private final long delta;
+
+        /**
+         * @param start Start value.
+         * @param delta Delta value.
+         */
+        private Item(long start, long delta) {
+            this.start = start;
+            this.delta = delta;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(@NotNull Item o) {
+            int cmp = Long.compare(this.start, o.start);
+
+            assert cmp != 0;
+
+            return cmp;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 8e96ae2..1eccd6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -52,7 +52,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCounters;
+import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -809,7 +809,14 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                 }
                             }
 
-                            applyTxCounters();
+                            // Apply cache size deltas.
+                            applyTxSizes();
+
+                            TxCounters txCntrs = txCounters(false);
+
+                            // Apply update counters.
+                            if (txCntrs != null)
+                                applyPartitionsUpdatesCounters(txCntrs.updateCounters());
 
                             if (!near() && !F.isEmpty(dataEntries) && cctx.wal() != null) {
                                 // Set new update counters for data entries received from persisted tx entries.
@@ -848,38 +855,6 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
-    @Override protected void applyTxCounters() {
-        super.applyTxCounters();
-
-        TxCounters txCntrs = txCounters(false);
-
-        if (txCntrs == null)
-            return;
-
-        Map<Integer, PartitionUpdateCounters> updCntrs = txCntrs.updateCounters();
-
-        for (Map.Entry<Integer, PartitionUpdateCounters> entry : updCntrs.entrySet()) {
-            int cacheId = entry.getKey();
-
-            GridDhtPartitionTopology top = cctx.cacheContext(cacheId).topology();
-
-            Map<Integer, Long> cacheUpdCntrs = entry.getValue().updateCounters();
-
-            assert cacheUpdCntrs != null;
-
-            for (Map.Entry<Integer, Long> e : cacheUpdCntrs.entrySet()) {
-                long updCntr = e.getValue();
-
-                GridDhtLocalPartition dhtPart = top.localPartition(e.getKey());
-
-                assert dhtPart != null && updCntr > 0;
-
-                dhtPart.updateCounter(updCntr);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public final void commitRemoteTx() throws IgniteCheckedException {
         if (optimistic())
             state(PREPARED);
@@ -965,6 +940,11 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
             if (state(ROLLING_BACK) || state() == UNKNOWN) {
                 cctx.tm().rollbackTx(this, false, skipCompletedVers);
 
+                TxCounters counters = txCounters(false);
+
+                if (counters != null)
+                    applyPartitionsUpdatesCounters(counters.updateCounters());
+
                 state(ROLLED_BACK);
             }
         }
@@ -1029,6 +1009,39 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
         }
     }
 
+    /**
+     * Applies partition update counter deltas to the local partitions of mvcc caches.
+     *
+     * @param counters Update counter messages to apply; {@code null} is ignored.
+     */
+    private void applyPartitionsUpdatesCounters(Iterable<PartitionUpdateCountersMessage> counters) {
+        if (counters == null)
+            return;
+
+        int cacheId = CU.UNDEFINED_CACHE_ID;
+        GridDhtPartitionTopology top = null;
+
+        for (PartitionUpdateCountersMessage counter : counters) {
+            if (counter.cacheId() != cacheId) {
+                GridCacheContext ctx0 = cctx.cacheContext(cacheId = counter.cacheId());
+
+                assert ctx0.mvccEnabled();
+
+                top = ctx0.topology();
+            }
+
+            assert top != null;
+
+            for (int i = 0; i < counter.size(); i++) {
+                GridDhtLocalPartition part = top.localPartition(counter.partition(i));
+
+                assert part != null;
+
+                part.updateCounter(counter.initialCounter(i), counter.updatesCount(i));
+            }
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(GridDistributedTxRemoteAdapter.class, this, "super", super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 343e418..96f5c9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -102,11 +102,6 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
-    @Override protected long nextMvccPartitionCounter() {
-        return locPart.nextMvccUpdateCounter();
-    }
-
-    /** {@inheritDoc} */
     @Override public int memorySize() throws IgniteCheckedException {
         int rdrsOverhead;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index f56df00..a5a12a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -969,45 +969,51 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /**
-     * @return Current update index.
+     * @return Current update counter.
      */
     public long updateCounter() {
         return store.updateCounter();
     }
 
     /**
-     * @return Current mvcc update counter value.
+     * @param val Update counter value.
      */
-    public long mvccUpdateCounter() {
-        return store.mvccUpdateCounter();
+    public void updateCounter(long val) {
+        store.updateCounter(val);
     }
 
     /**
-     * @return Next mvcc update counter.
+     * @return Initial update counter.
      */
-    public long nextMvccUpdateCounter() {
-        return store.nextMvccUpdateCounter();
+    public long initialUpdateCounter() {
+        return store.initialUpdateCounter();
     }
 
     /**
-     * @return Initial update counter.
+     * @param val Initial update counter value.
      */
-    public long initialUpdateCounter() {
-        return store.initialUpdateCounter();
+    public void initialUpdateCounter(long val) {
+        store.updateInitialCounter(val);
     }
 
     /**
-     * @param val Update index value.
+     * Updates MVCC cache update counter on primary node.
+     *
+     * @param delta Value to be added to update counter.
+     * @return Update counter value before update.
      */
-    public void updateCounter(long val) {
-        store.updateCounter(val);
+    public long getAndIncrementUpdateCounter(long delta) {
+        return store.getAndIncrementUpdateCounter(delta);
     }
 
     /**
-     * @param val Initial update index value.
+     * Updates MVCC cache update counter on backup node.
+     *
+     * @param start Start position.
+     * @param delta Delta.
      */
-    public void initialUpdateCounter(long val) {
-        store.updateInitialCounter(val);
+    public void updateCounter(long start, long delta) {
+         store.updateCounter(start, delta);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index be1c7e2..f6df80b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -2205,7 +2205,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
             MvccSnapshot snapshot = new MvccSnapshotWithoutTxs(s0.coordinatorVersion(), s0.counter(),
                 req.operationCounter(), s0.cleanupVersion());
 
-            tx.mvccEnlistBatch(ctx, req.op(), req.keys(), req.values(), snapshot, req.updateCounters());
+            tx.mvccEnlistBatch(ctx, req.op(), req.keys(), req.values(), snapshot);
 
             GridDhtTxQueryEnlistResponse res = new GridDhtTxQueryEnlistResponse(req.cacheId(),
                 req.dhtFutureId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index a3471c7..2b34a41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -61,7 +61,6 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -135,6 +134,9 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
     /** Processed entries count. */
     protected long cnt;
 
+    /** New DHT nodes. */
+    protected Set<UUID> newDhtNodes = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
     /** Near node ID. */
     protected final UUID nearNodeId;
 
@@ -186,9 +188,6 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
     /** Moving partitions. */
     private Map<Integer, Boolean> movingParts;
 
-    /** Update counters to be sent to the near node in case it is a backup node also. */
-    protected GridLongList nearUpdCntrs;
-
     /**
      * @param nearNodeId Near node ID.
      * @param nearLockVer Near lock version.
@@ -388,7 +387,6 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
                                         tx,
                                         cctx.localNodeId(),
                                         topVer,
-                                        null,
                                         mvccSnapshot,
                                         isMoving(key.partition()));
 
@@ -403,7 +401,6 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
                                         val,
                                         0,
                                         topVer,
-                                        null,
                                         mvccSnapshot,
                                         op.cacheOperation(),
                                         isMoving(key.partition()),
@@ -575,7 +572,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
         cnt++;
 
         if (op != EnlistOperation.LOCK)
-            addToBatch(entry.key(), val, updRes.mvccHistory(), updRes.updateCounter(), entry.context().cacheId());
+            addToBatch(entry.key(), val, updRes.mvccHistory(), entry.context().cacheId());
     }
 
     /**
@@ -585,9 +582,8 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
      * @param key Key.
      * @param val Value.
      * @param hist History rows.
-     * @param updCntr Update counter.
      */
-    private void addToBatch(KeyCacheObject key, CacheObject val, List<MvccLinkAwareSearchRow> hist, long updCntr,
+    private void addToBatch(KeyCacheObject key, CacheObject val, List<MvccLinkAwareSearchRow> hist,
         int cacheId) throws IgniteCheckedException {
         List<ClusterNode> backups = backupNodes(key);
 
@@ -609,15 +605,10 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
                 updateMappings(node);
 
                 if (newRemoteTx(node))
-                    tx.addLockTransactionNode(node);
+                    addNewRemoteTxNode(node);
 
                 hasNearNodeUpdates = true;
 
-                if (nearUpdCntrs == null)
-                    nearUpdCntrs = new GridLongList();
-
-                nearUpdCntrs.add(updCntr);
-
                 continue;
             }
 
@@ -637,7 +628,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
                 hist0 = fetchHistoryInfo(key, hist);
             }
 
-            batch.add(key, moving ? hist0 : val, updCntr);
+            batch.add(key, moving ? hist0 : val);
 
             if (batch.size() == BATCH_SIZE) {
                 assert batches != null;
@@ -705,6 +696,17 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
     }
 
     /**
+     * Add new involved DHT node.
+     *
+     * @param node Node.
+     */
+    private void addNewRemoteTxNode(ClusterNode node) {
+        tx.addLockTransactionNode(node);
+
+        newDhtNodes.add(node.id());
+    }
+
+    /**
      * Checks if there free space in batches or free slot in in-flight batches is available for the given key.
      *
      * @param key Key.
@@ -752,7 +754,7 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
         GridDhtTxQueryEnlistRequest req;
 
         if (newRemoteTx(node)) {
-            tx.addLockTransactionNode(node);
+            addNewRemoteTxNode(node);
 
             // If this is a first request to this node, send full info.
             req = new GridDhtTxQueryFirstEnlistRequest(cctx.cacheId(),
@@ -768,8 +770,8 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
                 it.operation(),
                 FIRST_BATCH_ID,
                 batch.keys(),
-                batch.values(),
-                batch.updateCounters());
+                batch.values()
+            );
         }
         else {
             // Send only keys, values, LockVersion and batchId if this is not a first request to this backup.
@@ -780,8 +782,8 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
                 ++batchIdCntr,
                 mvccSnapshot.operationCounter(),
                 batch.keys(),
-                batch.values(),
-                batch.updateCounters());
+                batch.values()
+            );
         }
 
         ConcurrentMap<Integer, Batch> pending0 = null;
@@ -1036,9 +1038,6 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
          */
         private List<Message> vals;
 
-        /** Update counters. */
-        private GridLongList updCntrs;
-
         /**
          * @param node Cluster node.
          */
@@ -1059,9 +1058,8 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
          * @param key Key.
          * @param val Value or preload entries collection.
          */
-        public void add(KeyCacheObject key, Message val, long updCntr) {
+        public void add(KeyCacheObject key, Message val) {
             assert val == null || val instanceof CacheObject || val instanceof CacheEntryInfoCollection;
-            assert updCntr > 0;
 
             if (keys == null)
                 keys = new ArrayList<>();
@@ -1075,11 +1073,6 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
                 vals.add(val);
             }
 
-            if (updCntrs == null)
-                updCntrs = new GridLongList();
-
-            updCntrs.add(updCntr);
-
             assert (vals == null) || keys.size() == vals.size();
         }
 
@@ -1103,13 +1096,6 @@ public abstract class GridDhtTxAbstractEnlistFuture extends GridCacheFutureAdapt
         public List<Message> values() {
             return vals;
         }
-
-        /**
-         * @return Update counters.
-         */
-        public GridLongList updateCounters() {
-            return updCntrs;
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 4c72e6f..9283939 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -371,7 +371,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
                 tx.activeCachesDeploymentEnabled(),
                 false,
                 false,
-                tx.mvccSnapshot());
+                tx.mvccSnapshot(),
+                tx.filterUpdateCountersForBackupNode(n));
 
             try {
                 cctx.io().send(n, req, tx.ioPolicy());
@@ -457,11 +458,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
             for (IgniteTxEntry e : dhtMapping.entries())
                 updCntrs.add(e.updateCounter());
 
-            Map<Integer, PartitionUpdateCounters> updCntrsForNode = null;
-
-            if (dhtMapping.queryUpdate() && commit)
-                updCntrsForNode = tx.filterUpdateCountersForBackupNode(n);
-
             GridDhtTxFinishRequest req = new GridDhtTxFinishRequest(
                 tx.nearNodeId(),
                 futId,
@@ -489,7 +485,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
                 false,
                 false,
                 mvccSnapshot,
-                updCntrsForNode);
+                commit ? null : tx.filterUpdateCountersForBackupNode(n));
 
             req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
 
@@ -559,7 +555,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
                     tx.activeCachesDeploymentEnabled(),
                     false,
                     false,
-                    mvccSnapshot);
+                    mvccSnapshot,
+                    null);
 
                 req.writeVersion(tx.writeVersion());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 8e9ece6..61896b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -20,11 +20,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
@@ -74,8 +72,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     private MvccSnapshot mvccSnapshot;
 
     /** */
-    @GridDirectMap(keyType = Integer.class, valueType = PartitionUpdateCounters.class)
-    private Map<Integer, PartitionUpdateCounters> updCntrs;
+    @GridDirectCollection(PartitionUpdateCountersMessage.class)
+    private Collection<PartitionUpdateCountersMessage> updCntrs;
 
     /**
      * Empty constructor required for {@link Externalizable}.
@@ -107,6 +105,10 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash.
      * @param addDepInfo Deployment info flag.
+     * @param retVal Need return value flag.
+     * @param waitRemoteTxs Wait remote transactions flag.
+     * @param mvccSnapshot Mvcc snapshot.
+     * @param updCntrs Update counters for mvcc Tx.
      */
     public GridDhtTxFinishRequest(
         UUID nearNodeId,
@@ -133,7 +135,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         boolean addDepInfo,
         boolean retVal,
         boolean waitRemoteTxs,
-        MvccSnapshot mvccSnapshot
+        MvccSnapshot mvccSnapshot,
+        Collection<PartitionUpdateCountersMessage> updCntrs
     ) {
         super(
             xidVer,
@@ -163,6 +166,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         this.isolation = isolation;
         this.miniId = miniId;
         this.mvccSnapshot = mvccSnapshot;
+        this.updCntrs = updCntrs;
 
         needReturnValue(retVal);
         waitRemoteTransactions(waitRemoteTxs);
@@ -193,6 +197,10 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param taskNameHash Task name hash.
      * @param updateIdxs Partition update idxs.
      * @param addDepInfo Deployment info flag.
+     * @param retVal Need return value flag.
+     * @param waitRemoteTxs Wait remote transactions flag.
+     * @param mvccSnapshot Mvcc snapshot.
+     * @param updCntrs Update counters for mvcc Tx.
      */
     public GridDhtTxFinishRequest(
         UUID nearNodeId,
@@ -221,7 +229,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         boolean retVal,
         boolean waitRemoteTxs,
         MvccSnapshot mvccSnapshot,
-        Map<Integer, PartitionUpdateCounters> updCntrs
+        Collection<PartitionUpdateCountersMessage> updCntrs
     ) {
         this(nearNodeId,
             futId,
@@ -247,7 +255,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
             addDepInfo,
             retVal,
             waitRemoteTxs,
-            mvccSnapshot);
+            mvccSnapshot,
+            updCntrs);
 
         if (updateIdxs != null && !updateIdxs.isEmpty()) {
             partUpdateCnt = new GridLongList(updateIdxs.size());
@@ -255,8 +264,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
             for (Long idx : updateIdxs)
                 partUpdateCnt.add(idx);
         }
-
-        this.updCntrs = updCntrs;
     }
 
     /**
@@ -367,7 +374,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     /**
      * @return Partition counters update deferred until transaction commit.
      */
-    public Map<Integer, PartitionUpdateCounters> updateCounters() {
+    public Collection<PartitionUpdateCountersMessage> updateCounters() {
         return updCntrs;
     }
 
@@ -429,7 +436,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 writer.incrementState();
 
             case 28:
-                if (!writer.writeMap("updCntrs", updCntrs, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("updCntrs", updCntrs, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -511,7 +518,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 28:
-                updCntrs = reader.readMap("updCntrs", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
+                updCntrs = reader.readCollection("updCntrs", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 613f160..a0c9d15 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -18,9 +18,9 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.io.Externalizable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -944,43 +944,32 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      * @param node Backup node.
      * @return Partition counters map for the given backup node.
      */
-    public Map<Integer, PartitionUpdateCounters> filterUpdateCountersForBackupNode(ClusterNode node) {
+    public List<PartitionUpdateCountersMessage> filterUpdateCountersForBackupNode(ClusterNode node) {
         TxCounters txCntrs = txCounters(false);
 
-        if (txCntrs == null)
+        if (txCntrs == null || F.isEmpty(txCntrs.updateCounters()))
             return null;
 
-        Map<Integer, PartitionUpdateCounters> updCntrs = txCntrs.updateCounters();
+        Collection<PartitionUpdateCountersMessage> updCntrs = txCntrs.updateCounters();
 
-        Map<Integer, PartitionUpdateCounters> res = new HashMap<>();
+        List<PartitionUpdateCountersMessage> res = new ArrayList<>(updCntrs.size());
 
         AffinityTopologyVersion top = topologyVersionSnapshot();
 
-        for (Map.Entry<Integer, PartitionUpdateCounters> entry : updCntrs.entrySet()) {
-            Integer cacheId = entry.getKey();
+        for (PartitionUpdateCountersMessage partCntrs : updCntrs) {
+            GridCacheAffinityManager affinity = cctx.cacheContext(partCntrs.cacheId()).affinity();
 
-            Map<Integer, Long> partsCntrs = entry.getValue().updateCounters();
+            PartitionUpdateCountersMessage resCntrs = new PartitionUpdateCountersMessage(partCntrs.cacheId(), partCntrs.size());
 
-            assert !F.isEmpty(partsCntrs);
+            for (int i = 0; i < partCntrs.size(); i++) {
+                int part = partCntrs.partition(i);
 
-            GridCacheAffinityManager affinity = cctx.cacheContext(cacheId).affinity();
-
-            Map<Integer, Long> resCntrs = new HashMap<>(partsCntrs.size());
-
-            for (Map.Entry<Integer, Long> e : partsCntrs.entrySet()) {
-                Integer p = e.getKey();
-
-                Long cntr = e.getValue();
-
-                if (affinity.backupByPartition(node, p, top)) {
-                    assert cntr != null && cntr > 0 : cntr;
-
-                    resCntrs.put(p, cntr);
-                }
+                if (affinity.backupByPartition(node, part, top))
+                    resCntrs.add(part, partCntrs.initialCounter(i), partCntrs.updatesCount(i));
             }
 
-            if (!resCntrs.isEmpty())
-                res.put(cacheId, new PartitionUpdateCounters(resCntrs));
+            if (resCntrs.size() > 0)
+                res.add(resCntrs);
         }
 
         return res;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 1974038..0edf63f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -61,7 +61,6 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware;
@@ -69,6 +68,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.dr.GridDrType;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
@@ -1397,7 +1397,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                 tx.activeCachesDeploymentEnabled(),
                 tx.storeWriteThrough(),
                 retVal,
-                mvccSnapshot);
+                mvccSnapshot,
+                tx.filterUpdateCountersForBackupNode(n));
 
             req.queryUpdate(dhtMapping.queryUpdate());
 
@@ -1503,7 +1504,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                     tx.activeCachesDeploymentEnabled(),
                     tx.storeWriteThrough(),
                     retVal,
-                    mvccSnapshot);
+                    mvccSnapshot,
+                    null);
 
                 for (IgniteTxEntry entry : nearMapping.entries()) {
                     if (CU.writes().apply(entry)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 68c1f39..30e8ceb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -88,6 +88,10 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     @GridDirectCollection(GridCacheVersion.class)
     private Collection<GridCacheVersion> ownedVals;
 
+    /** */
+    @GridDirectCollection(PartitionUpdateCountersMessage.class)
+    private Collection<PartitionUpdateCountersMessage> counters;
+
     /** Near transaction ID. */
     private GridCacheVersion nearXidVer;
 
@@ -130,7 +134,9 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
      * @param last {@code True} if this is last prepare request for node.
      * @param addDepInfo Deployment info flag.
      * @param storeWriteThrough Cache store write through flag.
-     * @param retVal Need return value flag.
+     * @param retVal Need return value flag
+     * @param mvccSnapshot Mvcc snapshot.
+     * @param counters Update counters for mvcc Tx.
      */
     public GridDhtTxPrepareRequest(
         IgniteUuid futId,
@@ -149,7 +155,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
         boolean addDepInfo,
         boolean storeWriteThrough,
         boolean retVal,
-        MvccSnapshot mvccInfo) {
+        MvccSnapshot mvccSnapshot,
+        Collection<PartitionUpdateCountersMessage> counters) {
         super(tx,
             timeout,
             null,
@@ -170,7 +177,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
         this.nearXidVer = nearXidVer;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
-        this.mvccSnapshot = mvccInfo;
+        this.mvccSnapshot = mvccSnapshot;
+        this.counters = counters;
 
         storeWriteThrough(storeWriteThrough);
         needReturnValue(retVal);
@@ -190,6 +198,13 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     }
 
     /**
+     * @return Update counters list.
+     */
+    public Collection<PartitionUpdateCountersMessage> updateCounters() {
+        return counters;
+    }
+
+    /**
      * @return Near cache writes for which cache was not found (possible if client near cache was closed).
      */
     @Nullable public List<IgniteTxKey> nearWritesCacheMissed() {
@@ -492,6 +507,12 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 writer.incrementState();
 
+            case 34:
+                if (!writer.writeCollection("counters", counters, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -620,6 +641,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
                 reader.incrementState();
 
+            case 34:
+                counters = reader.readCollection("counters", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridDhtTxPrepareRequest.class);
@@ -632,7 +661,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 34;
+        return 35;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a82d4aa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistRequest.java
index 650e1dc..a1bc26b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxQueryEnlistRequest.java
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.EnlistOperation;
-import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -68,9 +67,6 @@ public class GridDhtTxQueryEnlistRequest extends GridCacheIdMessage {
     @GridDirectCollection(Message.class)
     private List<Message> vals;
 
-    /** */
-    private GridLongList updCntrs;
-
     /**
      *
      */
@@ -86,7 +82,6 @@ public class GridDhtTxQueryEnlistRequest extends GridCacheIdMessage {
      * @param mvccOpCnt Mvcc operation counter.
      * @param keys Keys.
      * @param vals Values.
-     * @param updCntrs Update counters.
      */
     GridDhtTxQueryEnlistRequest(int cacheId,
         IgniteUuid dhtFutId,
@@ -95,8 +90,7 @@ public class GridDhtTxQueryEnlistRequest extends GridCacheIdMessage {
         int batchId,
         int mvccOpCnt,
         List<KeyCacheObject> keys,
-        List<Message> vals,
-        GridLongList updCntrs) {
+        List<Message> vals) {
         this.cacheId = cacheId;
         this.dhtFutId = dhtFutId;
         this.lockVer = lockVer;
@@ -105,7 +99,6 @@ public class GridDhtTxQueryEnlistRequest extends GridCacheIdMessage {
         this.mvccOpCnt = mvccOpCnt;
         this.keys = keys;
         this.vals = vals;
-        this.updCntrs = updCntrs;
     }
 
     /**
@@ -160,13 +153,6 @@ public class GridDhtTxQueryEnlistRequest extends GridCacheIdMessage {
     }
 
     /**
-     * @return Update counters.
-     */
-    public GridLongList updateCounters() {
-        return updCntrs;
-    }
-
-    /**
      * @return Batch id.
      */
     public int batchId() {
@@ -292,12 +278,6 @@ public class GridDhtTxQueryEnlistRequest extends GridCacheIdMessage {
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeMessage("updCntrs", updCntrs))
-                    return false;
-
-                writer.incrementState();
-
-            case 10:
                 if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
                     return false;
 
@@ -372,14 +352,6 @@ public class GridDhtTxQueryEnlistRequest extends GridCacheIdMessage {
                 reader.incrementState();
 
             case 9:
-                updCntrs = reader.readMessage("updCntrs");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 10:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -394,7 +366,7 @@ public class GridDhtTxQueryEnlistRequest extends GridCacheIdMessage {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 10;
     }
 
     /** {@inheritDoc} */


Mime
View raw message