ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [06/50] [abbrv] ignite git commit: Continuous query compatibility fix (topVer can be null for old CacheContinuousQueryEntry). (cherry picked from commit dee6190)
Date Mon, 21 Mar 2016 14:20:31 GMT
Continuous query compatibility fix (topVer can be null for old CacheContinuousQueryEntry).
(cherry picked from commit dee6190)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3da257f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3da257f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3da257f2

Branch: refs/heads/ignite-2801
Commit: 3da257f2d2f191649d0213524644d9e67b03d8d2
Parents: aad672b
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Feb 25 13:54:11 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Feb 26 15:23:41 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java | 43 +++++++++++---------
 1 file changed, 23 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3da257f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 4397f69..1938edb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -737,6 +737,12 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry entry) {
             assert entry != null;
 
+            if (entry.topologyVersion() == null) { // Possible if entry is sent from old node.
+                assert entry.updateCounter() == 0L : entry;
+
+                return F.asList(entry);
+            }
+
             List<CacheContinuousQueryEntry> entries;
 
             synchronized (pendingEvts) {
@@ -991,28 +997,25 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         routineId,
                         t.get1());
 
-                    Collection<ClusterNode> nodes = new HashSet<>();
-
-                    for (AffinityTopologyVersion topVer : t.get2())
-                        nodes.addAll(ctx.discovery().cacheAffinityNodes(cctx.name(), topVer));
-
-                    for (ClusterNode node : nodes) {
-                        if (!node.isLocal() && node.version().compareTo(CacheContinuousQueryBatchAck.SINCE_VER) >= 0) {
-                            try {
-                                cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
-                            }
-                            catch (ClusterTopologyCheckedException e) {
-                                IgniteLogger log = ctx.log(getClass());
+                    for (AffinityTopologyVersion topVer : t.get2()) {
+                        for (ClusterNode node : ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)) {
+                            if (!node.isLocal() && node.version().compareTo(CacheContinuousQueryBatchAck.SINCE_VER) >= 0) {
+                                try {
+                                    cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL);
+                                }
+                                catch (ClusterTopologyCheckedException e) {
+                                    IgniteLogger log = ctx.log(getClass());
 
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to send acknowledge message, node left " +
-                                        "[msg=" + msg + ", node=" + node + ']');
-                            }
-                            catch (IgniteCheckedException e) {
-                                IgniteLogger log = ctx.log(getClass());
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to send acknowledge message, node left " +
+                                            "[msg=" + msg + ", node=" + node + ']');
+                                }
+                                catch (IgniteCheckedException e) {
+                                    IgniteLogger log = ctx.log(getClass());
 
-                                U.error(log, "Failed to send acknowledge message " +
-                                    "[msg=" + msg + ", node=" + node + ']', e);
+                                    U.error(log, "Failed to send acknowledge message " +
+                                        "[msg=" + msg + ", node=" + node + ']', e);
+                                }
                             }
                         }
                     }


Mime
View raw message