ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [4/9] ignite git commit: IGNITE-426 WIP
Date Thu, 22 Oct 2015 10:34:30 GMT
IGNITE-426 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1a05948f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1a05948f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1a05948f

Branch: refs/heads/ignite-426-2-reb
Commit: 1a05948f43ce3e1fc55b9b28123f323131297777
Parents: 1f6ba6c
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Fri Sep 11 18:57:59 2015 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Wed Oct 21 16:54:23 2015 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |  21 +-
 .../internal/GridMessageListenHandler.java      |  17 +
 .../communication/GridIoMessageFactory.java     |  18 +
 .../processors/cache/GridCacheEntryEx.java      |   4 +-
 .../processors/cache/GridCacheMapEntry.java     | 142 ++++++--
 .../GridCachePartitionExchangeManager.java      |   4 +-
 .../cache/GridCacheUpdateAtomicResult.java      |  15 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  83 +++--
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  22 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  | 124 +++++--
 .../GridDhtPartitionsExchangeFuture.java        |  35 +-
 .../preloader/GridDhtPartitionsFullMessage.java |  57 ++-
 .../GridDhtPartitionsSingleMessage.java         |  49 ++-
 .../distributed/near/GridNearAtomicCache.java   |  14 +-
 .../continuous/CacheContinuousQueryEntry.java   |   7 +
 .../CacheContinuousQueryFilteredEntry.java      | 228 ++++++++++++
 .../continuous/CacheContinuousQueryHandler.java | 294 ++++++++++++---
 .../CacheContinuousQueryListener.java           |   8 +
 .../CacheContinuousQueryLostPartition.java      | 156 ++++++++
 .../continuous/CacheContinuousQueryManager.java |  22 +-
 .../continuous/GridContinuousBatch.java         |  39 +-
 .../continuous/GridContinuousBatchAdapter.java  |  43 ++-
 .../continuous/GridContinuousHandler.java       |  22 ++
 .../continuous/GridContinuousProcessor.java     |   5 +
 .../processors/cache/GridCacheTestEntryEx.java  |   4 +-
 ...acheContinuousQueryFailoverAbstractTest.java | 357 ++++++++++++++++++-
 .../testframework/junits/GridAbstractTest.java  |   2 +-
 27 files changed, 1612 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 599d301..1b9c46c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -37,6 +37,8 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
 import org.apache.ignite.internal.util.typedef.F;
@@ -210,8 +212,8 @@ class GridEventConsumeHandler implements GridContinuousHandler {
                                                     }
                                                 }
 
-                                                ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false,
-                                                    false);
+                                                ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null,
+                                                    false, false);
                                             }
                                             catch (ClusterTopologyCheckedException ignored) {
                                                 // No-op.
@@ -374,6 +376,21 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public GridContinuousBatch createBatch() {
+        return new GridContinuousBatchAdapter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void partitionLost(String cacheName, int partId) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public Object orderedTopic() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index ff38949..e038794 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -26,6 +26,8 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -167,6 +169,21 @@ public class GridMessageListenHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public GridContinuousBatch createBatch() {
+        return new GridContinuousBatchAdapter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void partitionLost(String cacheName, int partId) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public Object orderedTopic() {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 079015c..6eb9e17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -90,7 +90,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlo
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBatchAck;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFilteredEntry;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartition;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.TxEntryValueHolder;
@@ -684,6 +687,21 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 114:
+                msg = new CacheContinuousQueryBatchAck();
+
+                break;
+
+            case 115:
+                msg = new CacheContinuousQueryFilteredEntry();
+
+                break;
+
+            case 116:
+                msg = new CacheContinuousQueryLostPartition();
+
+                break;
+
             // [-3..112] - this
             // [120..123] - DR
             // [-4..-22] - SQL

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 430590a..a64752b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -450,7 +450,9 @@ public interface GridCacheEntryEx {
         boolean conflictResolve,
         boolean intercept,
         @Nullable UUID subjId,
-        String taskName
+        String taskName,
+        @Nullable CacheObject prevVal,
+        @Nullable Long updateIdx
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 4bf0aa1..259869e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
@@ -1032,6 +1033,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         Object key0 = null;
         Object val0 = null;
 
+        long updateIdx0;
+
         synchronized (this) {
             checkObsolete();
 
@@ -1101,6 +1104,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     deletedUnlocked(false);
             }
 
+            updateIdx0 = nextPartIndex(topVer);
+
             update(val, expireTime, ttl, newVer);
 
             drReplicate(drType, val, newVer);
@@ -1126,8 +1131,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     subjId, null, taskName);
             }
 
-            if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
-                cctx.continuousQueries().onEntryUpdated(this, key, val, old, false);
+            if (!isNear())
+                cctx.continuousQueries().onEntryUpdated(this, key, val, old, tx.local(), false, updateIdx0, topVer);
 
             cctx.dataStructures().onEntryUpdated(key, false);
         }
@@ -1191,11 +1196,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         Cache.Entry entry0 = null;
 
+        Long updateIdx0;
+
         synchronized (this) {
             checkObsolete();
 
             assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
-                    "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']';
+                "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']';
 
             boolean startVer = isStartVersion();
 
@@ -1252,6 +1259,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 }
             }
 
+            updateIdx0 = nextPartIndex(topVer);
+
+//            if (updateIdx != null)
+//                updateIdx0 = updateIdx;
+
             drReplicate(drType, null, newVer);
 
             if (metrics && cctx.cache().configuration().isStatisticsEnabled())
@@ -1284,8 +1296,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     taskName);
             }
 
