Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7729818810 for ; Mon, 21 Mar 2016 18:55:30 +0000 (UTC) Received: (qmail 8005 invoked by uid 500); 21 Mar 2016 18:55:30 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 7971 invoked by uid 500); 21 Mar 2016 18:55:30 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 7962 invoked by uid 99); 21 Mar 2016 18:55:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Mar 2016 18:55:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0AB17DFAEC; Mon, 21 Mar 2016 18:55:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ntikhonov@apache.org To: commits@ignite.apache.org Message-Id: <7490ffdebd67436ab0602cf935f797e3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: Fixed IGNITE-2791 "Continuous query listener is not notified during concurrent key put and registration." Date: Mon, 21 Mar 2016 18:55:30 +0000 (UTC) Repository: ignite Updated Branches: refs/heads/ignite-2791-merg [created] 8e13dc265 Fixed IGNITE-2791 "Continuous query listener is not notified during concurrent key put and registration." 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e13dc26 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e13dc26 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e13dc26 Branch: refs/heads/ignite-2791-merg Commit: 8e13dc265abdb2718d1ea687caf3ad6270d95390 Parents: 43ff148 Author: nikolay_tikhonov Authored: Mon Mar 21 21:55:33 2016 +0300 Committer: nikolay_tikhonov Committed: Mon Mar 21 21:55:33 2016 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 3 +- .../internal/GridMessageListenHandler.java | 3 +- .../continuous/CacheContinuousQueryHandler.java | 88 +++- .../continuous/CacheContinuousQueryManager.java | 12 + .../continuous/GridContinuousHandler.java | 4 +- .../continuous/GridContinuousProcessor.java | 27 +- .../StartRoutineAckDiscoveryMessage.java | 22 +- .../StartRoutineDiscoveryMessage.java | 22 +- .../CacheContinuousQueryLostPartitionTest.java | 2 - .../GridCacheContinuousQueryConcurrentTest.java | 466 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 2 + 11 files changed, 600 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index e2b1184..19bf1a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -136,7 +136,8 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override 
public void updateCounters(AffinityTopologyVersion topVer, Map cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map> cntrsPerNode, + Map cntrs) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index 402365c..0ac6877 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -125,7 +125,8 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public void updateCounters(AffinityTopologyVersion topVer, Map cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map> cntrsPerNode, + Map cntrs) { // No-op. 
} http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 10fbd89..6243af7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; @@ -72,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgnitePredicate; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedDeque8; @@ -146,10 +148,13 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler private transient int cacheId; /** */ - private Map initUpdCntrs; + private transient volatile Map initUpdCntrs; /** */ - private 
AffinityTopologyVersion initTopVer; + private transient volatile Map> initUpdCntrsPerNode; + + /** */ + private transient volatile AffinityTopologyVersion initTopVer; /** */ private transient boolean ignoreClsNotFound; @@ -264,9 +269,11 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler } /** {@inheritDoc} */ - @Override public void updateCounters(AffinityTopologyVersion topVer, Map cntrs) { - this.initTopVer = topVer; + @Override public void updateCounters(AffinityTopologyVersion topVer, Map> cntrsPerNode, + Map cntrs) { + this.initUpdCntrsPerNode = cntrsPerNode; this.initUpdCntrs = cntrs; + this.initTopVer = topVer; } /** {@inheritDoc} */ @@ -296,20 +303,6 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler assert !skipPrimaryCheck || loc; - final GridCacheContext cctx = cacheContext(ctx); - - if (!internal && cctx != null && initUpdCntrs != null) { - Map map = cctx.topology().updateCounters(); - - for (Map.Entry e : map.entrySet()) { - Long cntr0 = initUpdCntrs.get(e.getKey()); - Long cntr1 = e.getValue(); - - if (cntr0 == null || cntr1 > cntr0) - initUpdCntrs.put(e.getKey(), cntr1); - } - } - CacheContinuousQueryListener lsnr = new CacheContinuousQueryListener() { @Override public void onExecution() { if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { @@ -561,6 +554,20 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler entry.prepareMarshal(cctx); } + /** + * Wait topology. + */ + public void waitTopologyFuture(GridKernalContext ctx) throws IgniteCheckedException { + GridCacheContext cctx = cacheContext(ctx); + + if (!cctx.isLocal()) { + cacheContext(ctx).affinity().affinityReadyFuture(initTopVer).get(); + + for (int partId = 0; partId < cacheContext(ctx).affinity().partitions(); partId++) + getOrCreatePartitionRecovery(ctx, partId); + } + } + /** {@inheritDoc} */ @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { // No-op. 
@@ -668,19 +675,54 @@ public class CacheContinuousQueryHandler implements GridContinuousHandler if (e.updateCounter() == -1L) return F.asList(e); - PartitionRecovery rec = rcvs.get(e.partition()); + PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition()); + + return rec.collectEntries(e); + } + + /** + * @param ctx Context. + * @param partId Partition id. + * @return Partition recovery. + */ + @NotNull private PartitionRecovery getOrCreatePartitionRecovery(GridKernalContext ctx, int partId) { + PartitionRecovery rec = rcvs.get(partId); if (rec == null) { - rec = new PartitionRecovery(ctx.log(getClass()), initTopVer, - initUpdCntrs == null ? null : initUpdCntrs.get(e.partition())); + Long partCntr = null; - PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec); + AffinityTopologyVersion initTopVer0 = initTopVer; + + if (initTopVer0 != null) { + GridCacheContext cctx = cacheContext(ctx); + + GridCacheAffinityManager aff = cctx.affinity(); + + if (initUpdCntrsPerNode != null) { + for (ClusterNode node : aff.nodes(partId, initTopVer)) { + Map map = initUpdCntrsPerNode.get(node.id()); + + if (map != null) { + partCntr = map.get(partId); + + break; + } + } + } + else if (initUpdCntrs != null) { + partCntr = initUpdCntrs.get(partId); + } + } + + rec = new PartitionRecovery(ctx.log(getClass()), initTopVer0, partCntr); + + PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec); if (oldRec != null) rec = oldRec; } - return rec.collectEntries(e); + return rec; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 353043f..869a51b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -649,6 +649,18 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { autoUnsubscribe, pred).get(); + try { + if (hnd.isQuery() && cctx.userCache()) + hnd.waitTopologyFuture(cctx.kernalContext()); + } + catch (IgniteCheckedException e) { + log.warning("Failed to start continuous query.", e); + + cctx.kernalContext().continuous().stopRoutine(id); + + throw new IgniteCheckedException("Failed to start continuous query.", e); + } + if (notifyExisting) { final Iterator it = cctx.cache().allEntries().iterator(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index 8cd30a8..46e87af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -154,8 +154,10 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { public String cacheName(); /** + * @param cntrsPerNode Init state partition counters for node. * @param cntrs Init state for partition counters. * @param topVer Topology version. 
*/ - public void updateCounters(AffinityTopologyVersion topVer, Map cntrs); + public void updateCounters(AffinityTopologyVersion topVer, Map> cntrsPerNode, + Map cntrs); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 1776748..f2d6e1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -45,7 +45,6 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteDeploymentCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; @@ -220,25 +219,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Update partition counters. 
if (routine != null && routine.handler().isQuery()) { + Map> cntrsPerNode = msg.updateCountersPerNode(); Map cntrs = msg.updateCounters(); GridCacheAdapter interCache = ctx.cache().internalCache(routine.handler().cacheName()); - if (interCache != null && cntrs != null && interCache.context() != null - && !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) { - Map map = interCache.context().topology().updateCounters(); + GridCacheContext cctx = interCache != null ? interCache.context() : null; - for (Map.Entry e : map.entrySet()) { - Long cntr0 = cntrs.get(e.getKey()); - Long cntr1 = e.getValue(); + if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) + cntrsPerNode.put(ctx.localNodeId(), cctx.topology().updateCounters()); - if (cntr0 == null || cntr1 > cntr0) - cntrs.put(e.getKey(), cntr1); - } - } - - routine.handler().updateCounters(topVer, msg.updateCounters()); + routine.handler().updateCounters(topVer, cntrsPerNode, cntrs); } fut.onRemoteRegistered(); @@ -756,7 +748,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { syncMsgFuts.put(futId, fut); try { - sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg, null); + sendNotification(nodeId, routineId, futId, F.asList(obj), null, msg, null); } catch (IgniteCheckedException e) { syncMsgFuts.remove(futId); @@ -923,11 +915,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (proc != null) { GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName()); - if (cache != null && !cache.isLocal()) { - Map cntrs = cache.context().topology().updateCounters(); - - req.addUpdateCounters(cntrs); - } + if (cache != null && !cache.isLocal()) + req.addUpdateCounters(ctx.localNodeId(), cache.context().topology().updateCounters()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java index 9644372..ca34b27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; @@ -36,18 +37,28 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { private final Map errs; /** */ + @GridToStringExclude private final Map updateCntrs; + /** */ + @GridToStringExclude + private final Map> updateCntrsPerNode; + /** * @param routineId Routine id. * @param errs Errs. + * @param cntrs Partition counters. + * @param cntrsPerNode Partition counters per node. */ - public StartRoutineAckDiscoveryMessage(UUID routineId, Map errs, - Map cntrs) { + public StartRoutineAckDiscoveryMessage(UUID routineId, + Map errs, + Map cntrs, + Map> cntrsPerNode) { super(routineId); this.errs = new HashMap<>(errs); this.updateCntrs = cntrs; + this.updateCntrsPerNode = cntrsPerNode; } /** {@inheritDoc} */ @@ -63,6 +74,13 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { } /** + * @return Update counters for partitions per each node. + */ + public Map> updateCountersPerNode() { + return updateCntrsPerNode; + } + + /** * @return Errs. 
*/ public Map errs() { http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java index ff037d4..24eb050 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -40,6 +40,9 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { /** */ private Map updateCntrs; + /** */ + private Map> updateCntrsPerNode; + /** Keep binary flag. */ private boolean keepBinary; @@ -72,7 +75,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { /** * @param cntrs Update counters. */ - public void addUpdateCounters(Map cntrs) { + private void addUpdateCounters(Map cntrs) { if (updateCntrs == null) updateCntrs = new HashMap<>(); @@ -86,6 +89,21 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { } /** + * @param nodeId Local node ID. + * @param cntrs Update counters. + */ + public void addUpdateCounters(UUID nodeId, Map cntrs) { + addUpdateCounters(cntrs); + + if (updateCntrsPerNode == null) + updateCntrsPerNode = new HashMap<>(); + + Map old = updateCntrsPerNode.put(nodeId, cntrs); + + assert old == null : old; + } + + /** * @return Errs. 
*/ public Map errs() { @@ -106,7 +124,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { /** {@inheritDoc} */ @Override public DiscoveryCustomMessage ackMessage() { - return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs); + return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs, updateCntrsPerNode); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java index f4659dc..025dd80 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java @@ -140,8 +140,6 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes // node2 now becomes the primary for the key. stopGrid(0); - awaitPartitionMapExchange(); - cache2.put(key, "2"); // Sanity check. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java new file mode 100644 index 0000000..29b351b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.cache.configuration.CacheEntryListenerConfiguration; +import javax.cache.configuration.FactoryBuilder.SingletonFactory; +import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryCreatedListener; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static 
java.util.concurrent.Executors.newSingleThreadExecutor; +import static java.util.concurrent.TimeUnit.MINUTES; +import static javax.cache.configuration.FactoryBuilder.factoryOf; + +/** + * + */ +public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 2; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + startGridsMultiThreaded(NODES); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setPeerClassLoadingEnabled(false); + + if (gridName.endsWith(String.valueOf(NODES))) + cfg.setClientMode(ThreadLocalRandom.current().nextBoolean()); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedTx() throws Exception { + testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL, 1)); + } + + /** + * @throws Exception If failed. + */ + public void testRestartReplicated() throws Exception { + testRestartRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2)); + } + + /** + * @throws Exception If failed. + */ + public void testRestartPartition() throws Exception { + testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 2)); + } + + /** + * @throws Exception If failed. 
+ */ + public void testRestartPartitionTx() throws Exception { + testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 2)); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedAtomic() throws Exception { + testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2)); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionTx() throws Exception { + testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL, 2)); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionAtomic() throws Exception { + testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 2)); + } + + /** + * @throws Exception If failed. + */ + public void testRegistration(CacheConfiguration ccfg) throws Exception { + ExecutorService execSrv = newSingleThreadExecutor(); + + try { + final IgniteCache cache = grid(0).getOrCreateCache(ccfg); + + for (int i = 0; i < 10; i++) { + log.info("Start iteration: " + i); + + final int i0 = i; + final AtomicBoolean stop = new AtomicBoolean(false); + final CountDownLatch latch = new CountDownLatch(1); + final int conQryCnt = 50; + + Future>> fut = execSrv.submit( + new Callable>>() { + @Override public List> call() throws Exception { + int count = 0; + List> futures = new ArrayList<>(); + + while (!stop.get()) { + futures.add(waitForKey(i0, cache, count)); + + if (log.isDebugEnabled()) + log.debug("Started cont query count: " + count); + + if (++count >= conQryCnt) + latch.countDown(); + } + + return futures; + } + }); + + assert U.await(latch, 1, MINUTES); + + cache.put(i, "v"); + + stop.set(true); + + List> contQries = fut.get(); + + for (IgniteFuture contQry : contQries) + contQry.get(2, TimeUnit.SECONDS); + } + } + finally { + execSrv.shutdownNow(); + + grid(0).destroyCache(ccfg.getName()); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testRestartRegistration(CacheConfiguration ccfg) throws Exception { + ExecutorService execSrv = newSingleThreadExecutor(); + + final AtomicBoolean stopRes = new AtomicBoolean(false); + + IgniteInternalFuture restartFut = null; + + try { + final IgniteCache cache = grid(0).getOrCreateCache(ccfg); + + restartFut = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + while (!stopRes.get()) { + startGrid(NODES); + + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return grid(0).cluster().nodes().size() == NODES + 1; + } + }, 5000L); + + Thread.sleep(300); + + stopGrid(NODES); + + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return grid(0).cluster().nodes().size() == NODES; + } + }, 5000L); + + Thread.sleep(300); + } + + return null; + } + }); + + U.sleep(100); + + for (int i = 0; i < 10; i++) { + log.info("Start iteration: " + i); + + final int i0 = i; + final AtomicBoolean stop = new AtomicBoolean(false); + final CountDownLatch latch = new CountDownLatch(1); + final int conQryCnt = 50; + + Future>> fut = execSrv.submit( + new Callable>>() { + @Override public List> call() throws Exception { + int count = 0; + List> futures = new ArrayList<>(); + + while (!stop.get()) { + futures.add(waitForKey(i0, cache, count)); + + if (log.isDebugEnabled()) + log.debug("Started cont query count: " + count); + + if (++count >= conQryCnt) + latch.countDown(); + } + + return futures; + } + }); + + latch.await(); + + cache.put(i, "v"); + + assertEquals("v", cache.get(i)); + + stop.set(true); + + List> contQries = fut.get(); + + for (IgniteFuture contQry : contQries) + contQry.get(5, TimeUnit.SECONDS); + } + } + finally { + execSrv.shutdownNow(); + + grid(0).destroyCache(ccfg.getName()); + + if (restartFut != null) { + stopRes.set(true); + + restartFut.get(); + + stopGrid(NODES); + } + } + } + + /** + * @param key Key + * @param cache Cache. 
+ * @param id ID. + * @return Future. + */ + public IgniteFuture waitForKey(Integer key, final IgniteCache cache, final int id) { + String v = cache.get(key); + + // From now on, all futures will be completed immediately (since the key has been + // inserted). + if (v != null) + return new IgniteFinishedFutureImpl<>("immediately"); + + final IgniteFuture promise = new IgniteFutureImpl<>(new GridFutureAdapter()); + + final CacheEntryListenerConfiguration cfg = + createCacheListener(key, promise, id); + + promise.listen(new IgniteInClosure>() { + @Override public void apply(IgniteFuture future) { + GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + cache.deregisterCacheEntryListener(cfg); + + return null; + } + }); + } + }); + + // Start listening. + // Assumption: When the call returns, the listener is guaranteed to have been registered. + cache.registerCacheEntryListener(cfg); + + // Now must check the cache again, to make sure that we didn't miss the key insert while we + // were busy setting up the cache listener. + // Check asynchronously. + IgniteCache asyncCache = cache.withAsync(); + asyncCache.get(key); + + // Complete the promise if the key was inserted concurrently. + asyncCache.future().listen(new IgniteInClosure>() { + @Override public void apply(IgniteFuture f) { + String value = f.get(); + + if (value != null) { + log.info("Completed by get: " + id); + + (((GridFutureAdapter)((IgniteFutureImpl)promise).internalFuture())).onDone("by get"); + } + } + }); + + return promise; + } + + /** + * @param key Key. + * @param result Result. + * @param id Listener ID. + * @return Listener + */ + private CacheEntryListenerConfiguration createCacheListener( + Integer key, + IgniteFuture result, + int id) { + return new MutableCacheEntryListenerConfiguration<>( + factoryOf(new CacheListener(result, id)), + new SingletonFactory<>(new KeyEventFilter(key, id)), false, true); + } + + + + /** + * @param cacheMode Cache mode. 
+     * @param atomicMode Atomicity mode.
+     * @param backups Backups.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(CacheMode cacheMode,
+        CacheAtomicityMode atomicMode, int backups) {
+        // The cache name encodes all three parameters, e.g. "test-" + mode + atomicity + backups.
+        CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>("test-" + cacheMode + atomicMode + backups);
+
+        cfg.setCacheMode(cacheMode);
+        cfg.setAtomicityMode(atomicMode);
+        cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        cfg.setBackups(backups);
+        cfg.setReadFromBackup(false);
+
+        return cfg;
+    }
+
+    /**
+     * Listener that completes the supplied future with {@code "by listener"} when it is
+     * notified about created entries.
+     */
+    private static class CacheListener implements CacheEntryCreatedListener<Integer, String>, Serializable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Future completed from {@link #onCreated(Iterable)}. */
+        private final IgniteFuture<String> result;
+
+        /** Listener ID; stored but not read by this class. */
+        private final int id;
+
+        /**
+         * @param result Result.
+         * @param id ID.
+         */
+        CacheListener(IgniteFuture<String> result, int id) {
+            this.result = result;
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
+            ((GridFutureAdapter)((IgniteFutureImpl)result).internalFuture()).onDone("by listener");
+        }
+    }
+
+    /**
+     * Filter that passes only events whose key equals the configured key.
+     */
+    private static class KeyEventFilter implements CacheEntryEventFilter<Integer, String>, Serializable {
+        /** */
+        private static final long serialVersionUID = 42L;
+
+        /** Key whose events pass the filter. */
+        private final Object key;
+
+        /** Listener ID; not taken into account by {@code equals} and {@code hashCode}. */
+        private final int id;
+
+        /**
+         * @param key Key.
+         * @param id ID.
+         */
+        KeyEventFilter(Object key, int id) {
+            this.key = key;
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
+            return e.getKey().equals(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            return key.equals(((KeyEventFilter)o).key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8e13dc26/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 083af1e..0aa3560 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapValuesTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryConcurrentTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalAtomicSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryLocalSelfTest;
 import
org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionAtomicOneNodeTest; @@ -228,6 +229,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class); suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class); suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class); + suite.addTestSuite(GridCacheContinuousQueryConcurrentTest.class); suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class); suite.addTestSuite(CacheContinuousBatchAckTest.class); suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);