ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [07/25] ignite git commit: ignite-5075
Date Thu, 01 Jun 2017 15:44:14 GMT
ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/06989645
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/06989645
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/06989645

Branch: refs/heads/ignite-5075-pds
Commit: 06989645957dd958f0736ab41a21af3b92580f04
Parents: 4ef7601
Author: sboikov <sboikov@gridgain.com>
Authored: Wed May 31 16:26:18 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed May 31 17:45:11 2017 +0300

----------------------------------------------------------------------
 .../processors/affinity/GridAffinityUtils.java  |  2 +-
 .../cache/CacheAffinitySharedManager.java       |  5 +++
 .../CacheClientReconnectDiscoveryData.java      |  8 ++--
 .../internal/processors/cache/CacheData.java    |  4 +-
 .../processors/cache/CacheGroupData.java        |  7 ++-
 .../cache/CacheGroupInfrastructure.java         | 18 ++++----
 .../cache/CacheJoinNodeDiscoveryData.java       |  4 +-
 .../processors/cache/ClusterCachesInfo.java     | 11 ++---
 .../processors/cache/ExchangeActions.java       |  2 +-
 .../cache/GridCacheConcurrentMapImpl.java       |  1 +
 .../processors/cache/GridCacheContext.java      |  1 +
 .../processors/cache/GridCacheEntryInfo.java    |  1 +
 .../processors/cache/GridCacheMapEntry.java     |  2 +
 .../processors/cache/GridCacheProcessor.java    |  4 +-
 .../cache/database/CacheDataRowAdapter.java     |  1 +
 .../continuous/CacheContinuousQueryHandler.java | 46 ++++++++------------
 .../CacheContinuousQueryListener.java           |  5 ++-
 .../query/continuous/CounterSkipContext.java    | 14 +++---
 18 files changed, 74 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
index 35fe965..abd5292 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
@@ -187,7 +187,7 @@ class GridAffinityUtils {
                 new GridAffinityAssignment(topVer, assign0.assignment(), assign0.idealAssignment());
 
             return F.t(
-                affinityMessage(ctx, cctx.group().affinityFunction()),
+                affinityMessage(ctx, cctx.config().getAffinity()),
                 affinityMessage(ctx, cctx.config().getAffinityMapper()),
                 assign);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index d3be472..1c71dab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -515,6 +515,11 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         return exchActions.clientOnlyExchange();
     }
 