-            if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
-                cctx.continuousQueries().onEntryUpdated(this, key, null, old, false);
+            if (!isNear())
+                cctx.continuousQueries().onEntryUpdated(this, key, null, old, tx.local(), false, updateIdx0, topVer);
 
             cctx.dataStructures().onEntryUpdated(key, true);
         }
@@ -1627,7 +1639,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (res)
                 updateMetrics(op, metrics);
 
-            cctx.continuousQueries().onEntryUpdated(this, key, val, old, false);
+            if (!isNear()) {
+                long updateIdx = nextPartIndex(AffinityTopologyVersion.NONE);
+
+                cctx.continuousQueries().onEntryUpdated(this, key, val, old, true, false, updateIdx,
+                    AffinityTopologyVersion.NONE);
+            }
 
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
 
@@ -1670,7 +1687,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         boolean conflictResolve,
         boolean intercept,
         @Nullable UUID subjId,
-        String taskName
+        String taskName,
+        @Nullable CacheObject prevVal,
+        @Nullable Long updateIdx
     ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
         assert cctx.atomic();
 
@@ -1679,7 +1698,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         CacheObject oldVal;
         CacheObject updated;
 
-        GridCacheVersion enqueueVer = null;
+        GridCacheVersion rmvVer = null;
 
         GridCacheVersionConflictContext<?, ?> conflictCtx = null;
 
@@ -1696,6 +1715,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         Object key0 = null;
         Object updated0 = null;
 
+        Long updateIdx0 = null;
+
         synchronized (this) {
             boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter);
 
@@ -1803,7 +1824,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
-                            false);
+                            false,
+                            updateIdx0);
                     }
                     // Will update something.
                     else {
@@ -1852,6 +1874,38 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                                     "[entry=" + this + ", newVer=" + newVer + ']');
                         }
 
+                        if (!cctx.isNear()) {
+                            CacheObject evtVal;
+
+                            if (op == GridCacheOperation.TRANSFORM) {
+                                EntryProcessor<Object, Object, ?> entryProcessor =
+                                    (EntryProcessor<Object, Object, ?>)writeObj;
+
+                                CacheInvokeEntry<Object, Object> entry =
+                                    new CacheInvokeEntry<>(cctx, key, prevVal, version());
+
+                                try {
+                                    entryProcessor.process(entry, invokeArgs);
+
+                                    evtVal = entry.modified() ?
+                                        cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
+                                }
+                                catch (Exception e) {
+                                    evtVal = prevVal;
+                                }
+                            }
+                            else
+                                evtVal = (CacheObject)writeObj;
+
+                            updateIdx0 = nextPartIndex(topVer);
+
+                            if (updateIdx != null)
+                                updateIdx0 = updateIdx;
+
+                            cctx.continuousQueries().onEntryUpdated(this, key, evtVal, prevVal, primary, false,
+                                updateIdx0, topVer);
+                        }
+
                         return new GridCacheUpdateAtomicResult(false,
                             retval ? rawGetOrUnmarshalUnlocked(false) : null,
                             null,
@@ -1860,7 +1914,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
-                            false);
+                            false,
+                            updateIdx0);
                     }
                 }
                 else
@@ -1936,7 +1991,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         CU.EXPIRE_TIME_ETERNAL,
                         null,
                         null,
-                        false);
+                        false,
+                        updateIdx0);
                 }
             }
 
@@ -1983,7 +2039,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         CU.EXPIRE_TIME_ETERNAL,
                         null,
                         null,
-                        false);
+                        false,
+                        updateIdx0);
                 }
             }
             else
@@ -2083,7 +2140,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
-                            false);
+                            false,
+                            updateIdx0);
                     else if (interceptorVal != updated0) {
                         updated0 = cctx.unwrapTemporary(interceptorVal);
 
@@ -2120,6 +2178,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 update(updated, newExpireTime, newTtl, newVer);
 
+                updateIdx0 = nextPartIndex(topVer);
+
+                if (updateIdx != null)
+                    updateIdx0 = updateIdx;
+
                 drReplicate(drType, updated, newVer);
 
                 recordNodeId(affNodeId, topVer);
@@ -2159,7 +2222,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             CU.EXPIRE_TIME_ETERNAL,
                             null,
                             null,
-                            false);
+                            false,
+                            updateIdx0);
                 }
 
                 if (writeThrough)
@@ -2191,7 +2255,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     }
                 }
 
