ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [11/21] ignite git commit: ignite-2587 Fixed continuous query notifications in offheap mode and BinaryObjectOffheapImpl usage.
Date Thu, 11 Feb 2016 05:11:52 GMT
ignite-2587 Fixed continuous query notifications in offheap mode and BinaryObjectOffheapImpl usage.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4c05fc02
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4c05fc02
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4c05fc02

Branch: refs/heads/ignite-2542
Commit: 4c05fc0254f446ef040f5d22a066a0d4916a589e
Parents: 0b47d5c
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Feb 10 14:07:40 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Feb 10 14:07:40 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheLazyEntry.java        |   3 +
 .../processors/cache/GridCacheContext.java      |   4 +-
 .../processors/cache/GridCacheMapEntry.java     | 118 +++-
 .../binary/CacheObjectBinaryProcessorImpl.java  |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  79 ++-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  85 ++-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  38 +-
 .../cache/query/GridCacheQueryManager.java      |  30 +-
 .../continuous/CacheContinuousQueryHandler.java |   3 +-
 .../CacheContinuousQueryListener.java           |   2 +-
 .../continuous/CacheContinuousQueryManager.java | 120 +++-
 .../continuous/GridContinuousProcessor.java     |  16 +-
 .../IgniteCacheEntryListenerAbstractTest.java   | 454 ++++++++----
 ...cheEntryListenerAtomicOffheapTieredTest.java |  32 +
 ...cheEntryListenerAtomicOffheapValuesTest.java |  32 +
 ...teCacheEntryListenerTxOffheapTieredTest.java |  32 +
 ...teCacheEntryListenerTxOffheapValuesTest.java |  32 +
 .../cache/IgniteCacheEntryListenerTxTest.java   |   1 +
 ...ContinuousQueryFailoverAbstractSelfTest.java |  10 +
 ...tomicPrimaryWriteOrderOffheapTieredTest.java |  33 +
 ...tinuousQueryFailoverTxOffheapTieredTest.java |  32 +
 ...acheContinuousQueryRandomOperationsTest.java | 684 +++++++++++++++++++
 ...ridCacheContinuousQueryAbstractSelfTest.java |  19 +-
 ...eContinuousQueryAtomicOffheapTieredTest.java |  32 +
 ...eContinuousQueryAtomicOffheapValuesTest.java |  32 +
 ...CacheContinuousQueryTxOffheapTieredTest.java |  32 +
 ...CacheContinuousQueryTxOffheapValuesTest.java |  32 +
 .../junits/common/GridCommonAbstractTest.java   |   2 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java |   8 +
 .../IgniteCacheQuerySelfTestSuite.java          |  14 +
 30 files changed, 1743 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
index 05a6fef..30933e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
@@ -50,6 +50,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
      * @param cctx Cache context.
      * @param keyObj Key cache object.
      * @param valObj Cache object value.
+     * @param keepBinary Keep binary flag.
      */
     public CacheLazyEntry(GridCacheContext cctx, KeyCacheObject keyObj, CacheObject valObj, boolean keepBinary) {
         this.cctx = cctx;
@@ -61,6 +62,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
     /**
      * @param keyObj Key cache object.
      * @param val Value.
+     * @param keepBinary Keep binary flag.
      * @param cctx Cache context.
      */
     public CacheLazyEntry(GridCacheContext cctx, KeyCacheObject keyObj, V val, boolean keepBinary) {
@@ -75,6 +77,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
      * @param keyObj Key cache object.
      * @param key Key value.
      * @param valObj Cache object
+     * @param keepBinary Keep binary flag.
      * @param val Cache value.
      */
     public CacheLazyEntry(GridCacheContext<K, V> ctx,

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index e875df0..5729959 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1729,10 +1729,10 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return Heap-based object.
      */
     @Nullable public <T> T unwrapTemporary(@Nullable Object obj) {
-        if (!offheapTiered())
+        if (!useOffheapEntry())
             return (T)obj;
 
-        return (T) cacheObjects().unwrapTemporary(this, obj);
+        return (T)cacheObjects().unwrapTemporary(this, obj);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index ae40295..9336e0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.Cache;
@@ -44,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtr
 import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -1122,7 +1124,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             assert newVer != null : "Failed to get write version for tx: " + tx;
 
-            old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : this.val;
+            boolean internal = isInternal() || !context().userCache();
+
+            Map<UUID, CacheContinuousQueryListener> lsnrCol =
+                notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
+
+            old = (retval || intercept || lsnrCol != null) ?
+                rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : this.val;
 
             if (intercept) {
                 val0 = CU.value(val, cctx, false);
@@ -1206,10 +1214,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     keepBinary);
             }
 
-            if (cctx.isLocal() || cctx.isReplicated() ||
-                (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())))
-                cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(),
-                    partition(), tx.local(), false, updateCntr0, topVer);
+            if (lsnrCol != null) {
+                cctx.continuousQueries().onEntryUpdated(
+                    lsnrCol,
+                    key,
+                    val,
+                    old,
+                    internal,
+                    partition(),
+                    tx.local(),
+                    false,
+                    updateCntr0,
+                    topVer);
+            }
 
             cctx.dataStructures().onEntryUpdated(key, false, keepBinary);
         }