+    /**
+     * @param cacheId Cache ID.
+     * @param closeReqs Close requests.
+     * @return {@code True} if requests contain request for given cache ID.
+     */
     private boolean cacheClosed(int cacheId, List<ExchangeActions.ActionData> closeReqs) {
         for (ExchangeActions.ActionData req : closeReqs) {
             if (req.descriptor().cacheId() == cacheId)

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
index 2b22146..6a6f40d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClientReconnectDiscoveryData.java
@@ -74,7 +74,7 @@ public class CacheClientReconnectDiscoveryData implements Serializable {
         private final IgniteUuid deploymentId;
 
         /** Flags added for future usage. */
-        private final byte flags;
+        private final long flags;
 
         /**
          * @param ccfg Cache group configuration.
@@ -83,7 +83,7 @@ public class CacheClientReconnectDiscoveryData implements Serializable {
          */
         CacheGroupInfo(CacheConfiguration ccfg,
             IgniteUuid deploymentId,
-            byte flags) {
+            long flags) {
             assert ccfg != null;
             assert deploymentId != null;
 
@@ -127,7 +127,7 @@ public class CacheClientReconnectDiscoveryData implements Serializable {
         private final boolean nearCache;
 
         /** Flags added for future usage. */
-        private final byte flags;
+        private final long flags;
 
         /**
          * @param ccfg Cache configuration.
@@ -140,7 +140,7 @@ public class CacheClientReconnectDiscoveryData implements Serializable {
             CacheType cacheType,
             IgniteUuid deploymentId,
             boolean nearCache,
-            byte flags) {
+            long flags) {
             assert ccfg != null;
             assert cacheType != null;
             assert deploymentId != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
index 75fa580..b728d96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheData.java
@@ -62,7 +62,7 @@ public class CacheData implements Serializable {
     private final boolean template;
 
     /** Flags added for future usage. */
-    private final byte flags;
+    private final long flags;
 
     /**
      * @param cacheCfg Cache configuration.
@@ -87,7 +87,7 @@ public class CacheData implements Serializable {
         boolean staticCfg,
         boolean sql,
         boolean template,
-        byte flags) {
+        long flags) {
         assert cacheCfg != null;
         assert rcvdFrom != null : cacheCfg.getName();
         assert deploymentId != null : cacheCfg.getName();

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
index 0b94782..a290caf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java
@@ -56,6 +56,9 @@ public class CacheGroupData implements Serializable {
     @GridToStringInclude
     private final Map<String, Integer> caches;
 
+    /** */
+    private long flags;
+
     /**
      * @param cacheCfg Cache configuration.
      * @param grpName Group name.
@@ -72,7 +75,8 @@ public class CacheGroupData implements Serializable {
         UUID rcvdFrom,
         @Nullable AffinityTopologyVersion startTopVer,
         IgniteUuid deploymentId,
-        Map<String, Integer> caches) {
+        Map<String, Integer> caches,
+        long flags) {
         assert cacheCfg != null;
         assert grpId != 0;
         assert deploymentId != null;
@@ -84,6 +88,7 @@ public class CacheGroupData implements Serializable {
         this.startTopVer = startTopVer;
         this.deploymentId = deploymentId;
         this.caches = caches;
+        this.flags = flags;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index b7d8243..5278e4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -73,6 +73,9 @@ public class CacheGroupInfrastructure {
     /** Node ID cache group was received from. */
     private final UUID rcvdFrom;
 
+    /** Flag indicating that this cache group is in a recovery mode due to partitions loss. */
+    private boolean needsRecovery;
+
     /** */
     private final AffinityTopologyVersion locStartVer;
 
@@ -88,7 +91,7 @@ public class CacheGroupInfrastructure {
     /** */
     private final CacheType cacheType;
 
-    /** IO policy. */
+    /** */
     private final byte ioPlc;
 
     /** */
@@ -97,9 +100,6 @@ public class CacheGroupInfrastructure {
     /** */
     private final boolean storeCacheId;
 
-    /** Flag indicating that this cache group is in a recovery mode due to partitions loss. */
-    private boolean needsRecovery;
-
     /** */
     private volatile List<GridCacheContext> caches;
 
@@ -127,10 +127,10 @@ public class CacheGroupInfrastructure {
     /** */
     private final CacheObjectContext cacheObjCtx;
 
-    /** FreeList instance this group is associated with. */
+    /** */
     private final FreeList freeList;
 
-    /** ReuseList instance this group is associated with */
+    /** */
     private final ReuseList reuseList;
 
     /** */
@@ -799,12 +799,12 @@ public class CacheGroupInfrastructure {
                 skipCtx = cctx.continuousQueries().skipUpdateCounter(skipCtx, part, cntr, topVer, primary);
         }
 
-        final List<Runnable> sndC = skipCtx != null ? skipCtx.sendClosures() : null;
+        final List<Runnable> procC = skipCtx != null ? skipCtx.processClosures() : null;
 
-        if (sndC != null) {
+        if (procC != null) {
             ctx.kernalContext().closure().runLocalSafe(new Runnable() {
                 @Override public void run() {
-                    for (Runnable c : sndC)
+                    for (Runnable c : procC)
                         c.run();
                 }
             });

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
index afc01c9..58c9d82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheJoinNodeDiscoveryData.java
@@ -111,7 +111,7 @@ class CacheJoinNodeDiscoveryData implements Serializable {
         private final boolean sql;
 
         /** Flags added for future usage. */
-        private final byte flags;
+        private final long flags;
 
         /**
          * @param ccfg Cache configuration.
@@ -119,7 +119,7 @@ class CacheJoinNodeDiscoveryData implements Serializable {
          * @param sql SQL flag - {@code true} if cache was created with {@code CREATE TABLE}.
          * @param flags Flags (for future usage).
          */
-        CacheInfo(CacheConfiguration ccfg, CacheType cacheType, boolean sql, byte flags) {
+        CacheInfo(CacheConfiguration ccfg, CacheType cacheType, boolean sql, long flags) {
             this.ccfg = ccfg;
             this.cacheType = cacheType;
             this.sql = sql;

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
index cd79673..e3437bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java
@@ -535,7 +535,7 @@ class ClusterCachesInfo {
 
                 cacheGrpsInfo.put(grp.groupId(), new CacheClientReconnectDiscoveryData.CacheGroupInfo(desc.config(),
                     desc.deploymentId(),
-                    (byte)0));
+                    0));
             }
 
             for (IgniteInternalCache cache : ctx.cache().caches()) {
@@ -547,7 +547,7 @@ class ClusterCachesInfo {
                     desc.cacheType(),
                     desc.deploymentId(),
                     cache.context().isNear(),
-                    (byte)0));
+                    0));
             }
 
             return new CacheClientReconnectDiscoveryData(cacheGrpsInfo, cachesInfo);
@@ -662,7 +662,8 @@ class ClusterCachesInfo {
                 grpDesc.receivedFrom(),
                 grpDesc.startTopologyVersion(),
                 grpDesc.deploymentId(),
-                grpDesc.caches());
+                grpDesc.caches(),
+                0);
 
             cacheGrps.put(grpDesc.groupId(), grpData);
         }
@@ -680,7 +681,7 @@ class ClusterCachesInfo {
                 desc.staticallyConfigured(),
                 desc.sql(),
                 false,
-                (byte)0);
+                0);
 
             caches.put(desc.cacheName(), cacheData);
         }
@@ -698,7 +699,7 @@ class ClusterCachesInfo {
                 desc.staticallyConfigured(),
                 false,
                 true,
-                (byte)0);
+                0);
 
             templates.put(desc.cacheName(), cacheData);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index bbcb6ec..d577b30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -64,7 +64,7 @@ public class ExchangeActions {
         return F.isEmpty(cachesToStart) &&
             F.isEmpty(cachesToStop) &&
             F.isEmpty(cacheGrpsToStart) &&
-            F.isEmpty(cacheGroupsToStop()) &&
+            F.isEmpty(cacheGrpsToStop) &&
             F.isEmpty(cachesToResetLostParts);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
index cd2d812..12d9980 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
@@ -233,6 +233,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM
 
     /**
      * @param sizeChange Size delta.
+     * @param hld Map holder.
      * @param e Map entry.
      */
     protected void release(int sizeChange, CacheMapHolder hld, GridCacheEntryEx e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index ed1fdcb..50e0121 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -264,6 +264,7 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @param cacheCfg Cache configuration.
      * @param grp Cache group.
      * @param cacheType Cache type.
+     * @param locStartTopVer Topology version when cache was started on local node.
      * @param affNode {@code True} if local node is affinity node.
      * @param updatesAllowed Updates allowed flag.
      * @param evtMgr Cache event manager.

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
index 968d65d..7371153 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
@@ -319,6 +319,7 @@ public class GridCacheEntryInfo implements Message {
     /**
      * @param ctx Cache object context.
      * @return Marshalled size.
+     * @throws IgniteCheckedException If failed.
      */
     public int marshalledSize(CacheObjectContext ctx) throws IgniteCheckedException {
         int size = 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 8a82187..01677a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2670,6 +2670,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
     /**
      * @param topVer Topology version for current operation.
+     * @param primary Primary node update flag.
+     * @param primaryCntr Counter assigned on primary node.
      * @return Update counter.
      */
     protected long nextPartitionCounter(AffinityTopologyVersion topVer, boolean primary, @Nullable Long primaryCntr) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index f9b5655..cac8420 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -730,10 +730,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             else
                 stopSeq.addFirst(cfg.getName());
 
-            caches.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, cacheType, sql, (byte)0));
+            caches.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, cacheType, sql, 0));
         }
         else
-            templates.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, CacheType.USER, false, (byte)0));
+            templates.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cfg, CacheType.USER, false, 0));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index 50ba207..f486601 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -131,6 +131,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
         do {
             final long pageId = pageId(nextLink);
 
+            // Group is null if try evict page, with persistence evictions should be disabled.
             assert grp != null || !sharedCtx.database().persistenceEnabled();
 
             int grpId = grp != null ? grp.groupId() : 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 149bd69..2b696a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -510,7 +510,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                             }, part);
                         }
                         else