-                enqueueVer = newVer;
+                rmvVer = newVer;
 
                 boolean hasValPtr = hasOffHeapPointer();
 
@@ -2211,6 +2275,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 recordNodeId(affNodeId, topVer);
 
+                updateIdx0 = nextPartIndex(topVer);
+
+                if (updateIdx != null)
+                    updateIdx0 = updateIdx;
+
                 drReplicate(drType, null, newVer);
 
                 if (evt) {
@@ -2240,8 +2309,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (res)
                 updateMetrics(op, metrics);
 
-            if (cctx.isReplicated() || primary)
-                cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, false);
+            if (!isNear())
+                cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, primary, false, updateIdx0, topVer);
 
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
 
@@ -2265,9 +2334,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             invokeRes,
             newSysTtl,
             newSysExpireTime,
-            enqueueVer,
+            rmvVer,
             conflictCtx,
-            true);
+            true,
+            updateIdx0);
     }
 
     /**
@@ -3056,9 +3126,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 drReplicate(drType, val, ver);
 
+                long updateIdx = -1;
+
+                if (!preload)
+                    updateIdx = nextPartIndex(topVer);
+
                 if (!skipQryNtf) {
-                    if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer))
-                        cctx.continuousQueries().onEntryUpdated(this, key, val, null, preload);
+                    cctx.continuousQueries().onEntryUpdated(this, key, val, null, true, preload, updateIdx, topVer);
 
                     cctx.dataStructures().onEntryUpdated(key, false);
                 }
@@ -3075,6 +3149,28 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         }
     }
 
+    /**
+     * @param topVer Topology version.
+     * @return Update index.
+     */
+    private long nextPartIndex(AffinityTopologyVersion topVer) {
+        long updateIdx;
+
+        //U.dumpStack();
+
+        if (!cctx.isLocal() && !isNear()) {
+            GridDhtLocalPartition locPart = cctx.topology().localPartition(partition(), topVer, false);
+
+            assert locPart != null;
+
+            updateIdx = locPart.nextContinuousQueryUpdateIndex();
+        }
+        else
+            updateIdx = 0;
+
+        return updateIdx;
+    }
+
     /** {@inheritDoc} */
     @Override public synchronized boolean initialValue(KeyCacheObject key, GridCacheSwapEntry unswapped) throws
         IgniteCheckedException,
