ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject ignite git commit: Fixed IGNITE-2791 "Continuous query listener is not notified during concurrent key put and registration."
Date Mon, 21 Mar 2016 20:49:48 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 43ff1488f -> 49725e9d3


Fixed IGNITE-2791 "Continuous query listener is not notified during concurrent key put and registration."


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/49725e9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/49725e9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/49725e9d

Branch: refs/heads/master
Commit: 49725e9d37e8f0117758cda4714ca6e9a583b900
Parents: 43ff148
Author: Tikhonov Nikolay <tikhonovnicolay@gmail.com>
Authored: Mon Mar 21 23:44:56 2016 +0300
Committer: Tikhonov Nikolay <tikhonovnicolay@gmail.com>
Committed: Mon Mar 21 23:44:56 2016 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       |   3 +-
 .../internal/GridMessageListenHandler.java      |   3 +-
 .../continuous/CacheContinuousQueryHandler.java |  88 +++-
 .../continuous/CacheContinuousQueryManager.java |  12 +
 .../continuous/GridContinuousHandler.java       |   4 +-
 .../continuous/GridContinuousProcessor.java     |  27 +-
 .../StartRoutineAckDiscoveryMessage.java        |  22 +-
 .../StartRoutineDiscoveryMessage.java           |  22 +-
 .../CacheContinuousQueryLostPartitionTest.java  |   2 -
 .../GridCacheContinuousQueryConcurrentTest.java | 466 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 11 files changed, 600 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index e2b1184..19bf1a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -136,7 +136,8 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
+    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 402365c..0ac6877 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -125,7 +125,8 @@ public class GridMessageListenHandler implements GridContinuousHandler
{
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
+    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 10fbd89..6243af7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -72,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedDeque8;
 
@@ -146,10 +148,13 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     private transient int cacheId;
 
     /** */
-    private Map<Integer, Long> initUpdCntrs;
+    private transient volatile Map<Integer, Long> initUpdCntrs;
 
     /** */
-    private AffinityTopologyVersion initTopVer;
+    private transient volatile Map<UUID, Map<Integer, Long>> initUpdCntrsPerNode;
+
+    /** */
+    private transient volatile AffinityTopologyVersion initTopVer;
 
     /** */
     private transient boolean ignoreClsNotFound;
@@ -264,9 +269,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
     }
 
     /** {@inheritDoc} */
-    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) {
-        this.initTopVer = topVer;
+    @Override public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs) {
+        this.initUpdCntrsPerNode = cntrsPerNode;
         this.initUpdCntrs = cntrs;
+        this.initTopVer = topVer;
     }
 
     /** {@inheritDoc} */
@@ -296,20 +303,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         assert !skipPrimaryCheck || loc;
 
-        final GridCacheContext<K, V> cctx = cacheContext(ctx);
-
-        if (!internal && cctx != null && initUpdCntrs != null) {
-            Map<Integer, Long> map = cctx.topology().updateCounters();
-
-            for (Map.Entry<Integer, Long> e : map.entrySet()) {
-                Long cntr0 = initUpdCntrs.get(e.getKey());
-                Long cntr1 = e.getValue();
-
-                if (cntr0 == null || cntr1 > cntr0)
-                    initUpdCntrs.put(e.getKey(), cntr1);
-            }
-        }
-
         CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
             @Override public void onExecution() {
                 if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -561,6 +554,20 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             entry.prepareMarshal(cctx);
     }
 
