ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject ignite git commit: IGNITE-2791 Fixed review notes.
Date Mon, 21 Mar 2016 14:54:29 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2791 4731a6bb4 -> 01d470f68


IGNITE-2791 Fixed review notes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01d470f6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01d470f6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01d470f6

Branch: refs/heads/ignite-2791
Commit: 01d470f6808ede93b00c7a32929ba871006e6b91
Parents: 4731a6b
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Mon Mar 21 17:54:09 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Mon Mar 21 17:54:09 2016 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |  3 +-
 .../internal/GridMessageListenHandler.java      |  3 +-
 .../continuous/CacheContinuousQueryHandler.java | 40 ++++++++++++++------
 .../continuous/CacheContinuousQueryManager.java |  2 +
 .../continuous/GridContinuousHandler.java       |  4 +-
 .../continuous/GridContinuousProcessor.java     |  9 +++--
 6 files changed, 42 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/01d470f6/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index bc43195..19bf1a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -136,7 +136,8 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrs) {
+    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/01d470f6/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 089091b..0ac6877 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -125,7 +125,8 @@ public class GridMessageListenHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrs) {
+    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/01d470f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index e794f46..6243af7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -148,7 +148,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private transient int cacheId;
 
     /** */
-    private transient volatile Map<UUID, Map<Integer, Long>> initUpdCntrs;
+    private transient volatile Map<Integer, Long> initUpdCntrs;
+
+    /** */
+    private transient volatile Map<UUID, Map<Integer, Long>> initUpdCntrsPerNode;
 
     /** */
     private transient volatile AffinityTopologyVersion initTopVer;
@@ -266,7 +269,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrs) {
+    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs) {
+        this.initUpdCntrsPerNode = cntrsPerNode;
         this.initUpdCntrs = cntrs;
         this.initTopVer = topVer;
     }
@@ -553,10 +558,14 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * Wait topology.
      */
     public void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException {
-        cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get();
+        GridCacheContext<K, V> cctx = cacheContext(ctx);
 
-        for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++)
-            getOrCreatePartitionRecovery(ctx, partId);
+        if (!cctx.isLocal()) {
+            cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get();
+
+            for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++)
+                getOrCreatePartitionRecovery(ctx, partId);
+        }
     }
 
     /** {@inheritDoc} */
@@ -684,18 +693,25 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
             AffinityTopologyVersion initTopVer0 = initTopVer;
 
-            if (initTopVer0 != null && !initTopVer0.equals(AffinityTopologyVersion.NONE)) {
-                GridCacheAffinityManager aff = cacheContext(ctx).affinity();
+            if (initTopVer0 != null) {
+                GridCacheContext<K, V> cctx = cacheContext(ctx);
 
-                for (ClusterNode node : aff.nodes(partId, initTopVer)) {
-                    Map<Integer, Long> map = initUpdCntrs.get(node.id());
+                GridCacheAffinityManager aff = cctx.affinity();
 
-                    if (map != null) {
-                        partCntr = map.get(partId);
+                if (initUpdCntrsPerNode != null) {
+                    for (ClusterNode node : aff.nodes(partId, initTopVer)) {
+                        Map<Integer, Long> map = initUpdCntrsPerNode.get(node.id());
 
-                        break;
+                        if (map != null) {
+                            partCntr = map.get(partId);
+
+                            break;
+                        }
                     }
                 }
+                else if (initUpdCntrs != null) {
+                    partCntr = initUpdCntrs.get(partId);
+                }
             }
 
             rec = new PartitionRecovery(ctx.log(getClass()), initTopVer0, partCntr);

http://git-wip-us.apache.org/repos/asf/ignite/blob/01d470f6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 2847063..869a51b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -657,6 +657,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             log.warning("Failed to start continuous query.", e);
 
             cctx.kernalContext().continuous().stopRoutine(id);
+
+            throw new IgniteCheckedException("Failed to start continuous query.", e);
         }
 
         if (notifyExisting) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/01d470f6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 2ab75d5..46e87af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -154,8 +154,10 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
     public String cacheName();
 
     /**
+     * @param cntrsPerNode Init state partition counters for node.
      * @param cntrs Init state for partition counters.
      * @param topVer Topology version.
      */
-    public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrs);
+    public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/01d470f6/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 80ab092..f2d6e1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -219,17 +219,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                             // Update partition counters.
                             if (routine != null && routine.handler().isQuery()) {
-                                Map<UUID, Map<Integer, Long>> cnrtsPerNode = msg.updateCountersPerNode();
+                                Map<UUID, Map<Integer, Long>> cntrsPerNode = msg.updateCountersPerNode();
+                                Map<Integer, Long> cntrs = msg.updateCounters();
 
                                 GridCacheAdapter<Object, Object> interCache =
                                     ctx.cache().internalCache(routine.handler().cacheName());
 
                                 GridCacheContext cctx = interCache != null ? interCache.context() : null;
 
-                                if (cctx != null && cnrtsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
-                                    cnrtsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters());
+                                if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
+                                    cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters());
 
-                                routine.handler().updateCounters(topVer, cnrtsPerNode);
+                                routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
                             }
 
                             fut.onRemoteRegistered();


Mime
View raw message