@@ -3942,7 +4038,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      */
     protected void deletedUnlocked(boolean deleted) {
         assert Thread.holdsLock(this);
-        assert cctx.deferredDelete();
+
+        if (!cctx.deferredDelete())
+            return;
 
         if (deleted) {
             assert !deletedUnlocked() : this;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index adc2174..0065403 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -892,7 +892,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         top = cacheCtx.topology();
 
                     if (top != null)
-                        updated |= top.update(null, entry.getValue()) != null;
+                        updated |= top.update(null, entry.getValue(), null) != null;
                 }
 
                 if (!cctx.kernalContext().clientNode() && updated)
@@ -935,7 +935,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         top = cacheCtx.topology();
 
                     if (top != null)
-                        updated |= top.update(null, entry.getValue()) != null;
+                        updated |= top.update(null, entry.getValue(), null) != null;
                 }
 
                 if (updated)

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 3674284..092d990 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -57,6 +57,9 @@ public class GridCacheUpdateAtomicResult {
     /** Whether update should be propagated to DHT node. */
     private final boolean sndToDht;
 
+    /** */
+    private final Long updateIdx;
+
     /** Value computed by entry processor. */
     private IgniteBiTuple<Object, Exception> res;
 
@@ -72,6 +75,7 @@ public class GridCacheUpdateAtomicResult {
      * @param rmvVer Version for deferred delete.
      * @param conflictRes DR resolution result.
      * @param sndToDht Whether update should be propagated to DHT node.
+     * @param updateIdx Partition update counter.
      */
     public GridCacheUpdateAtomicResult(boolean success,
         @Nullable CacheObject oldVal,
@@ -81,7 +85,8 @@ public class GridCacheUpdateAtomicResult {
         long conflictExpireTime,
         @Nullable GridCacheVersion rmvVer,
         @Nullable GridCacheVersionConflictContext<?, ?> conflictRes,
-        boolean sndToDht) {
+        boolean sndToDht,
+        long updateIdx) {
         this.success = success;
         this.oldVal = oldVal;
         this.newVal = newVal;
@@ -91,6 +96,7 @@ public class GridCacheUpdateAtomicResult {
         this.rmvVer = rmvVer;
         this.conflictRes = conflictRes;
         this.sndToDht = sndToDht;
+        this.updateIdx = updateIdx;
     }
 
     /**
@@ -129,6 +135,13 @@ public class GridCacheUpdateAtomicResult {
     }
 
     /**
+     * @return Partition update index.
+     */
+    public Long updateIdx() {
+        return updateIdx;
+    }
+
+    /**
      * @return Explicit conflict expire time (if any). Set only if it is necessary to propagate concrete expire time
      * value to DHT node. Otherwise set to {@link GridCacheUtils#EXPIRE_TIME_CALCULATE}.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 854a83d..a487593 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
@@ -1111,7 +1112,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
 
             try {
-                topology().readLock();
+                GridDhtPartitionTopology top = topology();
+
+                top.readLock();
 
                 try {
                     if (topology().stopping()) {
@@ -1128,7 +1131,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     // Also do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
                     if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
-                        !needRemap(req.topologyVersion(), topology().topologyVersion())) {
+                        !needRemap(req.topologyVersion(), top.topologyVersion())) {
                         ClusterNode node = ctx.discovery().node(nodeId);
 
                         if (node == null) {
@@ -1143,7 +1146,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         if (ver == null) {
                             // Assign next version for update inside entries lock.
-                            ver = ctx.versions().next(topology().topologyVersion());
+                            ver = ctx.versions().next(top.topologyVersion());
 
                             if (hasNear)
                                 res.nearVersion(ver);
@@ -1155,6 +1158,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             log.debug("Using cache version for update request on primary node [ver=" + ver +
                                 ", req=" + req + ']');
 
+                        boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
+
                         dhtFut = createDhtFuture(ver, req, res, completionCb, false);
 
                         expiry = expiryPolicy(req.expiry());
@@ -1177,7 +1182,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 completionCb,
                                 ctx.isDrEnabled(),
                                 taskName,
-                                expiry);
+                                expiry,
+                                sndPrevVal);
 
                             deleted = updRes.deleted();
                             dhtFut = updRes.dhtFuture();
@@ -1196,7 +1202,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 completionCb,
                                 ctx.isDrEnabled(),
                                 taskName,
-                                expiry);
+                                expiry,
+                                sndPrevVal);
 
                             retVal = updRes.returnValue();
                             deleted = updRes.deleted();
@@ -1216,7 +1223,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         remap = true;
                 }
                 finally {
-                    topology().readUnlock();
+                    top.readUnlock();
                 }
             }
             catch (GridCacheEntryRemovedException e) {
@@ -1291,6 +1298,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param replicate Whether replication is enabled.
      * @param taskName Task name.
      * @param expiry Expiry policy.
+     * @param sndPrevVal If {@code true} sends previous value to backups.
      * @return Deleted entries.
      * @throws GridCacheEntryRemovedException Should not be thrown.
      */
@@ -1306,7 +1314,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
-        @Nullable IgniteCacheExpiryPolicy expiry
+        @Nullable IgniteCacheExpiryPolicy expiry,
+        boolean sndPrevVal
     ) throws GridCacheEntryRemovedException {
         assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts.
         assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll.
@@ -1453,7 +1462,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 replicate,
                                 updRes,
                                 taskName,
-                                expiry);
+                                expiry,
+                                sndPrevVal);
 
                             firstEntryIdx = i;
 
@@ -1501,7 +1511,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 replicate,
                                 updRes,
                                 taskName,
-                                expiry);
+                                expiry,
+                                sndPrevVal);
 
                             firstEntryIdx = i;
 
@@ -1620,7 +1631,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 replicate,
                 updRes,
                 taskName,
-                expiry);
+                expiry,
+                sndPrevVal);
         }
         else
             assert filtered.isEmpty();
@@ -1696,6 +1708,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param replicate Whether DR is enabled for that cache.
      * @param taskName Task name.
      * @param expiry Expiry policy.
+     * @param sndPrevVal If {@code true} sends previous value to backups.
      * @return Return value.
      * @throws GridCacheEntryRemovedException Should be never thrown.
      */
@@ -1710,7 +1723,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
-        @Nullable IgniteCacheExpiryPolicy expiry
+        @Nullable IgniteCacheExpiryPolicy expiry,
+        boolean sndPrevVal
     ) throws GridCacheEntryRemovedException {
         GridCacheReturn retVal = null;
         Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
@@ -1767,7 +1781,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     req.invokeArguments(),
                     primary && writeThrough() && !req.skipStore(),
                     !req.skipStore(),
-                    req.returnValue(),
+                    sndPrevVal || req.returnValue(),
                     expiry,
                     true,
                     true,
@@ -1782,7 +1796,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     true,
                     intercept,
                     req.subjectId(),
-                    taskName);
+                    taskName,
+                    null,
+                    null);
 
                 if (dhtFut == null && !F.isEmpty(filteredReaders)) {
                     dhtFut = createDhtFuture(ver, req, res, completionCb, true);
@@ -1799,22 +1815,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         else if (conflictCtx.isMerge())
                             newConflictVer = null; // Conflict version is discarded in case of merge.
 
-                        EntryProcessor<Object, Object, Object> entryProcessor = null;
-
                         if (!readersOnly) {
                             dhtFut.addWriteEntry(entry,
                                 updRes.newValue(),
-                                entryProcessor,
+                                op == TRANSFORM ? req.entryProcessor(i) : null,
                                 updRes.newTtl(),
                                 updRes.conflictExpireTime(),
-                                newConflictVer);
+                                newConflictVer,
+                                sndPrevVal,
+                                updRes.oldValue(),
+                                updRes.updateIdx());
                         }
 
                         if (!F.isEmpty(filteredReaders))
                             dhtFut.addNearWriteEntries(filteredReaders,
                                 entry,
                                 updRes.newValue(),
-                                entryProcessor,
                                 updRes.newTtl(),
                                 updRes.conflictExpireTime());
                     }