@@ -1304,7 +1321,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
             newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion();
 
-            old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : val;
+            boolean internal = isInternal() || !context().userCache();
+
+            Map<UUID, CacheContinuousQueryListener> lsnrCol =
+                notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
+
+            old = (retval || intercept || lsnrCol != null) ?
+                rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
 
             if (intercept) {
                 entry0 = new CacheLazyEntry(cctx, key, old, keepBinary);
@@ -1388,10 +1411,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     keepBinary);
             }
 
-            if (cctx.isLocal() || cctx.isReplicated() ||
-                (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())))
-                cctx.continuousQueries().onEntryUpdated(key, null, old, isInternal()
-                    || !context().userCache(),partition(), tx.local(), false, updateCntr0, topVer);
+            if (lsnrCol != null) {
+                cctx.continuousQueries().onEntryUpdated(
+                    lsnrCol,
+                    key,
+                    null,
+                    old,
+                    internal,
+                    partition(),
+                    tx.local(),
+                    false,
+                    updateCntr0,
+                    topVer);
+            }
 
             cctx.dataStructures().onEntryUpdated(key, true, keepBinary);
 
@@ -1440,6 +1472,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             return new GridCacheUpdateTxResult(false, null);
     }
 
+    /**
+     * @param tx Transaction.
+     * @return {@code True} if should notify continuous query manager.
+     */
+    private boolean notifyContinuousQueries(@Nullable IgniteInternalTx tx) {
+        return cctx.isLocal() ||
+            cctx.isReplicated() ||
+            (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local()));
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public GridTuple3<Boolean, Object, EntryProcessorResult<Object>> innerUpdateLocal(
@@ -1470,7 +1512,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         EntryProcessorResult<Object> invokeRes = null;
 
         synchronized (this) {
-            boolean needVal = retval || intercept || op == GridCacheOperation.TRANSFORM || !F.isEmpty(filter);
+            boolean internal = isInternal() || !context().userCache();
+
+            Map<UUID, CacheContinuousQueryListener> lsnrCol =
+                cctx.continuousQueries().updateListeners(internal, false);
+
+            boolean needVal = retval ||
+                intercept ||
+                op == GridCacheOperation.TRANSFORM ||
+                !F.isEmpty(filter) ||
+                lsnrCol != null;
 
             checkObsolete();
 
@@ -1479,7 +1530,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 unswap(retval);
 
             // Possibly get old value form store.
-            old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
+            old = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
 
             boolean readFromStore = false;
 
@@ -1731,11 +1782,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (res)
                 updateMetrics(op, metrics);
 
-            if (!isNear()) {
+            if (lsnrCol != null) {
                 long updateCntr = nextPartCounter(AffinityTopologyVersion.NONE);
 
-                cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(),
-                    partition(), true, false, updateCntr, AffinityTopologyVersion.NONE);
+                cctx.continuousQueries().onEntryUpdated(
+                    lsnrCol,
+                    key,
+                    val,
+                    old,
+                    internal,
+                    partition(),
+                    true,
+                    false,
+                    updateCntr,
+                    AffinityTopologyVersion.NONE);
             }
 
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
@@ -1997,8 +2057,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             if (updateCntr != null)
                                 updateCntr0 = updateCntr;
 
-                            cctx.continuousQueries().onEntryUpdated(key, evtVal, prevVal, isInternal()
-                                || !context().userCache(), partition(), primary, false, updateCntr0, topVer);
+                            cctx.continuousQueries().onEntryUpdated(
+                                key,
+                                evtVal,
+                                prevVal,
+                                isInternal() || !context().userCache(),
+                                partition(),
+                                primary,
+                                false,
+                                updateCntr0,
+                                topVer);
                         }
 
                         return new GridCacheUpdateAtomicResult(false,
@@ -2019,7 +2087,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
 
             // Prepare old value and value bytes.
-            oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
+            oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
 
             // Possibly read value from store.
             boolean readFromStore = false;
@@ -2937,7 +3005,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     /**
      * @return {@code True} if values should be stored off-heap.
      */
-    protected boolean isOffHeapValuesOnly() {
+    protected final boolean isOffHeapValuesOnly() {
         return cctx.config().getMemoryMode() == CacheMemoryMode.OFFHEAP_VALUES;
     }
 
@@ -3236,8 +3304,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 drReplicate(drType, val, ver);
 
                 if (!skipQryNtf) {
-                    cctx.continuousQueries().onEntryUpdated(key, val, null, this.isInternal()
-                        || !this.context().userCache(), this.partition(), true, preload, updateCntr, topVer);
+                    cctx.continuousQueries().onEntryUpdated(
+                        key,
+                        val,
+                        null,
+                        this.isInternal() || !this.context().userCache(),
+                        this.partition(),
+                        true,
+                        preload,
+                        updateCntr,
+                        topVer);
 
                     cctx.dataStructures().onEntryUpdated(key, false, true);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 0fef6f8..f091fc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -822,10 +822,10 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
 
         Object val = unmarshal(valPtr, !tmp);
 
-        if (val instanceof BinaryObjectOffheapImpl)
-            return (BinaryObjectOffheapImpl)val;
+        if (val instanceof CacheObject)
+            return (CacheObject)val;
 
-        return new CacheObjectImpl(val, null);
+        return toCacheObject(ctx.cacheObjectContext(), val, false);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 6c7bac5..fec61df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -76,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -1992,6 +1993,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         boolean intercept = ctx.config().getInterceptor() != null;
 
+        boolean initLsnrs = false;
+        Map<UUID, CacheContinuousQueryListener> lsnrs = null;
+        boolean internal = false;
+
         // Avoid iterator creation.
         for (int i = 0; i < keys.size(); i++) {
             KeyCacheObject k = keys.get(i);
@@ -2006,6 +2011,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 if (entry == null)
                     continue;
 
+                if (!initLsnrs) {
+                    internal = entry.isInternal() || !context().userCache();
+
+                    lsnrs = ctx.continuousQueries().updateListeners(internal, false);
+
+                    initLsnrs = true;
+                }
+
                 GridCacheVersion newConflictVer = req.conflictVersion(i);
                 long newConflictTtl = req.conflictTtl(i);
                 long newConflictExpireTime = req.conflictExpireTime(i);
@@ -2034,7 +2047,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     req.invokeArguments(),
                     primary && writeThrough() && !req.skipStore(),
                     !req.skipStore(),
-                    sndPrevVal || req.returnValue(),
+                    lsnrs != null || sndPrevVal || req.returnValue(),
                     req.keepBinary(),
                     expiry,
                     true,
@@ -2061,6 +2074,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
 
                 if (dhtFut != null) {
+                    dhtFut.listeners(lsnrs);
+
                     if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
                         GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
 
@@ -2097,10 +2112,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
                     }
                 }
-                else if (!entry.isNear() && updRes.success()) {
-                    ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(), updRes.oldValue(),
-                        entry.isInternal() || !context().userCache(), entry.partition(), primary, false,
-                        updRes.updateCounter(), topVer);
+                else if (lsnrs != null && updRes.success()) {
+                    ctx.continuousQueries().onEntryUpdated(
+                        lsnrs,
+                        entry.key(),
+                        updRes.newValue(),
+                        updRes.oldValue(),
+                        internal,
+                        entry.partition(),
+                        primary,
+                        false,
+                        updRes.updateCounter(),
+                        topVer);
                 }
 
                 if (hasNear) {
@@ -2275,6 +2298,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             boolean intercept = ctx.config().getInterceptor() != null;
 
+            boolean initLsnrs = false;
+            Map<UUID, CacheContinuousQueryListener> lsnrs = null;
+
             // Avoid iterator creation.
             for (int i = 0; i < entries.size(); i++) {
                 GridDhtCacheEntry entry = entries.get(i);
@@ -2308,6 +2334,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
                     }
 
+                    if (!initLsnrs) {
+                        lsnrs = ctx.continuousQueries().updateListeners(
+                            entry.isInternal() || !context().userCache(),
+                            false);
+
+                        initLsnrs = true;
+                    }
+
                     GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                         ver,
                         node.id(),
@@ -2317,7 +2351,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         null,
                         /*write-through*/false,
                         /*read-through*/false,
-                        /*retval*/sndPrevVal,
+                        /*retval*/sndPrevVal || lsnrs != null,
                         req.keepBinary(),
                         expiry,
                         /*event*/true,
@@ -2366,6 +2400,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
 
                     if (dhtFut != null) {
+                        dhtFut.listeners(lsnrs);
+
                         EntryProcessor<Object, Object, Object> entryProcessor =
                             entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
 
@@ -2763,6 +2799,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
+        boolean initLsnrs = false;
+        Map<UUID, CacheContinuousQueryListener> lsnrs = null;
+        boolean internal = false;
+
         for (int i = 0; i < req.size(); i++) {
             KeyCacheObject key = req.key(i);
 
@@ -2785,6 +2825,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         long ttl = req.ttl(i);
                         long expireTime = req.conflictExpireTime(i);
 
+                        if (!initLsnrs) {
+                            internal = entry.isInternal() || !context().userCache();
+
+                            lsnrs = ctx.continuousQueries().updateListeners(internal, false);
+
+                            initLsnrs = true;
+                        }
+
                         GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                             ver,
                             nodeId,
@@ -2794,7 +2842,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             op == TRANSFORM ? req.invokeArguments() : null,
                             /*write-through*/false,
                             /*read-through*/false,
-                            /*retval*/false,
+                            /*retval*/lsnrs != null,
                             req.keepBinary(),
                             /*expiry policy*/null,
                             /*event*/true,
@@ -2817,10 +2865,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (updRes.removeVersion() != null)
                             ctx.onDeferredDelete(entry, updRes.removeVersion());
 
-                        if (updRes.success() && !entry.isNear())
-                            ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(),
-                                updRes.oldValue(), entry.isInternal() || !context().userCache(), entry.partition(),
-                                false, false, updRes.updateCounter(), req.topologyVersion());
+                        if (lsnrs != null && updRes.success()) {
+                            ctx.continuousQueries().onEntryUpdated(
+                                lsnrs,
+                                entry.key(),
+                                updRes.newValue(),
+                                updRes.oldValue(),
+                                internal,
+                                entry.partition(),
+                                false,
+                                false,
+                                updRes.updateCounter(),
+                                req.topologyVersion());
+                        }
 
                         entry.onUnlock();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 06c8441..58d704d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -102,6 +103,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     /** Response count. */
     private volatile int resCnt;
 
+    /** */
+    private Map<UUID, CacheContinuousQueryListener> lsnrs;
+
     /**
      * @param cctx Cache context.
      * @param completionCb Callback to invoke when future is completed.
@@ -136,6 +140,13 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         waitForExchange = !topLocked;
     }
 
+    /**
+     * @param lsnrs Continuous query listeners.
+     */
+    void listeners(@Nullable Map<UUID, CacheContinuousQueryListener> lsnrs) {
+        this.lsnrs = lsnrs;
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteUuid futureId() {
         return futVer.asGridUuid();
@@ -215,6 +226,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      * @param ttl TTL (optional).
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
+     * @param addPrevVal If {@code true} sends previous value to backups.
+     * @param prevVal Previous value.
      * @param updateCntr Partition update counter.
      */
     public void addWriteEntry(GridDhtCacheEntry entry,
@@ -270,13 +283,22 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                     addPrevVal,
                     entry.partition(),
                     prevVal,
-                    updateCntr);
+                    updateCntr,
+                    lsnrs != null);
             }
-            else if (dhtNodes.size() == 1) {
+            else if (lsnrs != null && dhtNodes.size() == 1) {
                 try {
-                    cctx.continuousQueries().onEntryUpdated(entry.key(), val, prevVal,
-                        entry.key().internal() || !cctx.userCache(), entry.partition(), true, false,
-                        updateCntr, updateReq.topologyVersion());
+                    cctx.continuousQueries().onEntryUpdated(
+                        lsnrs,
+                        entry.key(),
+                        val,
+                        prevVal,
+                        entry.key().internal() || !cctx.userCache(),
+                        entry.partition(),
+                        true,
+                        false,
+                        updateCntr,
+                        updateReq.topologyVersion());
                 }
                 catch (IgniteCheckedException e) {
                     U.warn(log, "Failed to send continuous query message. [key=" + entry.key() + ", newVal="
@@ -352,7 +374,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             cctx.mvcc().removeAtomicFuture(version());
 
             if (err != null) {
-                if (!mappings.isEmpty()) {
+                if (!mappings.isEmpty() && lsnrs != null) {
                     Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
 
                     exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
@@ -362,7 +384,11 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                             if (!hndKeys.contains(key)) {
                                 updateRes.addFailedKey(key, err);
 
-                                cctx.continuousQueries().skipUpdateEvent(key, req.partitionId(i), req.updateCounter(i),
+                                cctx.continuousQueries().skipUpdateEvent(
+                                    lsnrs,
+                                    key,
+                                    req.partitionId(i),
+                                    req.updateCounter(i),
                                     updateReq.topologyVersion());
 
                                 hndKeys.add(key);
@@ -378,27 +404,38 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                         updateRes.addFailedKey(key, err);
             }
             else {
-                Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
+                if (lsnrs != null) {
+                    Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
 
-                exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
-                    for (int i = 0; i < req.size(); i++) {
-                        KeyCacheObject key = req.key(i);
+                    exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+                        for (int i = 0; i < req.size(); i++) {
+                            KeyCacheObject key = req.key(i);
 
-                        if (!hndKeys.contains(key)) {
-                            try {
-                                cctx.continuousQueries().onEntryUpdated(key, req.value(i), req.localPreviousValue(i),
-                                    key.internal() || !cctx.userCache(), req.partitionId(i), true, false,
-                                    req.updateCounter(i), updateReq.topologyVersion());
-                            }
-                            catch (IgniteCheckedException e) {
-                                U.warn(log, "Failed to send continuous query message. [key=" + key + ", newVal="
-                                    + req.value(i) + ", err=" + e + "]");
-                            }
+                            if (!hndKeys.contains(key)) {
+                                try {
+                                    cctx.continuousQueries().onEntryUpdated(
+                                        lsnrs,
+                                        key,
+                                        req.value(i),
+                                        req.localPreviousValue(i),
+                                        key.internal() || !cctx.userCache(),
+                                        req.partitionId(i),
+                                        true,
+                                        false,
+                                        req.updateCounter(i),
+                                        updateReq.topologyVersion());
+                                }
+                                catch (IgniteCheckedException e) {
+                                    U.warn(log, "Failed to send continuous query message. [key=" + key +
+                                        ", newVal=" + req.value(i) +
+                                        ", err=" + e + "]");
+                                }
 
-                            hndKeys.add(key);
+                                hndKeys.add(key);
 
-                            if (hndKeys.size() == keys.size())
-                                break exit;
+                                if (hndKeys.size() == keys.size())
+                                    break exit;
+                            }
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 7cc276f..e417cdb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.IgniteCodeGeneratingFail;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -49,6 +50,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Lite dht cache backup update request.
  */
+@IgniteCodeGeneratingFail // Need to add 'cleanup' call in 'writeTo' method.
 public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
     /** */
     private static final long serialVersionUID = 0L;
@@ -215,7 +217,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
         keys = new ArrayList<>();
         partIds = new ArrayList<>();
-        locPrevVals = new ArrayList<>();
 
         if (forceTransformBackups) {
             entryProcessors = new ArrayList<>();
@@ -240,7 +241,10 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
      * @param conflictExpireTime Conflict expire time (optional).
      * @param conflictVer Conflict version (optional).
      * @param addPrevVal If {@code true} adds previous value.
+     * @param partId Partition.
      * @param prevVal Previous value.
+     * @param updateCntr Update counter.
+     * @param storeLocPrevVal If {@code true} stores previous value.
      */
     public void addWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
@@ -251,12 +255,18 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         boolean addPrevVal,
         int partId,
         @Nullable CacheObject prevVal,
-        @Nullable Long updateIdx) {
+        @Nullable Long updateCntr,
+        boolean storeLocPrevVal) {
         keys.add(key);
 
         partIds.add(partId);
 
-        locPrevVals.add(prevVal);
+        if (storeLocPrevVal) {
+            if (locPrevVals == null)
+                locPrevVals = new ArrayList<>();
+
+            locPrevVals.add(prevVal);
+        }
 
         if (forceTransformBackups) {
             assert entryProcessor != null;
@@ -273,11 +283,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
             prevVals.add(prevVal);
         }
 
-        if (updateIdx != null) {
+        if (updateCntr != null) {
             if (updateCntrs == null)
                 updateCntrs = new GridLongList();
 
-            updateCntrs.add(updateIdx);
+            updateCntrs.add(updateCntr);
         }
 
         // In case there is no conflict, do not create the list.
@@ -521,6 +531,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
      * @return Value.
      */
     @Nullable public CacheObject localPreviousValue(int idx) {
+        assert locPrevVals != null;
+
         return locPrevVals.get(idx);
     }
 
@@ -849,6 +861,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
         }
 
+        cleanup();
+
         return true;
     }
 
@@ -1048,6 +1062,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         return reader.afterMessageRead(GridDhtAtomicUpdateRequest.class);
     }
 
+    /**
+     * Cleans up values that are not needed after the message was sent.
+     */
+    private void cleanup() {
+        nearVals = null;
+        prevVals = null;
+
+        // Do not keep values if they are not needed for continuous query notification.
+        if (locPrevVals == null) {
+           vals = null;
+           locPrevVals = null;
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public byte directType() {
         return 38;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 8f0cab7..0d8f795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1107,7 +1107,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 next = null;
 
                 while (it.hasNext()) {
-                    final LazySwapEntry e = new LazySwapEntry(it.next(), keepBinary);
+                    final LazySwapEntry e = new LazySwapEntry(it.next());
 
                     if (filter != null) {
                         K key = (K)cctx.unwrapBinaryIfNeeded(e.key(), keepBinary);
@@ -2524,15 +2524,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         /** */
         private final Map.Entry<byte[], byte[]> e;
 
-        /** */
-        private boolean keepBinary;
-
         /**
          * @param e Entry with
          */
-        LazySwapEntry(Map.Entry<byte[], byte[]> e, boolean keepBinary) {
+        LazySwapEntry(Map.Entry<byte[], byte[]> e) {
             this.e = e;
-            this.keepBinary = keepBinary;
         }
 
         /** {@inheritDoc} */
@@ -2545,9 +2541,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         @Override protected V unmarshalValue() throws IgniteCheckedException {
             IgniteBiTuple<byte[], Byte> t = GridCacheSwapEntryImpl.getValue(e.getValue());
 
-            CacheObject obj = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
-
-            return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(obj, keepBinary);
+            return (V)cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1());
         }
 
         /** {@inheritDoc} */
@@ -2597,13 +2591,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         @Override protected V unmarshalValue() throws IgniteCheckedException {
             long ptr = GridCacheOffheapSwapEntry.valueAddress(valPtr.get1(), valPtr.get2());
 
-            CacheObject obj = cctx.fromOffheap(ptr, false);
-
-            V val = CU.value(obj, cctx, false);
-
-            assert val != null;
-
-            return val;
+            return (V)cctx.fromOffheap(ptr, false);
         }
 
         /** {@inheritDoc} */
@@ -2661,7 +2649,15 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             if (!filter.apply(key, val))
                 return null;
 
-            return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value()));
+            if (key instanceof CacheObject)
+                ((CacheObject)key).prepareMarshal(cctx.cacheObjectContext());
+
+            val = (V)cctx.unwrapTemporary(e.value());
+
+            if (val instanceof CacheObject)
+                ((CacheObject)val).prepareMarshal(cctx.cacheObjectContext());
+
+            return new IgniteBiTuple<>(e.key(), val);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 7e66ad3..cf9b439 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -882,8 +882,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
          * @return Continuous query entry.
          */
         private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) {
-            if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) {
-
+            if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1L) {
                 e.markFiltered();
 
                 return e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 86abbef..dce04de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 /**
  * Continuous query listener.
  */
-interface CacheContinuousQueryListener<K, V> {
+public interface CacheContinuousQueryListener<K, V> {
     /**
      * Query execution callback.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 0e4cb40..cc59989 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.resources.LoggerResource;
+import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
 import static javax.cache.event.EventType.CREATED;
@@ -155,37 +156,102 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param lsnrs Listeners to notify.
+     * @param key Entry key.
      * @param partId Partition id.
      * @param updCntr Updated counter.
      * @param topVer Topology version.
      */
-    public void skipUpdateEvent(KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
-        if (lsnrCnt.get() > 0) {
-            for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
-                CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
-                    cctx.cacheId(),
-                    UPDATED,
-                    key,
-                    null,
-                    null,
-                    lsnr.keepBinary(),
-                    partId,
-                    updCntr,
-                    topVer);
+    public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
+        KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
+        assert lsnrs != null;
 
-                CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
-                    cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+        for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
+            CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+                cctx.cacheId(),
+                UPDATED,
+                key,
+                null,
+                null,
+                lsnr.keepBinary(),
+                partId,
+                updCntr,
+                topVer);
 
-                lsnr.skipUpdateEvent(evt, topVer);
-            }
+            CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
+                cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+
+            lsnr.skipUpdateEvent(evt, topVer);
         }
     }
 
     /**
+     * @param internal Internal entry flag (internal key or not user cache).
+     * @param preload Whether update happened during preloading.
+     * @return Registered listeners.
+     */
+    @Nullable public Map<UUID, CacheContinuousQueryListener> updateListeners(
+        boolean internal,
+        boolean preload) {
+        if (preload && !internal)
+            return null;
+
+        ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol;
+
+        if (internal)
+            lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null;
+        else
+            lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null;
+
+        return F.isEmpty(lsnrCol) ? null : lsnrCol;
+    }
+
+    /**
+     * @param key Key.
+     * @param newVal New value.
+     * @param oldVal Old value.
+     * @param internal Internal entry (internal key or not user cache).
+     * @param partId Partition.
+     * @param primary {@code True} if called on primary node.
+     * @param preload Whether update happened during preloading.
+     * @param updateCntr Update counter.
+     * @param topVer Topology version.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void onEntryUpdated(
+        KeyCacheObject key,
+        CacheObject newVal,
+        CacheObject oldVal,
+        boolean internal,
+        int partId,
+        boolean primary,
+        boolean preload,
+        long updateCntr,
+        AffinityTopologyVersion topVer) throws IgniteCheckedException {
+        Map<UUID, CacheContinuousQueryListener> lsnrCol = updateListeners(internal, preload);
+
+        if (lsnrCol != null) {
+            onEntryUpdated(
+                lsnrCol,
+                key,
+                newVal,
+                oldVal,
+                internal,
+                partId,
+                primary,
+                preload,
+                updateCntr,
+                topVer);
+        }
+    }
+
+    /**
+     * @param lsnrCol Listeners to notify.
      * @param key Key.
      * @param newVal New value.
      * @param oldVal Old value.
      * @param internal Internal entry (internal key or not user cache),
+     * @param partId Partition.
      * @param primary {@code True} if called on primary node.
      * @param preload Whether update happened during preloading.
      * @param updateCntr Update counter.
@@ -193,6 +259,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @throws IgniteCheckedException In case of error.
      */
     public void onEntryUpdated(
+        Map<UUID, CacheContinuousQueryListener> lsnrCol,
         KeyCacheObject key,
         CacheObject newVal,
         CacheObject oldVal,
@@ -205,25 +272,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         throws IgniteCheckedException
     {
         assert key != null;
-
-        if (preload && !internal)
-            return;
-
-        ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol;
-
-        if (internal)
-            lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null;
-        else
-            lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null;
-
-        if (F.isEmpty(lsnrCol))
-            return;
+        assert lsnrCol != null;
 
         boolean hasNewVal = newVal != null;
         boolean hasOldVal = oldVal != null;
 
-        if (!hasNewVal && !hasOldVal)
+        if (!hasNewVal && !hasOldVal) {
+            skipUpdateEvent(lsnrCol, key, partId, updateCntr, topVer);
+
             return;
+        }
 
         EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 7c7e3e3..0218897 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -893,11 +894,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         // Load partition counters.
-        if (hnd.isQuery() && ctx.cache() != null && ctx.cache().internalCache(hnd.cacheName()) != null) {
-            Map<Integer, Long> cntrs = ctx.cache().internalCache(hnd.cacheName())
-                .context().topology().updateCounters();
+        if (hnd.isQuery()) {
+            GridCacheProcessor proc = ctx.cache();
 
-            req.addUpdateCounters(cntrs);
+            if (proc != null) {
+                GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());
+
+                if (cache != null && !cache.isLocal()) {
+                    Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
+
+                    req.addUpdateCounters(cntrs);
+                }
+            }
         }
 
         if (err != null)


Mime
View raw message