-                            skipCtx.addSendClosure(new Runnable() {
+                            skipCtx.addProcessClosure(new Runnable() {
                                 @Override public void run() {
                                     locLsnr.onUpdated(evts);
                                 }
@@ -525,7 +525,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 final Object entryOrList = buf.processEntry(skipCtx.entry(), !primary);
 
                 if (entryOrList != null) {
-                    skipCtx.addSendClosure(new Runnable() {
+                    skipCtx.addProcessClosure(new Runnable() {
                         @Override public void run() {
                             try {
                                 ctx.continuous().addNotification(nodeId,
@@ -808,27 +808,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /**
-     * @param evt Event.
-     */
-    private void handleLocalListener(CacheContinuousQueryEvent evt) {
-        CacheContinuousQueryEntry entry = evt.entry();
-
-        if (!locCache) {
-            Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry);
-
-            if (!evts.isEmpty())
-                locLsnr.onUpdated(evts);
-
-            if (!internal && !skipPrimaryCheck)
-                sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
-        }
-        else {
-            if (!entry.isFiltered())
-                locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
-        }
-    }
-
-    /**
      * @param evt Continuous query event.
      * @param notify Notify flag.
      * @param loc Listener deployed on this node.
@@ -841,11 +820,24 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             if (cctx == null)
                 return;
 
-            if (loc)
-                handleLocalListener(evt);
-            else {
-                CacheContinuousQueryEntry entry = evt.entry();
+            final CacheContinuousQueryEntry entry = evt.entry();
+
+            if (loc) {
+                if (!locCache) {
+                    Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, entry);
 
+                    if (!evts.isEmpty())
+                        locLsnr.onUpdated(evts);
+
+                    if (!internal && !skipPrimaryCheck)
+                        sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+                }
+                else {
+                    if (!entry.isFiltered())
+                        locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
+                }
+            }
+            else {
                 if (!entry.isFiltered())
                     prepareEntry(cctx, nodeId, entry);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 5e73840..7da657f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -77,6 +77,9 @@ public interface CacheContinuousQueryListener<K, V> {
     public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, boolean primary);
 
     /**
+     * For cache updates in shared cache group need notify others caches CQ listeners
+     * that generated counter should be skipped.
+     *
      * @param cctx Cache context.
      * @param skipCtx Context.
      * @param part Partition.
@@ -84,7 +87,7 @@ public interface CacheContinuousQueryListener<K, V> {
      * @param topVer Topology version.
      * @return Context.
      */
-    public CounterSkipContext skipUpdateCounter(
+    @Nullable public CounterSkipContext skipUpdateCounter(
         GridCacheContext cctx,
         @Nullable CounterSkipContext skipCtx,
         int part,

http://git-wip-us.apache.org/repos/asf/ignite/blob/06989645/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
index 747d7d6..23702bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CounterSkipContext.java
@@ -30,7 +30,7 @@ public class CounterSkipContext {
     private final CacheContinuousQueryEntry entry;
 
     /** */
-    private List<Runnable> sndC;
+    private List<Runnable> procC;
 
     /**
      * @param part Partition.
@@ -62,17 +62,17 @@ public class CounterSkipContext {
     /**
      * @return Entries
      */
-    @Nullable public List<Runnable> sendClosures() {
-        return sndC;
+    @Nullable public List<Runnable> processClosures() {
+        return procC;
     }
 
     /**
      * @param c Closure send
      */
-    void addSendClosure(Runnable c) {
-        if (sndC == null)
-            sndC = new ArrayList<>();
+    void addProcessClosure(Runnable c) {
+        if (procC == null)
+            procC = new ArrayList<>();
 
-        sndC.add(c);
+        procC.add(c);
     }
 }


Mime
View raw message