@@ -1914,6 +1930,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param batchRes Batch update result.
      * @param taskName Task name.
      * @param expiry Expiry policy.
+     * @param sndPrevVal If {@code true} sends previous value to backups.
      * @return Deleted entries.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
@@ -1934,7 +1951,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean replicate,
         UpdateBatchResult batchRes,
         String taskName,
-        @Nullable IgniteCacheExpiryPolicy expiry
+        @Nullable IgniteCacheExpiryPolicy expiry,
+        boolean sndPrevVal
     ) {
         assert putMap == null ^ rmvKeys == null;
 
@@ -2036,7 +2054,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         null,
                         /*write-through*/false,
                         /*read-through*/false,
-                        /*retval*/false,
+                        /*retval*/sndPrevVal,
                         expiry,
                         /*event*/true,
                         /*metrics*/true,
@@ -2051,7 +2069,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         /*conflict resolve*/false,
                         /*intercept*/false,
                         req.subjectId(),
-                        taskName);
+                        taskName,
+                        null,
+                        null);
 
                     assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null :
                         "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry;
@@ -2081,22 +2101,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
 
                     if (dhtFut != null) {
-                        EntryProcessor<Object, Object, Object> entryProcessor =
-                            entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
-
                         if (!batchRes.readersOnly())
                             dhtFut.addWriteEntry(entry,
                                 writeVal,
-                                entryProcessor,
+                                entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()),
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE,
-                                null);
+                                null,
+                                sndPrevVal,
+                                updRes.oldValue(),
+                                updRes.updateIdx());
 
                         if (!F.isEmpty(filteredReaders))
                             dhtFut.addNearWriteEntries(filteredReaders,
                                 entry,
                                 writeVal,
-                                entryProcessor,
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE);
                     }
@@ -2499,7 +2518,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         entry = entryExx(key);
 
                         CacheObject val = req.value(i);
+                        CacheObject prevVal = req.previousValue(i);
                         EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i);
+                        Long updateIdx = req.updateIdx(i);
 
                         GridCacheOperation op = entryProcessor != null ? TRANSFORM :
                             (val != null) ? UPDATE : DELETE;
@@ -2521,7 +2542,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             /*event*/true,
                             /*metrics*/true,
                             /*primary*/false,
-                            /*check version*/!req.forceTransformBackups(),
+                            /*check version*/op != TRANSFORM || !req.forceTransformBackups(),
                             req.topologyVersion(),
                             CU.empty0(),
                             replicate ? DR_BACKUP : DR_NONE,
@@ -2531,7 +2552,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             false,
                             intercept,
                             req.subjectId(),
-                            taskName);
+                            taskName,
+                            prevVal,
+                            updateIdx);
 
                         if (updRes.removeVersion() != null)
                             ctx.onDeferredDelete(entry, updRes.removeVersion());
@@ -2573,7 +2596,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
         catch (ClusterTopologyCheckedException ignored) {
             U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " +
-                req.nodeId());
+                nodeId);
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to send DHT atomic update response (did node leave grid?) [nodeId=" + nodeId +

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 35b8e27..4019579 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -132,6 +133,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest());
 
         waitForExchange = !topLocked;
+
+        // We can send entry processor instead of value to backup if updates are ordered.
+        forceTransformBackups = updateReq.operation() == GridCacheOperation.TRANSFORM;
     }
 
     /** {@inheritDoc} */
@@ -198,16 +202,22 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      * @param ttl TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
+     * @param updateIdx Partition update index.
      */
     public void addWriteEntry(GridDhtCacheEntry entry,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer) {
+        @Nullable GridCacheVersion conflictVer,
+        boolean addPrevVal,
+        @Nullable CacheObject prevVal,
+        @Nullable Long updateIdx) {
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
-        Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
+        int part = entry.partition();
+
+        Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(part, topVer);
 
         if (log.isDebugEnabled())
             log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
@@ -243,7 +253,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                     entryProcessor,
                     ttl,
                     conflictExpireTime,
-                    conflictVer);
+                    conflictVer,
+                    addPrevVal,
+                    prevVal,
+                    updateIdx);
             }
         }
     }
@@ -252,14 +265,12 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      * @param readers Entry readers.
      * @param entry Entry.
      * @param val Value.