+    /**
+     * Wait topology.
+     */
+    public void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException {
+        GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+        if (!cctx.isLocal()) {
+            cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get();
+
+            for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++)
+                getOrCreatePartitionRecovery(ctx, partId);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
         // No-op.
@@ -668,19 +675,54 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         if (e.updateCounter() == -1L)
             return F.asList(e);
 
-        PartitionRecovery rec = rcvs.get(e.partition());
+        PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
+
+        return rec.collectEntries(e);
+    }
+
+    /**
+     * @param ctx Context.
+     * @param partId Partition id.
+     * @return Partition recovery.
+     */
+    @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) {
+        PartitionRecovery rec = rcvs.get(partId);
 
         if (rec == null) {
-            rec = new PartitionRecovery(ctx.log(getClass()), initTopVer,
-                initUpdCntrs == null ? null : initUpdCntrs.get(e.partition()));
+            Long partCntr = null;
 
-            PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec);
+            AffinityTopologyVersion initTopVer0 = initTopVer;
+
+            if (initTopVer0 != null) {
+                GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                GridCacheAffinityManager aff = cctx.affinity();
+
+                if (initUpdCntrsPerNode != null) {
+                    for (ClusterNode node : aff.nodes(partId, initTopVer)) {
+                        Map<Integer, Long> map = initUpdCntrsPerNode.get(node.id());
+
+                        if (map != null) {
+                            partCntr = map.get(partId);
+
+                            break;
+                        }
+                    }
+                }
+                else if (initUpdCntrs != null) {
+                    partCntr = initUpdCntrs.get(partId);
+                }
+            }
+
+            rec = new PartitionRecovery(ctx.log(getClass()), initTopVer0, partCntr);
+
+            PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
 
             if (oldRec != null)
                 rec = oldRec;
         }
 
-        return rec.collectEntries(e);
+        return rec;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 353043f..869a51b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -649,6 +649,18 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
             autoUnsubscribe,
             pred).get();
 
+        try {
+            if (hnd.isQuery() && cctx.userCache())
+                hnd.waitTopologyFuture(cctx.kernalContext());
+        }
+        catch (IgniteCheckedException e) {
+            log.warning("Failed to start continuous query.", e);
+
+            cctx.kernalContext().continuous().stopRoutine(id);
+
+            throw new IgniteCheckedException("Failed to start continuous query.", e);
+        }
+
         if (notifyExisting) {
             final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 8cd30a8..46e87af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -154,8 +154,10 @@ public interface GridContinuousHandler extends Externalizable, Cloneable
{
     public String cacheName();
 
     /**
+     * @param cntrsPerNode Init state partition counters for node.
      * @param cntrs Init state for partition counters.
      * @param topVer Topology version.
      */
-    public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs);
+    public void updateCounters(AffinityTopologyVersion topVer, Map<UUID, Map<Integer, Long>> cntrsPerNode,
+        Map<Integer, Long> cntrs);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 1776748..f2d6e1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -220,25 +219,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                             // Update partition counters.
                             if (routine != null && routine.handler().isQuery()) {
+                                Map<UUID, Map<Integer, Long>> cntrsPerNode = msg.updateCountersPerNode();
                                 Map<Integer, Long> cntrs = msg.updateCounters();
 
                                 GridCacheAdapter<Object, Object> interCache =
                                     ctx.cache().internalCache(routine.handler().cacheName());
 
-                                if (interCache != null && cntrs != null && interCache.context() != null
-                                    && !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) {
-                                    Map<Integer, Long> map = interCache.context().topology().updateCounters();
+                                GridCacheContext cctx = interCache != null ? interCache.context() : null;
 
-                                    for (Map.Entry<Integer, Long> e : map.entrySet()) {
-                                        Long cntr0 = cntrs.get(e.getKey());
-                                        Long cntr1 = e.getValue();
+                                if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
+                                    cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters());
 
-                                        if (cntr0 == null || cntr1 > cntr0)
-                                            cntrs.put(e.getKey(), cntr1);
-                                    }
-                                }
-
-                                routine.handler().updateCounters(topVer, msg.updateCounters());
+                                routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
                             }
 
                             fut.onRemoteRegistered();
