ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [5/5] ignite git commit: IGNITE-426 Implemented failover for Continuous query.
Date Thu, 19 Nov 2015 16:56:04 GMT
IGNITE-426 Implemented failover for Continuous query.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ce636372
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ce636372
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ce636372

Branch: refs/heads/ignite-1.5
Commit: ce6363729f3553c6854ed2e8cfc3b5c244678fcd
Parents: 8728a5b
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Thu Nov 19 19:50:58 2015 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Thu Nov 19 19:50:58 2015 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |   22 +-
 .../internal/GridMessageListenHandler.java      |   18 +
 .../communication/GridIoMessageFactory.java     |    8 +-
 .../processors/cache/GridCacheEntryEx.java      |   12 +-
 .../processors/cache/GridCacheMapEntry.java     |  147 +-
 .../GridCachePartitionExchangeManager.java      |    4 +-
 .../cache/GridCacheUpdateAtomicResult.java      |   15 +-
 .../cache/GridCacheUpdateTxResult.java          |   24 +-
 .../GridDistributedTxRemoteAdapter.java         |   22 +-
 .../dht/GridClientPartitionTopology.java        |   38 +-
 .../distributed/dht/GridDhtLocalPartition.java  |   35 +
 .../dht/GridDhtPartitionTopology.java           |   26 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  112 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   14 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |  112 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   79 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   75 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  140 +-
 .../GridDhtPartitionsExchangeFuture.java        |   35 +-
 .../preloader/GridDhtPartitionsFullMessage.java |   64 +-
 .../GridDhtPartitionsSingleMessage.java         |   56 +-
 .../distributed/near/GridNearAtomicCache.java   |   10 +-
 .../distributed/near/GridNearTxRemote.java      |    7 +
 .../CacheContinuousQueryBatchAck.java           |  163 ++
 .../continuous/CacheContinuousQueryEntry.java   |  196 +-
 .../continuous/CacheContinuousQueryEvent.java   |    3 +-
 .../continuous/CacheContinuousQueryHandler.java |  811 ++++++-
 .../CacheContinuousQueryListener.java           |   35 +
 .../continuous/CacheContinuousQueryManager.java |  151 +-
 .../cache/transactions/IgniteTxEntry.java       |   20 +
 .../cache/transactions/IgniteTxHandler.java     |    3 +
 .../transactions/IgniteTxLocalAdapter.java      |   18 +-
 .../cache/transactions/IgniteTxRemoteEx.java    |    7 +-
 .../continuous/GridContinuousBatch.java         |   44 +
 .../continuous/GridContinuousBatchAdapter.java  |   46 +
 .../continuous/GridContinuousHandler.java       |   22 +
 .../continuous/GridContinuousProcessor.java     |  221 +-
 .../StartRoutineAckDiscoveryMessage.java        |   14 +-
 .../StartRoutineDiscoveryMessage.java           |   21 +-
 .../processors/cache/GridCacheTestEntryEx.java  |   10 +-
 ...ContinuousQueryFailoverAbstractSelfTest.java | 2235 ++++++++++++++++++
 ...ryFailoverAtomicNearEnabledSelfSelfTest.java |   46 +
 ...FailoverAtomicPrimaryWriteOrderSelfTest.java |   44 +
 ...usQueryFailoverAtomicReplicatedSelfTest.java |   40 +
 ...inuousQueryFailoverTxReplicatedSelfTest.java |   32 +
 .../CacheContinuousQueryFailoverTxSelfTest.java |   39 +
 ...ridCacheContinuousQueryAbstractSelfTest.java |  153 +-
 .../GridCacheContinuousQueryTxSelfTest.java     |   49 +
 ...CacheContinuousQueryClientReconnectTest.java |  187 ++
 .../IgniteCacheContinuousQueryClientTest.java   |  157 +-
 ...cheContinuousQueryClientTxReconnectTest.java |   32 +
 .../p2p/GridP2PSameClassLoaderSelfTest.java     |   16 +-
 .../testframework/junits/GridAbstractTest.java  |    2 +-
 .../junits/common/GridCommonAbstractTest.java   |    3 +
 .../IgniteCacheQuerySelfTestSuite.java          |   16 +-
 .../yardstick/cache/CacheEntryEventProbe.java   |  156 ++
 56 files changed, 5753 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index b4ce4ab..3918976 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -23,6 +23,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
@@ -38,6 +39,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
 import org.apache.ignite.internal.util.typedef.F;
@@ -127,6 +130,11 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public void updateCounters(Map<Integer, Long> cntrs) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
         throws IgniteCheckedException {
         assert nodeId != null;
@@ -213,8 +221,8 @@ class GridEventConsumeHandler implements GridContinuousHandler {
                                                     }
                                                 }
 
-                                                ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false,
-                                                    false);
+                                                ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null,
+                                                    false, false);
                                             }
                                             catch (ClusterTopologyCheckedException ignored) {
                                                 // No-op.
@@ -377,6 +385,16 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public GridContinuousBatch createBatch() {
+        return new GridContinuousBatchAdapter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public Object orderedTopic() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index ff38949..aa837b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -22,10 +22,13 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -100,6 +103,11 @@ public class GridMessageListenHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public void updateCounters(Map<Integer, Long> cntrs) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException {
         ctx.io().addUserMessageListener(topic, pred);
 
@@ -167,6 +175,16 @@ public class GridMessageListenHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public GridContinuousBatch createBatch() {
+        return new GridContinuousBatchAdapter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public Object orderedTopic() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 3548aac..c671582 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlo
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBatchAck;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -708,7 +709,12 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            // [-3..114] - this
+            case 118:
+                msg = new CacheContinuousQueryBatchAck();
+
+                break;
+
+            // [-3..118] - this
             // [120..123] - DR
             // [-4..-22] - SQL
             default:

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index af62e39..8d50616 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -360,6 +360,7 @@ public interface GridCacheEntryEx {
      * @param subjId Subject ID initiated this update.
      * @param taskName Task name.
      * @param dhtVer Dht version for near cache entry.
+     * @param updateCntr Update counter.
      * @return Tuple containing success flag and old value. If success is {@code false},
      *      then value is {@code null}.
      * @throws IgniteCheckedException If storing value failed.
@@ -382,7 +383,8 @@ public interface GridCacheEntryEx {
         @Nullable GridCacheVersion explicitVer,
         @Nullable UUID subjId,
         String taskName,
-        @Nullable GridCacheVersion dhtVer
+        @Nullable GridCacheVersion dhtVer,
+        @Nullable Long updateCntr
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
@@ -417,7 +419,8 @@ public interface GridCacheEntryEx {
         @Nullable GridCacheVersion explicitVer,
         @Nullable UUID subjId,
         String taskName,
-        @Nullable GridCacheVersion dhtVer
+        @Nullable GridCacheVersion dhtVer,
+        @Nullable Long updateCntr
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**
@@ -446,6 +449,7 @@ public interface GridCacheEntryEx {
      * @param intercept If {@code true} then calls cache interceptor.
      * @param subjId Subject ID initiated this update.
      * @param taskName Task name.
+     * @param updateCntr Update counter.
      * @return Tuple where first value is flag showing whether operation succeeded,
      *      second value is old entry value if return value is requested, third is updated entry value,
      *      fourth is the version to enqueue for deferred delete the fifth is DR conflict context
@@ -478,7 +482,9 @@ public interface GridCacheEntryEx {
         boolean conflictResolve,
         boolean intercept,
         @Nullable UUID subjId,
-        String taskName
+        String taskName,
+        @Nullable CacheObject prevVal,
+        @Nullable Long updateCntr
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 0786a50..8d363ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
@@ -1060,7 +1061,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         @Nullable GridCacheVersion explicitVer,
         @Nullable UUID subjId,
         String taskName,
-        @Nullable GridCacheVersion dhtVer
+        @Nullable GridCacheVersion dhtVer,
+        @Nullable Long updateCntr
     ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         CacheObject old;
 
@@ -1077,6 +1079,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         Object key0 = null;
         Object val0 = null;
 
+        long updateCntr0;
+
         synchronized (this) {
             checkObsolete();
 
@@ -1155,6 +1159,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     deletedUnlocked(false);
             }
 
+            updateCntr0 = nextPartCounter(topVer);
+
+            if (updateCntr != null && updateCntr != 0)
+                updateCntr0 = updateCntr;
+
             update(val, expireTime, ttl, newVer);
 
             drReplicate(drType, val, newVer);
@@ -1180,8 +1189,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     subjId, null, taskName);
             }
 
-            if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
-                cctx.continuousQueries().onEntryUpdated(this, key, val, old, false);
+            if (cctx.isLocal() || cctx.isReplicated() ||
+                (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())))
+                cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(),
+                    partition(), tx.local(), false, updateCntr0, topVer);
 
             cctx.dataStructures().onEntryUpdated(key, false);
         }
@@ -1197,7 +1208,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         if (intercept)
             cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0));
 
-        return valid ? new GridCacheUpdateTxResult(true, retval ? old : null) :
+        return valid ? new GridCacheUpdateTxResult(true, retval ? old : null, updateCntr0) :
             new GridCacheUpdateTxResult(false, null);
     }
 
@@ -1223,7 +1234,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         @Nullable GridCacheVersion explicitVer,
         @Nullable UUID subjId,
         String taskName,
-        @Nullable GridCacheVersion dhtVer
+        @Nullable GridCacheVersion dhtVer,
+        @Nullable Long updateCntr
         ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         assert cctx.transactional();
 
@@ -1245,6 +1257,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         Cache.Entry entry0 = null;
 
+        Long updateCntr0;
+
         boolean deferred;
 
         boolean marked = false;
@@ -1261,7 +1275,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
 
             assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
-                    "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']';
+                "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']';
 
             boolean startVer = isStartVersion();
 
@@ -1318,6 +1332,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 }
             }
 
+            updateCntr0 = nextPartCounter(topVer);
+
+            if (updateCntr != null && updateCntr != 0)
+                updateCntr0 = updateCntr;
+
             drReplicate(drType, null, newVer);
 
             if (metrics && cctx.cache().configuration().isStatisticsEnabled())
@@ -1350,8 +1369,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     taskName);
             }
 
-            if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
-                cctx.continuousQueries().onEntryUpdated(this, key, null, old, false);
+            if (cctx.isLocal() || cctx.isReplicated() ||
+                (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())))
+                cctx.continuousQueries().onEntryUpdated(key, null, old, isInternal()
+                    || !context().userCache(),partition(), tx.local(), false, updateCntr0, topVer);
 
             cctx.dataStructures().onEntryUpdated(key, true);
 
@@ -1394,7 +1415,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             else
                 ret = old;
 
-            return new GridCacheUpdateTxResult(true, ret);
+            return new GridCacheUpdateTxResult(true, ret, updateCntr0);
         }
         else
             return new GridCacheUpdateTxResult(false, null);
@@ -1686,7 +1707,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (res)
                 updateMetrics(op, metrics);
 
-            cctx.continuousQueries().onEntryUpdated(this, key, val, old, false);
+            if (!isNear()) {
+                long updateCntr = nextPartCounter(AffinityTopologyVersion.NONE);
+
+                cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(),
+                    partition(), true, false, updateCntr, AffinityTopologyVersion.NONE);
+            }
 
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
 
@@ -1729,7 +1755,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         boolean conflictResolve,
         boolean intercept,
         @Nullable UUID subjId,
-        String taskName
+        String taskName,
+        @Nullable CacheObject prevVal,
+        @Nullable Long updateCntr
     ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
         assert cctx.atomic();
 
@@ -1755,6 +1783,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         Object key0 = null;
         Object updated0 = null;
 
+        Long updateCntr0 = null;
+
         synchronized (this) {
             boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter);
 
@@ -1862,7 +1892,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
-                            false);
+                            false,
+                            updateCntr0 == null ? 0 : updateCntr0);
                     }
                     // Will update something.
                     else {
@@ -1911,6 +1942,38 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                                     "[entry=" + this + ", newVer=" + newVer + ']');
                         }
 
+                        if (!cctx.isNear()) {
+                            CacheObject evtVal;
+
+                            if (op == GridCacheOperation.TRANSFORM) {
+                                EntryProcessor<Object, Object, ?> entryProcessor =
+                                    (EntryProcessor<Object, Object, ?>)writeObj;
+
+                                CacheInvokeEntry<Object, Object> entry =
+                                    new CacheInvokeEntry<>(cctx, key, prevVal, version());
+
+                                try {
+                                    entryProcessor.process(entry, invokeArgs);
+
+                                    evtVal = entry.modified() ?
+                                        cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
+                                }
+                                catch (Exception e) {
+                                    evtVal = prevVal;
+                                }
+                            }
+                            else
+                                evtVal = (CacheObject)writeObj;
+
+                            updateCntr0 = nextPartCounter(topVer);
+
+                            if (updateCntr != null)
+                                updateCntr0 = updateCntr;
+
+                            cctx.continuousQueries().onEntryUpdated(key, evtVal, prevVal, isInternal()
+                                || !context().userCache(), partition(), primary, false, updateCntr0, topVer);
+                        }
+
                         return new GridCacheUpdateAtomicResult(false,
                             retval ? rawGetOrUnmarshalUnlocked(false) : null,
                             null,
@@ -1919,7 +1982,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
-                            false);
+                            false,
+                            updateCntr0 == null ? 0 : updateCntr0);
                     }
                 }
                 else
@@ -1995,7 +2059,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         CU.EXPIRE_TIME_ETERNAL,
                         null,
                         null,
-                        false);
+                        false,
+                        updateCntr0 == null ? 0 : updateCntr0);
                 }
             }
 
@@ -2042,7 +2107,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         CU.EXPIRE_TIME_ETERNAL,
                         null,
                         null,
-                        false);
+                        false,
+                        updateCntr0 == null ? 0 : updateCntr);
                 }
             }
             else
@@ -2142,7 +2208,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
-                            false);
+                            false,
+                            updateCntr0 == null ? 0 : updateCntr0);
                     else if (interceptorVal != updated0) {
                         updated0 = cctx.unwrapTemporary(interceptorVal);
 
@@ -2179,6 +2246,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 update(updated, newExpireTime, newTtl, newVer);
 
+                updateCntr0 = nextPartCounter(topVer);
+
+                if (updateCntr != null)
+                    updateCntr0 = updateCntr;
+
                 drReplicate(drType, updated, newVer);
 
                 recordNodeId(affNodeId, topVer);
@@ -2218,7 +2290,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
-                            false);
+                            false,
+                            updateCntr0 == null ? 0 : updateCntr0);
                 }
 
                 if (writeThrough)
@@ -2270,6 +2343,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 recordNodeId(affNodeId, topVer);
 
+                updateCntr0 = nextPartCounter(topVer);
+
+                if (updateCntr != null)
+                    updateCntr0 = updateCntr;
+
                 drReplicate(drType, null, newVer);
 
                 if (evt) {
@@ -2299,9 +2377,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (res)
                 updateMetrics(op, metrics);
 
-            if (cctx.isReplicated() || primary)
-                cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, false);
-
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
 
             if (intercept) {
@@ -2326,7 +2401,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             newSysExpireTime,
             enqueueVer,
             conflictCtx,
-            true);
+            true,
+            updateCntr0);
     }
 
     /**
@@ -3146,11 +3222,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 else if (deletedUnlocked())
                     deletedUnlocked(false);
 
+                long updateCntr = 0;
+
+                if (!preload)
+                    updateCntr = nextPartCounter(topVer);
+
                 drReplicate(drType, val, ver);
 
                 if (!skipQryNtf) {
-                    if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer))
-                        cctx.continuousQueries().onEntryUpdated(this, key, val, null, preload);
+                    cctx.continuousQueries().onEntryUpdated(key, val, null, this.isInternal()
+                        || !this.context().userCache(), this.partition(), true, preload, updateCntr, topVer);
 
                     cctx.dataStructures().onEntryUpdated(key, false);
                 }
@@ -3167,6 +3248,26 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         }
     }
 
+    /**
+     * @param topVer Topology version.
+     * @return Update counter.
+     */
+    private long nextPartCounter(AffinityTopologyVersion topVer) {
+        long updateCntr;
+
+        if (!cctx.isLocal() && !isNear()) {
+            GridDhtLocalPartition locPart = cctx.topology().localPartition(partition(), topVer, false);
+
+            assert locPart != null;
+
+            updateCntr = locPart.nextUpdateCounter();
+        }
+        else
+            updateCntr = 0;
+
+        return updateCntr;
+    }
+
     /** {@inheritDoc} */
     @Override public synchronized boolean initialValue(KeyCacheObject key, GridCacheSwapEntry unswapped) throws
         IgniteCheckedException,

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e19b310..cd89416 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -989,7 +989,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         top = cacheCtx.topology();
 
                     if (top != null)
-                        updated |= top.update(null, entry.getValue()) != null;
+                        updated |= top.update(null, entry.getValue(), null) != null;
                 }
 
                 if (!cctx.kernalContext().clientNode() && updated)
@@ -1032,7 +1032,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         top = cacheCtx.topology();
 
                     if (top != null)
-                        updated |= top.update(null, entry.getValue()) != null;
+                        updated |= top.update(null, entry.getValue(), null) != null;
                 }
 
                 if (updated)

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 3674284..9df476e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -57,6 +57,9 @@ public class GridCacheUpdateAtomicResult {
     /** Whether update should be propagated to DHT node. */
     private final boolean sndToDht;
 
+    /** Partition update counter. */
+    private final Long updateCntr;
+
     /** Value computed by entry processor. */
     private IgniteBiTuple<Object, Exception> res;
 
@@ -72,6 +75,7 @@ public class GridCacheUpdateAtomicResult {
      * @param rmvVer Version for deferred delete.
      * @param conflictRes DR resolution result.
      * @param sndToDht Whether update should be propagated to DHT node.
+     * @param updateCntr Partition update counter.
      */
     public GridCacheUpdateAtomicResult(boolean success,
         @Nullable CacheObject oldVal,
@@ -81,7 +85,8 @@ public class GridCacheUpdateAtomicResult {
         long conflictExpireTime,
         @Nullable GridCacheVersion rmvVer,
         @Nullable GridCacheVersionConflictContext<?, ?> conflictRes,
-        boolean sndToDht) {
+        boolean sndToDht,
+        long updateCntr) {
         this.success = success;
         this.oldVal = oldVal;
         this.newVal = newVal;
@@ -91,6 +96,7 @@ public class GridCacheUpdateAtomicResult {
         this.rmvVer = rmvVer;
         this.conflictRes = conflictRes;
         this.sndToDht = sndToDht;
+        this.updateCntr = updateCntr;
     }
 
     /**
@@ -129,6 +135,13 @@ public class GridCacheUpdateAtomicResult {
     }
 
     /**
+     * @return Partition update counter.
+     */
+    public Long updateCounter() {
+        return updateCntr;
+    }
+
+    /**
      * @return Explicit conflict expire time (if any). Set only if it is necessary to propagate concrete expire time
      * value to DHT node. Otherwise set to {@link GridCacheUtils#EXPIRE_TIME_CALCULATE}.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
index ffda7a2..461baa7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java
@@ -32,6 +32,9 @@ public class GridCacheUpdateTxResult {
     @GridToStringInclude
     private final CacheObject oldVal;
 
+    /** Partition update counter. */
+    private long updateCntr;
+
     /**
      * Constructor.
      *
@@ -44,6 +47,25 @@ public class GridCacheUpdateTxResult {
     }
 
     /**
+     * Constructor.
+     *
+     * @param success Success flag.
+     * @param oldVal Old value (if any).
+     */
+    GridCacheUpdateTxResult(boolean success, @Nullable CacheObject oldVal, long updateCntr) {
+        this.success = success;
+        this.oldVal = oldVal;
+        this.updateCntr = updateCntr;
+    }
+
+    /**
+     * @return Partition update counter.
+     */
+    public long updatePartitionCounter() {
+        return updateCntr;
+    }
+
+    /**
      * @return Success flag.
      */
     public boolean success() {
@@ -61,4 +83,4 @@ public class GridCacheUpdateTxResult {
     @Override public String toString() {
         return S.toString(GridCacheUpdateTxResult.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 0d49584..f9ac2df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -265,6 +265,19 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void setPartitionUpdateCounters(long[] cntrs) {
+        if (writeMap() != null && !writeMap().isEmpty() && cntrs != null && cntrs.length > 0) {
+            int i = 0;
+
+            for (IgniteTxEntry txEntry : writeMap().values()) {
+                txEntry.updateCounter(cntrs[i]);
+
+                ++i;
+            }
+        }
+    }
+
     /**
      * Adds completed versions to an entry.
      *
@@ -529,7 +542,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                 near() ? null : explicitVer,
                                                 CU.subjectId(this, cctx),
                                                 resolveTaskName(),
-                                                dhtVer);
+                                                dhtVer,
+                                                txEntry.updateCounter());
                                         else {
                                             cached.innerSet(this,
                                                 eventNodeId(),
@@ -547,7 +561,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                                 near() ? null : explicitVer,
                                                 CU.subjectId(this, cctx),
                                                 resolveTaskName(),
-                                                dhtVer);
+                                                dhtVer,
+                                                txEntry.updateCounter());
 
                                             // Keep near entry up to date.
                                             if (nearCached != null) {
@@ -575,7 +590,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                                             near() ? null : explicitVer,
                                             CU.subjectId(this, cctx),
                                             resolveTaskName(),
-                                            dhtVer);
+                                            dhtVer,
+                                            txEntry.updateCounter());
 
                                         // Keep near entry up to date.
                                         if (nearCached != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 162c116..b7169bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -94,6 +94,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /** Lock. */
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
+    /** Partition update counters. */
+    private Map<Integer, Long> cntrMap = new HashMap<>();
+
     /**
      * @param cctx Context.
      * @param cacheId Cache ID.
@@ -527,7 +530,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionFullMap partMap) {
+        GridDhtPartitionFullMap partMap,
+        Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
             log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
 
@@ -602,6 +606,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
             part2node = p2n;
 
+            if (cntrMap != null)
+                this.cntrMap = new HashMap<>(cntrMap);
+
             consistencyCheck();
 
             if (log.isDebugEnabled())
@@ -617,7 +624,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts) {
+        GridDhtPartitionMap parts,
+        Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
 
@@ -698,6 +706,15 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                 }
             }
 
+            if (cntrMap != null) {
+                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+                    Long cntr = this.cntrMap.get(e.getKey());
+
+                    if (cntr == null || cntr < e.getValue())
+                        this.cntrMap.put(e.getKey(), e.getValue());
+                }
+            }
+
             consistencyCheck();
 
             if (log.isDebugEnabled())
@@ -852,6 +869,23 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public Map<Integer, Long> updateCounters() {
+        lock.readLock().lock();
+
+        try {
+            return new HashMap<>(cntrMap);
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public void printMemoryStats(int threshold) {
         X.println(">>>  Cache partition topology stats [grid=" + cctx.gridName() + ", cacheId=" + cacheId + ']');
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 1516ee4..63e2899 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicStampedReference;
 import java.util.concurrent.locks.ReentrantLock;
@@ -114,6 +115,9 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     /** Group reservations. */
     private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>();
 
+    /** Update counter. */
+    private final AtomicLong cntr = new AtomicLong();
+
     /**
      * @param cctx Context.
      * @param id Partition ID.
@@ -532,6 +536,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
             if (cctx.isDrEnabled())
                 cctx.dr().partitionEvicted(id);
 
+            cctx.continuousQueries().onPartitionEvicted(id);
+
             cctx.dataStructures().onPartitionEvicted(id);
 
             rent.onDone();
@@ -599,6 +605,35 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     }
 
     /**
+     * @return Next (incremented) update counter value.
+     */
+    public long nextUpdateCounter() {
+        return cntr.incrementAndGet();
+    }
+
+    /**
+     * @return Current update counter value.
+     */
+    public long updateCounter() {
+        return cntr.get();
+    }
+
+    /**
+     * @param val New update counter value; ignored unless greater than the current one.
+     */
+    public void updateCounter(long val) {
+        while (true) {
+            long val0 = cntr.get();
+
+            if (val0 >= val)
+                break;
+
+            if (cntr.compareAndSet(val0, val))
+                break;
+        }
+    }
+
+    /**
      * Clears values for this partition.
      */
     private void clearAll() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d642314..3ac2b85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -51,6 +52,8 @@ public interface GridDhtPartitionTopology {
      *
      * @param exchId Exchange ID.
      * @param exchFut Exchange future.
+     * @param updateSeq Update sequence.
+     * @param stopping Stopping flag.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
     public void updateTopologyVersion(
@@ -193,17 +196,27 @@ public interface GridDhtPartitionTopology {
     /**
      * @param exchId Exchange ID.
      * @param partMap Update partition map.
+     * @param cntrMap Partition update counters.
      * @return Local partition map if there were evictions or {@code null} otherwise.
      */
-    public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap);
+    public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
+        GridDhtPartitionFullMap partMap,
+        @Nullable Map<Integer, Long> cntrMap);
 
     /**
      * @param exchId Exchange ID.
      * @param parts Partitions.
+     * @param cntrMap Partition update counters.
      * @return Local partition map if there were evictions or {@code null} otherwise.
      */
     @Nullable public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts);
+        GridDhtPartitionMap parts,
+        @Nullable Map<Integer, Long> cntrMap);
+
+    /**
+     * @return Partition update counters.
+     */
+    public Map<Integer, Long> updateCounters();
 
     /**
      * @param part Partition to own.
@@ -213,6 +226,7 @@ public interface GridDhtPartitionTopology {
 
     /**
      * @param part Evicted partition.
+     * @param updateSeq Update sequence increment flag.
      */
     public void onEvicted(GridDhtLocalPartition part, boolean updateSeq);
 
@@ -228,4 +242,10 @@ public interface GridDhtPartitionTopology {
      * @param threshold Threshold for number of entries.
      */
     public void printMemoryStats(int threshold);
-}
\ No newline at end of file
+
+    /**
+     * @param topVer Topology version.
+     * @return {@code True} if rebalance process finished.
+     */
+    public boolean rebalanceFinished(AffinityTopologyVersion topVer);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 6bd283a..39c55db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -102,6 +102,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** Lock. */
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
+    /** Partition update counters. */
+    private Map<Integer, Long> cntrMap = new HashMap<>();
+
+    /** Topology version for which rebalancing was detected as finished, or {@code NONE}. */
+    private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
+
     /**
      * @param cctx Context.
      */
@@ -131,6 +137,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             topReadyFut = null;
 
             topVer = AffinityTopologyVersion.NONE;
+
+            rebalancedTopVer = AffinityTopologyVersion.NONE;
         }
         finally {
             lock.writeLock().unlock();
@@ -220,6 +228,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             updateSeq.setIfGreater(updSeq);
 
             topReadyFut = exchFut;
+
+            rebalancedTopVer = AffinityTopologyVersion.NONE;
         }
         finally {
             lock.writeLock().unlock();
@@ -292,6 +302,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             long updateSeq = this.updateSeq.incrementAndGet();
 
+            cntrMap.clear();
+
             // If this is the oldest node.
             if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) {
                 if (node2part == null) {
@@ -525,6 +537,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 }
             }
 
+            updateRebalanceVersion();
+
             consistencyCheck();
         }
         finally {
@@ -732,7 +746,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @param states Additional partition states.
      * @return List of nodes for the partition.
      */
-    private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
+    private List<ClusterNode> nodes(int p,
+        AffinityTopologyVersion topVer,
+        GridDhtPartitionState state,
+        GridDhtPartitionState... states) {
         Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null;
 
         lock.readLock().lock();
@@ -831,7 +848,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionFullMap partMap) {
+        GridDhtPartitionFullMap partMap,
+        @Nullable Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
             log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
 
@@ -843,6 +861,22 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (stopping)
                 return null;
 
+            if (cntrMap != null) {
+                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+                    Long cntr = this.cntrMap.get(e.getKey());
+
+                    if (cntr == null || cntr < e.getValue())
+                        this.cntrMap.put(e.getKey(), e.getValue());
+                }
+
+                for (GridDhtLocalPartition part : locParts.values()) {
+                    Long cntr = cntrMap.get(part.id());
+
+                    if (cntr != null)
+                        part.updateCounter(cntr);
+                }
+            }
+
             if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) {
                 if (log.isDebugEnabled())
                     log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
@@ -913,6 +947,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             boolean changed = checkEvictions(updateSeq);
 
+            updateRebalanceVersion();
+
             consistencyCheck();
 
             if (log.isDebugEnabled())
@@ -928,7 +964,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** {@inheritDoc} */
     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
     @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
-        GridDhtPartitionMap parts) {
+        GridDhtPartitionMap parts, @Nullable Map<Integer, Long> cntrMap) {
         if (log.isDebugEnabled())
             log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
 
@@ -946,6 +982,22 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
             if (stopping)
                 return null;
 
+            if (cntrMap != null) {
+                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+                    Long cntr = this.cntrMap.get(e.getKey());
+
+                    if (cntr == null || cntr < e.getValue())
+                        this.cntrMap.put(e.getKey(), e.getValue());
+                }
+
+                for (GridDhtLocalPartition part : locParts.values()) {
+                    Long cntr = cntrMap.get(part.id());
+
+                    if (cntr != null)
+                        part.updateCounter(cntr);
+                }
+            }
+
             if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
                 if (log.isDebugEnabled())
                     log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
@@ -1008,6 +1060,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
             changed |= checkEvictions(updateSeq);
 
+            updateRebalanceVersion();
+
             consistencyCheck();
 
             if (log.isDebugEnabled())
@@ -1254,6 +1308,33 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public Map<Integer, Long> updateCounters() {
+        lock.readLock().lock();
+
+        try {
+            Map<Integer, Long> res = new HashMap<>(cntrMap);
+
+            for (GridDhtLocalPartition part : locParts.values()) {
+                Long cntr0 = res.get(part.id());
+                Long cntr1 = part.updateCounter();
+
+                if (cntr0 == null || cntr1 > cntr0)
+                    res.put(part.id(), cntr1);
+            }
+
+            return res;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
+        return topVer.equals(rebalancedTopVer);
+    }
+
+    /** {@inheritDoc} */
     @Override public void printMemoryStats(int threshold) {
         X.println(">>>  Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']');
 
@@ -1266,6 +1347,31 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /**
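+     * Marks current topology version as rebalanced if owners of each checked partition match its affinity nodes.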
+     */
+    private void updateRebalanceVersion() {
+        if (!rebalancedTopVer.equals(topVer)) {
+            for (int i = 0; i < cctx.affinity().partitions(); i++) {
+                List<ClusterNode> affNodes = cctx.affinity().nodes(i, topVer);
+
+                // Topology doesn't contain server nodes (just clients).
+                if (affNodes.isEmpty() || (node2part != null && !node2part.valid()))
+                    continue;
+
+                List<ClusterNode> owners = owners(i);
+
+                if (affNodes.size() != owners.size() || !owners.containsAll(affNodes))
+                    return;
+            }
+
+            rebalancedTopVer = topVer;
+
+            if (log.isDebugEnabled())
+                log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']');
+        }
+    }
+
+    /**
      * @param p Partition.
      * @param nodeId Node ID.
      * @param match State to match.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index bb370a5..e8ef5d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -17,9 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
@@ -42,9 +42,7 @@ import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.transactions.TransactionState.COMMITTING;
 
@@ -356,6 +354,11 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
 
             add(fut); // Append new future.
 
+            Collection<Long> updCntrs = new ArrayList<>(dhtMapping.entries().size());
+
+            for (IgniteTxEntry e : dhtMapping.entries())
+                updCntrs.add(e.updateCounter());
+
             GridDhtTxFinishRequest req = new GridDhtTxFinishRequest(
                 tx.nearNodeId(),
                 futId,
@@ -379,7 +382,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
                 tx.size(),
                 tx.subjectId(),
                 tx.taskNameHash(),
-                tx.activeCachesDeploymentEnabled());
+                tx.activeCachesDeploymentEnabled(),
+                updCntrs);
 
             req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index caa0aa5..65f1cb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
@@ -66,6 +67,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     /** Check comitted flag. */
     private boolean checkCommitted;
 
+    /** Partition update counters. */
+    @GridToStringInclude
+    @GridDirectCollection(Long.class)
+    private GridLongList partUpdateCnt;
+
     /** One phase commit write version. */
     private GridCacheVersion writeVer;
 
@@ -163,6 +169,76 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     }
 
     /**
+     * @param nearNodeId Near node ID.
+     * @param futId Future ID.
+     * @param miniId Mini future ID.
+     * @param topVer Topology version.
+     * @param xidVer Transaction ID.
+     * @param commitVer Commit version.
+     * @param threadId Thread ID.
+     * @param isolation Transaction isolation.
+     * @param commit Commit flag.
+     * @param invalidate Invalidate flag.
+     * @param sys System flag.
+     * @param sysInvalidate System invalidation flag.
+     * @param syncCommit Synchronous commit flag.
+     * @param syncRollback Synchronous rollback flag.
+     * @param baseVer Base version.
+     * @param committedVers Committed versions.
+     * @param rolledbackVers Rolled back versions.
+     * @param pendingVers Pending versions.
+     * @param txSize Expected transaction size.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash.
+     * @param addDepInfo Deployment info flag.
+     * @param updateIdxs Partition update counters.
+     */
+    public GridDhtTxFinishRequest(
+        UUID nearNodeId,
+        IgniteUuid futId,
+        IgniteUuid miniId,
+        @NotNull AffinityTopologyVersion topVer,
+        GridCacheVersion xidVer,
+        GridCacheVersion commitVer,
+        long threadId,
+        TransactionIsolation isolation,
+        boolean commit,
+        boolean invalidate,
+        boolean sys,
+        byte plc,
+        boolean sysInvalidate,
+        boolean syncCommit,
+        boolean syncRollback,
+        GridCacheVersion baseVer,
+        Collection<GridCacheVersion> committedVers,
+        Collection<GridCacheVersion> rolledbackVers,
+        Collection<GridCacheVersion> pendingVers,
+        int txSize,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        boolean addDepInfo,
+        Collection<Long> updateIdxs
+    ) {
+        this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc,
+            sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize,
+            subjId, taskNameHash, addDepInfo);
+
+        if (updateIdxs != null && !updateIdxs.isEmpty()) {
+            partUpdateCnt = new GridLongList(updateIdxs.size());
+
+            for (Long idx : updateIdxs)
+                partUpdateCnt.add(idx);
+        }
+    }
+
+    /**
+     * @return Partition update counters.
+     */
+    public GridLongList partUpdateCounters(){
+        return partUpdateCnt;
+    }
+
+    /**
      * @return Mini ID.
      */
     public IgniteUuid miniId() {
@@ -294,36 +370,42 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
                     return false;
 
                 writer.incrementState();
 
             case 23:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 25:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
                     return false;
 
                 writer.incrementState();
 
             case 26:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 27:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 28:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -382,7 +464,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 22:
-                pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
+                partUpdateCnt = reader.readMessage("partUpdateCnt");
 
                 if (!reader.isLastRead())
                     return false;
@@ -390,7 +472,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 23:
-                subjId = reader.readUuid("subjId");
+                pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -398,7 +480,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 24:
-                sysInvalidate = reader.readBoolean("sysInvalidate");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -406,7 +488,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 25:
-                taskNameHash = reader.readInt("taskNameHash");
+                sysInvalidate = reader.readBoolean("sysInvalidate");
 
                 if (!reader.isLastRead())
                     return false;
@@ -414,7 +496,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 26:
-                topVer = reader.readMessage("topVer");
+                taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
                     return false;
@@ -422,6 +504,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 27:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 28:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -441,6 +531,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 28;
+        return 29;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 75f8c2f..3ee1048 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
@@ -1204,7 +1205,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
 
             try {
-                topology().readLock();
+                GridDhtPartitionTopology top = topology();
+
+                top.readLock();
 
                 try {
                     if (topology().stopping()) {
@@ -1221,7 +1224,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     // Also do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
                     if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
-                        !needRemap(req.topologyVersion(), topology().topologyVersion())) {
+                        !needRemap(req.topologyVersion(), top.topologyVersion())) {
                         ClusterNode node = ctx.discovery().node(nodeId);
 
                         if (node == null) {
@@ -1236,7 +1239,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         if (ver == null) {
                             // Assign next version for update inside entries lock.
-                            ver = ctx.versions().next(topology().topologyVersion());
+                            ver = ctx.versions().next(top.topologyVersion());
 
                             if (hasNear)
                                 res.nearVersion(ver);
@@ -1248,6 +1251,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             log.debug("Using cache version for update request on primary node [ver=" + ver +
                                 ", req=" + req + ']');
 
+                        boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
+
                         dhtFut = createDhtFuture(ver, req, res, completionCb, false);
 
                         expiry = expiryPolicy(req.expiry());
@@ -1270,7 +1275,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 completionCb,
                                 ctx.isDrEnabled(),
                                 taskName,
-                                expiry);
+                                expiry,
+                                sndPrevVal);
 
                             deleted = updRes.deleted();
                             dhtFut = updRes.dhtFuture();
@@ -1289,7 +1295,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 completionCb,
                                 ctx.isDrEnabled(),
                                 taskName,
-                                expiry);
+                                expiry,
+                                sndPrevVal);
 
                             retVal = updRes.returnValue();
                             deleted = updRes.deleted();
@@ -1309,7 +1316,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         remap = true;
                 }
                 finally {
-                    topology().readUnlock();
+                    top.readUnlock();
                 }
             }
             catch (GridCacheEntryRemovedException e) {
@@ -1384,6 +1391,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param replicate Whether replication is enabled.
      * @param taskName Task name.
      * @param expiry Expiry policy.
+     * @param sndPrevVal If {@code true} sends previous value to backups.
      * @return Deleted entries.
      * @throws GridCacheEntryRemovedException Should not be thrown.
      */
@@ -1399,7 +1407,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
-        @Nullable IgniteCacheExpiryPolicy expiry
+        @Nullable IgniteCacheExpiryPolicy expiry,
+        boolean sndPrevVal
     ) throws GridCacheEntryRemovedException {
         assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts.
         assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll.
@@ -1546,7 +1555,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 replicate,
                                 updRes,
                                 taskName,
-                                expiry);
+                                expiry,
+                                sndPrevVal);
 
                             firstEntryIdx = i;
 
@@ -1594,7 +1604,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 replicate,
                                 updRes,
                                 taskName,
-                                expiry);
+                                expiry,
+                                sndPrevVal);
 
                             firstEntryIdx = i;
 
@@ -1713,7 +1724,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 replicate,
                 updRes,
                 taskName,
-                expiry);
+                expiry,
+                sndPrevVal);
         }
         else
             assert filtered.isEmpty();
@@ -1790,6 +1802,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param replicate Whether DR is enabled for that cache.
      * @param taskName Task name.
      * @param expiry Expiry policy.
+     * @param sndPrevVal If {@code true} sends previous value to backups.
      * @return Return value.
      * @throws GridCacheEntryRemovedException Should be never thrown.
      */
@@ -1804,7 +1817,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
-        @Nullable IgniteCacheExpiryPolicy expiry
+        @Nullable IgniteCacheExpiryPolicy expiry,
+        boolean sndPrevVal
     ) throws GridCacheEntryRemovedException {
         GridCacheReturn retVal = null;
         Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
@@ -1861,7 +1875,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     req.invokeArguments(),
                     primary && writeThrough() && !req.skipStore(),
                     !req.skipStore(),
-                    req.returnValue(),
+                    sndPrevVal || req.returnValue(),
                     expiry,
                     true,
                     true,
@@ -1876,7 +1890,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     true,
                     intercept,
                     req.subjectId(),
-                    taskName);
+                    taskName,
+                    null,
+                    null);
 
                 if (dhtFut == null && !F.isEmpty(filteredReaders)) {
                     dhtFut = createDhtFuture(ver, req, res, completionCb, true);
@@ -1901,7 +1917,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 entryProcessor,
                                 updRes.newTtl(),
                                 updRes.conflictExpireTime(),
-                                newConflictVer);
+                                newConflictVer,
+                                sndPrevVal,
+                                updRes.oldValue(),
+                                updRes.updateCounter());
                         }
 
                         if (!F.isEmpty(filteredReaders))
@@ -1918,6 +1937,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
                     }
                 }
+                else if (!entry.isNear() && updRes.success()) {
+                    ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(), updRes.oldValue(),
+                        entry.isInternal() || !context().userCache(), entry.partition(), primary, false,
+                        updRes.updateCounter(), topVer);
+                }
 
                 if (hasNear) {
                     if (primary && updRes.sendToDht()) {
@@ -2008,6 +2032,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param batchRes Batch update result.
      * @param taskName Task name.
      * @param expiry Expiry policy.
+     * @param sndPrevVal If {@code true} sends previous value to backups.
      * @return Deleted entries.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
@@ -2028,7 +2053,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean replicate,
         UpdateBatchResult batchRes,
         String taskName,
-        @Nullable IgniteCacheExpiryPolicy expiry
+        @Nullable IgniteCacheExpiryPolicy expiry,
+        boolean sndPrevVal
     ) {
         assert putMap == null ^ rmvKeys == null;
 
@@ -2130,7 +2156,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         null,
                         /*write-through*/false,
                         /*read-through*/false,
-                        /*retval*/false,
+                        /*retval*/sndPrevVal,
                         expiry,
                         /*event*/true,
                         /*metrics*/true,
@@ -2145,7 +2171,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         /*conflict resolve*/false,
                         /*intercept*/false,
                         req.subjectId(),
-                        taskName);
+                        taskName,
+                        null,
+                        null);
 
                     assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null :
                         "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry;
@@ -2184,7 +2212,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 entryProcessor,
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE,
-                                null);
+                                null,
+                                sndPrevVal,
+                                updRes.oldValue(),
+                                updRes.updateCounter());
 
                         if (!F.isEmpty(filteredReaders))
                             dhtFut.addNearWriteEntries(filteredReaders,
@@ -2573,7 +2604,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         entry = entryExx(key);
 
                         CacheObject val = req.value(i);
+                        CacheObject prevVal = req.previousValue(i);
+
                         EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
+                        Long updateIdx = req.updateCounter(i);
 
                         GridCacheOperation op = entryProcessor != null ? TRANSFORM :
                             (val != null) ? UPDATE : DELETE;
@@ -2605,11 +2639,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             false,
                             intercept,
                             req.subjectId(),
-                            taskName);
+                            taskName,
+                            prevVal,
+                            updateIdx);
 
                         if (updRes.removeVersion() != null)
                             ctx.onDeferredDelete(entry, updRes.removeVersion());
 
+                        if (updRes.success() && !entry.isNear())
+                            ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(),
+                                updRes.oldValue(), entry.isInternal() || !context().userCache(), entry.partition(),
+                                false, false, updRes.updateCounter(), req.topologyVersion());
+
                         entry.onUnlock();
 
                         break; // While.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index c34dcfd..c73b3b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -215,13 +215,17 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      * @param ttl TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
+     * @param updateCntr Partition update counter.
      */
     public void addWriteEntry(GridDhtCacheEntry entry,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer) {
+        @Nullable GridCacheVersion conflictVer,
+        boolean addPrevVal,
+        @Nullable CacheObject prevVal,
+        @Nullable Long updateCntr) {
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
         Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
@@ -261,7 +265,22 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                     entryProcessor,
                     ttl,
                     conflictExpireTime,
-                    conflictVer);
+                    conflictVer,
+                    addPrevVal,
+                    entry.partition(),
+                    prevVal,
+                    updateCntr);
+            }
+            else if (dhtNodes.size() == 1) {
+                try {
+                    cctx.continuousQueries().onEntryUpdated(entry.key(), val, prevVal,
+                        entry.key().internal() || !cctx.userCache(), entry.partition(), true, false,
+                        updateCntr, updateReq.topologyVersion());
+                }
+                catch (IgniteCheckedException e) {
+                    U.warn(log, "Failed to send continuous query message. [key=" + entry.key() + ", newVal="
+                        + val + ", err=" + e + "]");
+                }
             }
         }
     }
@@ -331,8 +350,56 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             cctx.mvcc().removeAtomicFuture(version());
 
             if (err != null) {
-                for (KeyCacheObject key : keys)
-                    updateRes.addFailedKey(key, err);
+                if (!mappings.isEmpty()) {
+                    Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
+
+                    exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+                        for (int i = 0; i < req.size(); i++) {
+                            KeyCacheObject key = req.key(i);
+
+                            if (!hndKeys.contains(key)) {
+                                updateRes.addFailedKey(key, err);
+
+                                cctx.continuousQueries().skipUpdateEvent(key, req.partitionId(i), req.updateCounter(i),
+                                    updateReq.topologyVersion());
+
+                                hndKeys.add(key);
+
+                                if (hndKeys.size() == keys.size())
+                                    break exit;
+                            }
+                        }
+                    }
+                }
+                else
+                    for (KeyCacheObject key : keys)
+                        updateRes.addFailedKey(key, err);
+            }
+            else {
+                Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
+
+                exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+                    for (int i = 0; i < req.size(); i++) {
+                        KeyCacheObject key = req.key(i);
+
+                        if (!hndKeys.contains(key)) {
+                            try {
+                                cctx.continuousQueries().onEntryUpdated(key, req.value(i), req.localPreviousValue(i),
+                                    key.internal() || !cctx.userCache(), req.partitionId(i), true, false,
+                                    req.updateCounter(i), updateReq.topologyVersion());
+                            }
+                            catch (IgniteCheckedException e) {
+                                U.warn(log, "Failed to send continuous query message. [key=" + key + ", newVal="
+                                    + req.value(i) + ", err=" + e + "]");
+                            }
+
+                            hndKeys.add(key);
+
+                            if (hndKeys.size() == keys.size())
+                                break exit;
+                        }
+                    }
+                }
             }
 
             if (updateReq.writeSynchronizationMode() == FULL_SYNC)


Mime
View raw message