-     * @param entryProcessor Entry processor..
      * @param ttl TTL for near cache update (optional).
      * @param expireTime Expire time for near cache update (optional).
      */
     public void addNearWriteEntries(Iterable<UUID> readers,
         GridDhtCacheEntry entry,
         @Nullable CacheObject val,
-        EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long expireTime) {
         CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
@@ -300,7 +311,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
             updateReq.addNearWriteValue(entry.key(),
                 val,
-                entryProcessor,
                 ttl,
                 expireTime);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index f5231ef..380b194 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -78,6 +78,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     @GridDirectCollection(CacheObject.class)
     private List<CacheObject> vals;
 
+    /** Previous values. */
+    @GridToStringInclude
+    @GridDirectCollection(CacheObject.class)
+    private List<CacheObject> prevVals;
+
     /** Conflict versions. */
     @GridDirectCollection(GridCacheVersion.class)
     private List<GridCacheVersion> conflictVers;
@@ -139,6 +144,9 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     /** Task name hash. */
     private int taskNameHash;
 
+    /** Partition. */
+    private GridLongList updateCntrs;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -209,13 +217,18 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
      * @param ttl TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
+     * @param addPrevVal If {@code true} adds previous value.
+     * @param prevVal Previous value.
      */
     public void addWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer) {
+        @Nullable GridCacheVersion conflictVer,
+        boolean addPrevVal,
+        @Nullable CacheObject prevVal,
+        @Nullable Long updateIdx) {
         keys.add(key);
 
         if (forceTransformBackups) {
@@ -226,6 +239,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         else
             vals.add(val);
 
+        if (addPrevVal) {
+            if (prevVals == null)
+                prevVals = new ArrayList<>();
+
+            prevVals.add(prevVal);
+        }
+
+        if (updateIdx != null) {
+            if (updateCntrs == null)
+                updateCntrs = new GridLongList();
+
+            updateCntrs.add(updateIdx);
+        }
+
         // In case there is no conflict, do not create the list.
         if (conflictVer != null) {
             if (conflictVers == null) {
@@ -268,36 +295,21 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     /**
      * @param key Key to add.
      * @param val Value, {@code null} if should be removed.
-     * @param entryProcessor Entry processor.
      * @param ttl TTL.
      * @param expireTime Expire time.
      */
     public void addNearWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
-        EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long expireTime)
     {
         if (nearKeys == null) {
             nearKeys = new ArrayList<>();
-
-            if (forceTransformBackups) {
-                nearEntryProcessors = new ArrayList<>();
-                nearEntryProcessorsBytes = new ArrayList<>();
-            }
-            else
-                nearVals = new ArrayList<>();
+            nearVals = new ArrayList<>();
         }
 
         nearKeys.add(key);
-
-        if (forceTransformBackups) {
-            assert entryProcessor != null;
-
-            nearEntryProcessors.add(entryProcessor);
-        }
-        else
-            nearVals.add(val);
+        nearVals.add(val);
 
         if (ttl >= 0) {
             if (nearTtls == null) {
@@ -408,6 +420,17 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     }
 
     /**
+     * @param idx Counter index.
+     * @return Update counter.
+     */
+    public Long updateIdx(int idx) {
+        if (idx < updateCntrs.size())
+            return updateCntrs.get(idx);
+
+        return null;
+    }
+
+    /**
      * @param idx Near key index.
      * @return Key.
      */
@@ -428,6 +451,17 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
     /**
      * @param idx Key index.
+     * @return Value.
+     */
+    @Nullable public CacheObject previousValue(int idx) {
+        if (prevVals != null)
+            return prevVals.get(idx);
+
+        return null;
+    }
+
+    /**
+     * @param idx Key index.
      * @return Entry processor.
      */
     @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
@@ -673,42 +707,54 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeMessage("updateCntrs", updateCntrs))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeMessage("ttls", ttls))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 22:
+                if (!writer.writeMessage("ttls", ttls))
+                    return false;
+
+                writer.incrementState();
+
+            case 23:
+                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 24:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -835,7 +881,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 16:
-                subjId = reader.readUuid("subjId");
+                updateCntrs = reader.readMessage("updateCntrs");
 
                 if (!reader.isLastRead())
                     return false;
@@ -843,6 +889,22 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 17:
+                prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 18:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 19:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -854,7 +916,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 18:
+            case 20:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -862,7 +924,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 19:
+            case 21:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -870,7 +932,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 20:
+            case 22:
                 ttls = reader.readMessage("ttls");
 
                 if (!reader.isLastRead())
@@ -878,7 +940,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 21:
+            case 23:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -886,7 +948,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 22:
+            case 24:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -906,7 +968,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 23;
+        return 25;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 77e47a7..cb4bb4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -613,7 +613,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                                 if (updateTop) {
                                     for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
                                         if (top.cacheId() == cacheCtx.cacheId()) {
-                                            cacheCtx.topology().update(exchId, top.partitionMap(true));
+                                            cacheCtx.topology().update(exchId,
+                                                top.partitionMap(true),
+                                                top.updateCounters());
 
                                             break;
                                         }
@@ -811,6 +813,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     }
                 }
 
+                boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT;
+
                 for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                     if (cacheCtx.isLocal())
                         continue;
@@ -821,6 +825,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     if (drCacheCtx.isDrEnabled())
                         drCacheCtx.dr().beforeExchange(topVer, exchId.isLeft());
 
+                    if (topChanged)
+                        cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
+
                     // Partition release future is done so we can flush the write-behind store.
                     cacheCtx.store().forceFlush();
 
@@ -954,14 +961,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
      * @param id ID.
      * @throws IgniteCheckedException If failed.
      */
-    private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException {
+    private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
+        throws IgniteCheckedException {
         GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
             clientOnlyExchange,
             cctx.versions().last());
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-            if (!cacheCtx.isLocal())
+            if (!cacheCtx.isLocal()) {
                 m.addLocalPartitionMap(cacheCtx.cacheId(), cacheCtx.topology().localPartitionMap());
+
+                m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
+            }
         }
 
         if (log.isDebugEnabled())
@@ -987,15 +998,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
                 boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0;
 
-                if (ready)
+                if (ready) {
                     m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true));
+
+                    m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters());
+                }
             }
         }
 
         // It is important that client topologies be added after contexts.