@@ -756,7 +748,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 syncMsgFuts.put(futId, fut);
 
                 try {
-                    sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg, null);
+                    sendNotification(nodeId, routineId, futId, F.asList(obj), null, msg, null);
                 }
                 catch (IgniteCheckedException e) {
                     syncMsgFuts.remove(futId);
@@ -923,11 +915,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             if (proc != null) {
                 GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
 
-                if (cache != null && !cache.isLocal()) {
-                    Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
-
-                    req.addUpdateCounters(cntrs);
-                }
+                if (cache != null && !cache.isLocal())
+                    req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
index 9644372..ca34b27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
@@ -36,18 +37,28 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage
{
     private final Map<UUID, IgniteCheckedException> errs;
 
     /** */
+    @GridToStringExclude
     private final Map<Integer, Long> updateCntrs;
 
+    /** */
+    @GridToStringExclude
+    private final Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
+
     /**
      * @param routineId Routine id.
      * @param errs Errs.
+     * @param cntrs Partition counters.
+     * @param cntrsPerNode Partition counters per node.
      */
-    public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs,
-        Map<Integer, Long> cntrs) {
+    public StartRoutineAckDiscoveryMessage(UUID routineId,
+        Map<UUID, IgniteCheckedException> errs,
+        Map<Integer, Long> cntrs,
+        Map<UUID, Map<Integer, Long>> cntrsPerNode) {
         super(routineId);
 
         this.errs = new HashMap<>(errs);
         this.updateCntrs = cntrs;
+        this.updateCntrsPerNode = cntrsPerNode;
     }
 
     /** {@inheritDoc} */
@@ -63,6 +74,13 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage
{
     }
 
     /**
+     * @return Update counters for partitions per each node.
+     */
+    public Map<UUID, Map<Integer, Long>> updateCountersPerNode() {
+        return updateCntrsPerNode;
+    }
+
+    /**
      * @return Errs.
      */
     public Map<UUID, IgniteCheckedException> errs() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index ff037d4..24eb050 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -40,6 +40,9 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage
{
     /** */
     private Map<Integer, Long> updateCntrs;
 
+    /** */
+    private Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
+
     /** Keep binary flag. */
     private boolean keepBinary;
 
@@ -72,7 +75,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage
{
     /**
      * @param cntrs Update counters.
      */
-    public void addUpdateCounters(Map<Integer, Long> cntrs) {
+    private void addUpdateCounters(Map<Integer, Long> cntrs) {
         if (updateCntrs == null)
             updateCntrs = new HashMap<>();
 
@@ -86,6 +89,21 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage
{
     }
 
     /**
+     * @param nodeId Local node ID.
+     * @param cntrs Update counters.
+     */
+    public void addUpdateCounters(UUID nodeId, Map<Integer, Long> cntrs) {
+        addUpdateCounters(cntrs);
+
+        if (updateCntrsPerNode == null)
+            updateCntrsPerNode = new HashMap<>();
+
+        Map<Integer, Long> old = updateCntrsPerNode.put(nodeId, cntrs);
+
+        assert old == null : old;
+    }
+
+    /**
      * @return Errs.
      */
     public Map<UUID, IgniteCheckedException> errs() {
@@ -106,7 +124,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage
{
 
     /** {@inheritDoc} */
     @Override public DiscoveryCustomMessage ackMessage() {
-        return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs);
+        return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs, updateCntrsPerNode);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
index f4659dc..025dd80 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java
@@ -140,8 +140,6 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes
         // node2 now becomes the primary for the key.
         stopGrid(0);
 
-        awaitPartitionMapExchange();
-
         cache2.put(key, "2");
 
         // Sanity check.

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
new file mode 100644
index 0000000..29b351b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.FactoryBuilder.SingletonFactory;
+import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
+import javax.cache.event.CacheEntryCreatedListener;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static javax.cache.configuration.FactoryBuilder.factoryOf;
+
+/**
+ *
+ */
+public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 2;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGridsMultiThreaded(NODES);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        if (gridName.endsWith(String.valueOf(NODES)))
+            cfg.setClientMode(ThreadLocalRandom.current().nextBoolean());
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedTx() throws Exception {
+        testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, 1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartReplicated() throws Exception {
+        testRestartRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartPartition() throws Exception {
+        testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartPartitionTx() throws Exception {
+        testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedAtomic() throws Exception {
+        testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionTx() throws Exception {
+        testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionAtomic() throws Exception {
+        testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRegistration(CacheConfiguration ccfg) throws Exception {
+        ExecutorService execSrv = newSingleThreadExecutor();
+
+        try {
+            final IgniteCache<Integer, String> cache = grid(0).getOrCreateCache(ccfg);
+
+            for (int i = 0; i < 10; i++) {
+                log.info("Start iteration: " + i);
+
+                final int i0 = i;
+                final AtomicBoolean stop = new AtomicBoolean(false);
+                final CountDownLatch latch = new CountDownLatch(1);
+                final int conQryCnt = 50;
+
+                Future<List<IgniteFuture<String>>> fut = execSrv.submit(
+                    new Callable<List<IgniteFuture<String>>>() {
+                        @Override public List<IgniteFuture<String>> call() throws Exception {
+                            int count = 0;
+                            List<IgniteFuture<String>> futures = new ArrayList<>();
+
+                            while (!stop.get()) {
+                                futures.add(waitForKey(i0, cache, count));
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Started cont query count: " + count);
+
+                                if (++count >= conQryCnt)
+                                    latch.countDown();
+                            }
+
+                            return futures;
+                        }
+                    });
+
+                assert U.await(latch, 1, MINUTES);
+
+                cache.put(i, "v");
+
+                stop.set(true);
+
+                List<IgniteFuture<String>> contQries = fut.get();
+
+                for (IgniteFuture<String> contQry : contQries)
+                    contQry.get(2, TimeUnit.SECONDS);
+            }
+        }
+        finally {
+            execSrv.shutdownNow();
+
+            grid(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * Same scenario as {@code testRegistration}, executed while a background task keeps
+     * starting and stopping the node with index {@code NODES}.
+     * <p>
+     * The restart task starts the node, waits up to 5 seconds for {@code grid(0)} to see
+     * {@code NODES + 1} nodes, sleeps 300 ms, stops the node, waits up to 5 seconds for the
+     * node count to return to {@code NODES} and sleeps 300 ms again. Each iteration of the
+     * main loop puts key {@code i}, asserts that it reads back as {@code "v"} and waits up to
+     * 5 seconds for each future collected from {@code waitForKey}.
+     * <p>
+     * NOTE(review): {@code latch.await()} below has no timeout, whereas {@code testRegistration}
+     * bounds the same wait to one minute; confirm that an unbounded wait is intended.
+     *
+     * @param ccfg Configuration of the cache that is created on {@code grid(0)} and destroyed afterwards.
+     * @throws Exception If failed.
+     */
+    public void testRestartRegistration(CacheConfiguration ccfg) throws Exception {
+        ExecutorService execSrv = newSingleThreadExecutor();
+
+        final AtomicBoolean stopRes = new AtomicBoolean(false);
+
+        IgniteInternalFuture<?> restartFut = null;
+
+        try {
+            final IgniteCache<Integer, String> cache = grid(0).getOrCreateCache(ccfg);
+
+            restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    while (!stopRes.get()) {
+                        startGrid(NODES);
+
+                        assert GridTestUtils.waitForCondition(new PA() {
+                            @Override public boolean apply() {
+                                return grid(0).cluster().nodes().size() == NODES + 1;
+                            }
+                        }, 5000L);
+
+                        Thread.sleep(300);
+
+                        stopGrid(NODES);
+
+                        assert GridTestUtils.waitForCondition(new PA() {
+                            @Override public boolean apply() {
+                                return grid(0).cluster().nodes().size() == NODES;
+                            }
+                        }, 5000L);
+
+                        Thread.sleep(300);
+                    }
+
+                    return null;
+                }
+            });
+
+            U.sleep(100);
+
+            for (int i = 0; i < 10; i++) {
+                log.info("Start iteration: " + i);
+
+                final int i0 = i;
+                final AtomicBoolean stop = new AtomicBoolean(false);
+                final CountDownLatch latch = new CountDownLatch(1);
+                final int conQryCnt = 50;
+
+                Future<List<IgniteFuture<String>>> fut = execSrv.submit(
+                    new Callable<List<IgniteFuture<String>>>() {
+                        @Override public List<IgniteFuture<String>> call() throws
Exception {
+                            int count = 0;
+                            List<IgniteFuture<String>> futures = new ArrayList<>();
+
+                            while (!stop.get()) {
+                                futures.add(waitForKey(i0, cache, count));
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Started cont query count: " + count);
+
+                                if (++count >= conQryCnt)
+                                    latch.countDown();
+                            }
+
+                            return futures;
+                        }
+                    });
+
+                latch.await();
+
+                cache.put(i, "v");
+
+                assertEquals("v", cache.get(i));
+
+                stop.set(true);
+
+                List<IgniteFuture<String>> contQries = fut.get();
+
+                for (IgniteFuture<String> contQry : contQries)
+                    contQry.get(5, TimeUnit.SECONDS);
+            }
+        }
+        finally {
+            execSrv.shutdownNow();
+
+            grid(0).destroyCache(ccfg.getName());
+
+            if (restartFut != null) {
+                stopRes.set(true);
+
+                restartFut.get();
+
+                stopGrid(NODES);
+            }
+        }
+    }
+
+    /**
+     * Returns a future that completes once {@code key} is present in the cache.
+     * <p>
+     * If the key is already there, a finished future holding {@code "immediately"} is returned.
+     * Otherwise a listener configuration built by {@code createCacheListener} is registered,
+     * followed by an asynchronous {@code get} that completes the future with {@code "by get"}
+     * when the value turns out to be non-null. Once the future completes, the listener
+     * configuration is deregistered from a separate asynchronous task.
+     *
+     * @param key Key to wait for.
+     * @param cache Cache.
+     * @param id ID passed on to the listener configuration and used in the log message.
+     * @return Future.
+     */
+    public IgniteFuture<String> waitForKey(Integer key, final IgniteCache<Integer,
String> cache, final int id) {
+        String v = cache.get(key);
+
+        // From now on, all futures will be completed immediately (since the key has been
+        // inserted).
+        if (v != null)
+            return new IgniteFinishedFutureImpl<>("immediately");
+
+        final IgniteFuture<String> promise = new IgniteFutureImpl<>(new GridFutureAdapter<String>());
+
+        final CacheEntryListenerConfiguration<Integer, String> cfg =
+            createCacheListener(key, promise, id);
+
+        promise.listen(new IgniteInClosure<IgniteFuture<String>>() {
+            @Override public void apply(IgniteFuture<String> future) {
+                GridTestUtils.runAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        cache.deregisterCacheEntryListener(cfg);
+
+                        return null;
+                    }
+                });
+            }
+        });
+
+        // Start listening.
+        // Assumption: When the call returns, the listener is guaranteed to have been registered.
+        cache.registerCacheEntryListener(cfg);
+
+        // Now must check the cache again, to make sure that we didn't miss the key insert
while we
+        // were busy setting up the cache listener.
+        // Check asynchronously.
+        IgniteCache<Integer, String> asyncCache = cache.withAsync();
+        asyncCache.get(key);
+
+        // Complete the promise if the key was inserted concurrently.
+        asyncCache.<String>future().listen(new IgniteInClosure<IgniteFuture<String>>()
{
+            @Override public void apply(IgniteFuture<String> f) {
+                String value = f.get();
+
+                if (value != null) {
+                    log.info("Completed by get: " + id);
+
+                    (((GridFutureAdapter)((IgniteFutureImpl)promise).internalFuture())).onDone("by
get");
+                }
+            }
+        });
+
+        return promise;
+    }
+
+    /**
+     * Builds a listener configuration that pairs a {@code CacheListener} wrapping
+     * {@code result} with a {@code KeyEventFilter} for {@code key}.
+     * <p>
+     * NOTE(review): the trailing {@code false, true} arguments are presumably the
+     * "old value required" and "synchronous" flags; confirm against the JCache constructor.
+     *
+     * @param key Key.
+     * @param result Result.
+     * @param id Listener ID.
+     * @return Listener configuration.
+     */
+    private CacheEntryListenerConfiguration<Integer, String> createCacheListener(
+        Integer key,
+        IgniteFuture<String> result,
+        int id) {
+        return new MutableCacheEntryListenerConfiguration<>(
+            factoryOf(new CacheListener(result, id)),
+            new SingletonFactory<>(new KeyEventFilter(key, id)), false, true);
+    }
+
+
+
+    /**
+     * Creates a configuration for a cache named {@code "test-" + cacheMode + atomicMode + backups}
+     * with {@code FULL_SYNC} write synchronization and reads from backups disabled.
+     *
+     * @param cacheMode Cache mode.
+     * @param atomicMode Atomicity mode.
+     * @param backups Backups.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, String> cacheConfiguration(CacheMode cacheMode,
+        CacheAtomicityMode atomicMode, int backups) {
+        CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>("test-"
+ cacheMode + atomicMode + backups);
+
+        cfg.setCacheMode(cacheMode);
+        cfg.setAtomicityMode(atomicMode);
+        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cfg.setBackups(backups);
+        cfg.setReadFromBackup(false);
+
+        return cfg;
+    }
+
+    /**
+     * Entry-created listener that completes the supplied future with {@code "by listener"}
+     * on any notification it receives.
+     * <p>
+     * NOTE(review): the class is {@code Serializable} but declares no {@code serialVersionUID},
+     * unlike {@code KeyEventFilter}; confirm whether that is intentional.
+     */
+    private static class CacheListener implements CacheEntryCreatedListener<Integer, String>,
Serializable {
+        /** Future completed by {@code onCreated}. */
+        final IgniteFuture<String> result;
+
+        /** Listener ID (stored, not read within this class). */
+        private final int id;
+
+        /**
+         * @param result Result.
+         * @param id ID.
+         */
+        CacheListener(IgniteFuture<String> result, int id) {
+            this.result = result;
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer,
? extends String>> evts) {
+            (((GridFutureAdapter)((IgniteFutureImpl)result).internalFuture())).onDone("by
listener");
+        }
+    }
+
+    /**
+     * Event filter that accepts only events whose key equals the configured key.
+     * Equality and hash code are based on the key alone; {@code id} does not take part.
+     */
+    private static class KeyEventFilter implements CacheEntryEventFilter<Integer, String>,
Serializable {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 42L;
+
+        /** Key that events are matched against. */
+        private final Object key;
+
+        /** Filter ID (stored, not read within this class). */
+        private final int id;
+
+        /**
+         * @param key Key.
+         * @param id ID.
+         */
+        KeyEventFilter(Object key, int id) {
+            this.key = key;
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends
String> e) {
+            return e.getKey().equals(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            return this == o || !(o == null || getClass() != o.getClass())
+                && key.equals(((KeyEventFilter) o).key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/49725e9d/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 083af1e..0aa3560 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapValuesTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryConcurrentTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalAtomicSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionAtomicOneNodeTest;
@@ -228,6 +229,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class);
         suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
         suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class);
+        suite.addTestSuite(GridCacheContinuousQueryConcurrentTest.class);
         suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class);
         suite.addTestSuite(CacheContinuousBatchAckTest.class);
         suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);


Mime
View raw message