ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject ignite git commit: IGNITE-2515 Fixed review notes.
Date Tue, 16 Feb 2016 09:24:40 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2515 0295a56e1 -> f950888d9


IGNITE-2515 Fixed review notes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f950888d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f950888d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f950888d

Branch: refs/heads/ignite-2515
Commit: f950888d9e9f78ef25e96c683494a9615c8b1053
Parents: 0295a56
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Tue Feb 16 12:24:37 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Tue Feb 16 12:24:37 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java | 71 +++++++-------------
 .../CacheContinuousQueryListener.java           |  4 +-
 .../continuous/CacheContinuousQueryManager.java | 15 ++---
 3 files changed, 30 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f950888d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index cf622a0..dd9e6e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -368,55 +368,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
                                     if (!internal && !skipPrimaryCheck)
                                         sendBackupAcknowledge(ackBuf.onAcknowledged(entry),
routineId, ctx);
-
-                                    if (recordIgniteEvt) {
-                                        for (CacheEntryEvent<? extends K, ? extends V>
e : evts) {
-                                            ctx.event().record(new CacheQueryReadEvent<>(
-                                                ctx.discovery().localNode(),
-                                                "Continuous query executed.",
-                                                EVT_CACHE_QUERY_OBJECT_READ,
-                                                CacheQueryType.CONTINUOUS.name(),
-                                                cacheName,
-                                                null,
-                                                null,
-                                                null,
-                                                rmtFilter,
-                                                null,
-                                                nodeId,
-                                                taskName(),
-                                                e.getKey(),
-                                                e.getValue(),
-                                                e.getOldValue(),
-                                                null
-                                            ));
-                                        }
-                                    }
                                 }
                             }
                             else {
-                                if (!entry.isFiltered()) {
+                                if (!entry.isFiltered())
                                     locLsnr.onUpdated(F.<CacheEntryEvent<? extends
K, ? extends V>>asList(evt));
-
-                                    if (recordIgniteEvt)
-                                        ctx.event().record(new CacheQueryReadEvent<>(
-                                            ctx.discovery().localNode(),
-                                            "Continuous query executed.",
-                                            EVT_CACHE_QUERY_OBJECT_READ,
-                                            CacheQueryType.CONTINUOUS.name(),
-                                            cacheName,
-                                            null,
-                                            null,
-                                            null,
-                                            rmtFilter,
-                                            null,
-                                            nodeId,
-                                            taskName(),
-                                            evt.getKey(),
-                                            evt.getValue(),
-                                            evt.getOldValue(),
-                                            null
-                                        ));
-                                }
                             }
                         }
                         else {
@@ -450,6 +406,27 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 catch (IgniteCheckedException ex) {
                     U.error(ctx.log(getClass()), "Failed to send event notification to node:
" + nodeId, ex);
                 }
+
+                if (recordIgniteEvt && notify) {
+                    ctx.event().record(new CacheQueryReadEvent<>(
+                        ctx.discovery().localNode(),
+                        "Continuous query executed.",
+                        EVT_CACHE_QUERY_OBJECT_READ,
+                        CacheQueryType.CONTINUOUS.name(),
+                        cacheName,
+                        null,
+                        null,
+                        null,
+                        rmtFilter,
+                        null,
+                        nodeId,
+                        taskName(),
+                        evt.getKey(),
+                        evt.getValue(),
+                        evt.getOldValue(),
+                        null
+                    ));
+                }
             }
 
             @Override public void onUnregister() {
@@ -496,14 +473,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
 
             @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt,
AffinityTopologyVersion topVer,
-                boolean primary, boolean recordIgniteEvt) {
+                boolean primary) {
                 assert evt != null;
 
                 CacheContinuousQueryEntry e = evt.entry();
 
                 e.markFiltered();
 
-                onEntryUpdated(evt, primary, recordIgniteEvt);
+                onEntryUpdated(evt, primary, false);
             }
 
             @Override public void onPartitionEvicted(int part) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f950888d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 6a5c7e6..83ff32c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -68,10 +68,8 @@ public interface CacheContinuousQueryListener<K, V> {
      * @param evt Event
      * @param topVer Topology version.
      * @param primary Primary
-     * @param recordIgniteEvt Whether to record event.
      */
-    public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion
topVer,
-        boolean primary, boolean recordIgniteEvt);
+    public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion
topVer, boolean primary);
 
     /**
      * @param part Partition.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f950888d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index d2150f8..b36fd56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -167,10 +167,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
         int partId,
         long updCntr,
         AffinityTopologyVersion topVer) {
-        boolean recordIgniteEvt = !(key.internal() || !cctx.userCache())
-            && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
-
-        skipUpdateEvent(lsnrs, key, partId, updCntr, true, recordIgniteEvt, topVer);
+        skipUpdateEvent(lsnrs, key, partId, updCntr, true, topVer);
     }
 
     /**
@@ -180,14 +177,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
      * @param updCntr Updated counter.
      * @param topVer Topology version.
      * @param primary Primary.
-     * @param recordIgniteEvt Whether to record event.
      */
     public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs,
         KeyCacheObject key,
         int partId,
         long updCntr,
         boolean primary,
-        boolean recordIgniteEvt,
         AffinityTopologyVersion topVer) {
         assert lsnrs != null;
 
@@ -206,7 +201,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-            lsnr.skipUpdateEvent(evt, topVer, primary, recordIgniteEvt);
+            lsnr.skipUpdateEvent(evt, topVer, primary);
         }
     }
 
@@ -302,14 +297,14 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
         boolean hasNewVal = newVal != null;
         boolean hasOldVal = oldVal != null;
 
-        boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
-
         if (!hasNewVal && !hasOldVal) {
-            skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, recordIgniteEvt, topVer);
+            skipUpdateEvent(lsnrCol, key, partId, updateCntr, primary, topVer);
 
             return;
         }
 
+        boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+
         EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED;
 
         boolean initialized = false;


Mime
View raw message