-        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
+        for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
             m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true));
 
+            m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters());
+        }
+
         if (log.isDebugEnabled())
             log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
                 ", exchId=" + exchId + ", msg=" + m + ']');
@@ -1332,15 +1349,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) {
             Integer cacheId = entry.getKey();
 
+            Map<Integer, Long> cntrMap = msg.partitionUpdateCounters(cacheId);
+
             GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
             if (cacheCtx != null)
-                cacheCtx.topology().update(exchId, entry.getValue());
+                cacheCtx.topology().update(exchId, entry.getValue(), cntrMap);
             else {
                 ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
 
                 if (oldest != null && oldest.isLocal())
-                    cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue());
+                    cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap);
             }
         }
     }
@@ -1358,7 +1377,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
                 cctx.exchange().clientTopology(cacheId, this);
 
-            top.update(exchId, entry.getValue());
+            top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index c06d773..758818d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
@@ -48,6 +49,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /** */
     private byte[] partsBytes;
 
+    /** Partitions update counters. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Map<Integer, Map<Integer, Long>> partCntrs = new HashMap<>();
+
+    /** Serialized partitions counters. */
+    private byte[] partCntrsBytes;
+
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
@@ -92,13 +101,34 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             parts.put(cacheId, fullMap);
     }
 
-    /** {@inheritDoc}
-     * @param ctx*/
+    /**
+     * @param cacheId Cache ID.
+     * @param cntrMap Partition update counters.
+     */
+    public void addPartitionUpdateCounters(int cacheId, Map<Integer, Long> cntrMap) {
+        if (!partCntrs.containsKey(cacheId))
+            partCntrs.put(cacheId, cntrMap);
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @return Partition update counters.
+     */
+    public Map<Integer, Long> partitionUpdateCounters(int cacheId) {
+        Map<Integer, Long> res = partCntrs.get(cacheId);
+
+        return res != null ? res : Collections.<Integer, Long>emptyMap();
+    }
+
+    /** {@inheritDoc} */
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
         if (parts != null && partsBytes == null)
             partsBytes = ctx.marshaller().marshal(parts);
+
+        if (partCntrs != null)
+            partCntrsBytes = ctx.marshaller().marshal(partCntrs);
     }
 
     /**
@@ -121,6 +151,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         if (partsBytes != null && parts == null)
             parts = ctx.marshaller().unmarshal(partsBytes, ldr);
+
+        if (partCntrsBytes != null)
+            partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr);
     }
 
     /** {@inheritDoc} */
@@ -139,12 +172,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         switch (writer.state()) {
             case 5:
-                if (!writer.writeByteArray("partsBytes", partsBytes))
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 6:
+                if (!writer.writeByteArray("partsBytes", partsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -167,7 +206,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         switch (reader.state()) {
             case 5:
-                partsBytes = reader.readByteArray("partsBytes");
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -175,6 +214,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 6:
+                partsBytes = reader.readByteArray("partsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -194,7 +241,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 8;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 83fbb1a..547c0f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
@@ -46,6 +47,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /** Serialized partitions. */
     private byte[] partsBytes;
 
+    /** Partitions update counters. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Map<Integer, Map<Integer, Long>> partCntrs = new HashMap<>();
+
+    /** Serialized partitions counters. */
+    private byte[] partCntrsBytes;
+
     /** */
     private boolean client;
 
@@ -90,6 +99,24 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     }
 
     /**
+     * @param cacheId Cache ID.
+     * @param cntrMap Partition update counters.
+     */
+    public void partitionUpdateCounters(int cacheId, Map<Integer, Long> cntrMap) {
+        partCntrs.put(cacheId, cntrMap);
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @return Partition update counters.
+     */
+    public Map<Integer, Long> partitionUpdateCounters(int cacheId) {
+        Map<Integer, Long> res = partCntrs.get(cacheId);
+
+        return res != null ? res : Collections.<Integer, Long>emptyMap();
+    }
+
+    /**
      * @return Local partitions.
      */
     public Map<Integer, GridDhtPartitionMap> partitions() {
@@ -103,6 +130,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
         if (partsBytes == null && parts != null)
             partsBytes = ctx.marshaller().marshal(parts);
+
+        if (partCntrs != null)
+            partCntrsBytes = ctx.marshaller().marshal(partCntrs);
     }
 
     /** {@inheritDoc} */
@@ -111,6 +141,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
         if (partsBytes != null && parts == null)
             parts = ctx.marshaller().unmarshal(partsBytes, ldr);
+
+        if (partCntrsBytes != null)
+            partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr);
     }
 
     /** {@inheritDoc} */
@@ -135,6 +168,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 writer.incrementState();
 
             case 6:
+                if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -165,6 +204,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 6:
+                partCntrsBytes = reader.readByteArray("partCntrsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -184,7 +231,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 8;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 82054d9..eaeb5f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -249,7 +249,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                         /*write-through*/false,
                         /*read-through*/false,
                         /*retval*/false,
-                        /**expiry policy*/null,
+                        /*expiry policy*/null,
                         /*event*/true,
                         /*metrics*/true,
                         /*primary*/false,
@@ -263,7 +263,9 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                         false,
                         false,
                         subjId,
-                        taskName);
+                        taskName,
+                        null,
+                        null);
 
                     if (updRes.removeVersion() != null)
                         ctx.onDeferredDelete(entry, updRes.removeVersion());
@@ -351,7 +353,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                             /*event*/true,
                             /*metrics*/true,
                             /*primary*/false,
-                            /*check version*/!req.forceTransformBackups(),
+                            /*check version*/op != TRANSFORM || !req.forceTransformBackups(),
                             req.topologyVersion(),
                             CU.empty0(),
                             DR_NONE,
@@ -359,9 +361,11 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                             expireTime,
                             null,
                             false,
-                            intercept,
+                            /*intercept*/false,
                             req.subjectId(),
-                            taskName);
+                            taskName,
+                            null,
+                            null);
 
                         if (updRes.removeVersion() != null)
                             ctx.onDeferredDelete(entry, updRes.removeVersion());

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 9ea9b73..470aa09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -159,6 +159,13 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
     }
 
     /**
+     * @return Filtered entry.
+     */
+    boolean filtered() {
+        return false;
+    }
+
+    /**
      * @param cctx Cache context.
      * @throws IgniteCheckedException In case of error.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java
new file mode 100644
index 0000000..14d8f51
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.nio.ByteBuffer;
+import javax.cache.event.EventType;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Continuous query entry.
+ */
+public class CacheContinuousQueryFilteredEntry extends CacheContinuousQueryEntry {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private EventType evtType;
+
+    /** Cache name. */
+    private int cacheId;
+
+    /** Partition. */
+    private int part;
+
+    /** Update index. */
+    private long updateIdx;
+
+    /** */
+    @GridToStringInclude
+    @GridDirectTransient
+    private AffinityTopologyVersion topVer;
+
+    /**
+     * Required by {@link Message}.
+     */
+    public CacheContinuousQueryFilteredEntry() {
+        // No-op.
+    }
+
+    /**
+     * @param e Cache continuous query entry.
+     */
+    CacheContinuousQueryFilteredEntry(CacheContinuousQueryEntry e) {
+        this.cacheId = e.cacheId();
+        this.evtType = e.eventType();
+        this.part = e.partition();
+        this.updateIdx = e.updateIndex();
+        this.topVer = e.topologyVersion();
+    }
+
+    /**
+     * @return Topology version if applicable.
+     */
+    @Nullable AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /**
+     * @return Cache ID.
+     */
+    int cacheId() {
+        return cacheId;
+    }
+
+    /**
+     * @return Event type.
+     */
+    EventType eventType() {
+        return evtType;
+    }
+
+    /**
+     * @return Partition.
+     */
+    int partition() {
+        return part;
+    }
+
+    /**
+     * @return Update index.
+     */
+    long updateIndex() {
+        return updateIdx;
+    }
+
+    /** {@inheritDoc} */
+    @Override boolean filtered() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 115;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeInt("cacheId", cacheId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeByte("evtType", evtType != null ? (byte)evtType.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeInt("part", part))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeLong("updateIdx", updateIdx))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cacheId = reader.readInt("cacheId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                byte evtTypeOrd;
+
+                evtTypeOrd = reader.readByte("evtType");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                evtType = CacheContinuousQueryEntry.eventTypeFromOrdinal(evtTypeOrd);
+
+                reader.incrementState();
+
+            case 2:
+                part = reader.readInt("part");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                updateIdx = reader.readLong("updateIdx");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(CacheContinuousQueryFilteredEntry.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheContinuousQueryFilteredEntry.class, this);
+    }
+}
\ No newline at end of file


Mime
View raw message