Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 104E6184D0 for ; Fri, 23 Oct 2015 11:52:04 +0000 (UTC) Received: (qmail 22117 invoked by uid 500); 23 Oct 2015 11:52:03 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 22041 invoked by uid 500); 23 Oct 2015 11:52:03 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 21657 invoked by uid 99); 23 Oct 2015 11:52:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Oct 2015 11:52:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 82C2CE0A23; Fri, 23 Oct 2015 11:52:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ntikhonov@apache.org To: commits@ignite.apache.org Date: Fri, 23 Oct 2015 11:52:14 -0000 Message-Id: <213f8bfce28a40d98de0a8a3e985e09a@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [12/19] ignite git commit: IGNITE-426 WIP test and semen reviewed fixes IGNITE-426 WIP test and semen reviewed fixes Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9c611143 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9c611143 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9c611143 Branch: refs/heads/ignite-426-2-reb Commit: 9c611143b80f11b61ddb36553c8f41b8e061cad0 Parents: b1cfeac Author: nikolay_tikhonov Authored: Thu Oct 8 15:27:28 2015 +0300 Committer: nikolay_tikhonov Committed: Fri Oct 23 14:50:09 2015 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 5 - .../internal/GridMessageListenHandler.java | 5 - .../communication/GridIoMessageFactory.java | 6 - .../processors/cache/GridCacheMapEntry.java | 4 +- .../distributed/dht/GridDhtLocalPartition.java | 30 +- .../dht/GridDhtPartitionTopologyImpl.java | 6 +- .../continuous/CacheContinuousQueryEntry.java | 70 +- .../continuous/CacheContinuousQueryHandler.java | 263 ++--- .../CacheContinuousQueryListener.java | 15 - .../CacheContinuousQueryLostPartition.java | 148 --- .../continuous/CacheContinuousQueryManager.java | 33 +- .../continuous/GridContinuousHandler.java | 6 - ...acheContinuousQueryFailoverAbstractTest.java | 965 ++++++++++++++++--- 13 files changed, 985 insertions(+), 571 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9c611143/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 1b9c46c..1eeedeb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -386,11 +386,6 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public void partitionLost(String cacheName, int partId) { - // No-op. - } - - /** {@inheritDoc} */ @Nullable @Override public Object orderedTopic() { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9c611143/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index e038794..bddebba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -179,11 +179,6 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public void partitionLost(String cacheName, int partId) { - // No-op. - } - - /** {@inheritDoc} */ @Nullable @Override public Object orderedTopic() { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9c611143/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 3474f84..6f71d57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -92,7 +92,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBatchAck; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartition; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.TxEntryValueHolder; @@ -691,11 +690,6 @@ public class GridIoMessageFactory implements MessageFactory { break; - case 115: - msg = new CacheContinuousQueryLostPartition(); - - break; - // [-3..115] - this // [120..123] - DR // [-4..-22] - SQL http://git-wip-us.apache.org/repos/asf/ignite/blob/9c611143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 259869e..570172d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -3156,14 +3156,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme private long nextPartIndex(AffinityTopologyVersion topVer) { long updateIdx; - //U.dumpStack(); - if (!cctx.isLocal() && !isNear()) { GridDhtLocalPartition locPart = cctx.topology().localPartition(partition(), topVer, false); assert locPart != null; - updateIdx = locPart.nextContinuousQueryUpdateIndex(); + updateIdx = locPart.nextUpdateIndex(); } else updateIdx = 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/9c611143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 975d76c..baa8520 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -58,18 +58,6 @@ import org.jetbrains.annotations.NotNull; import org.jsr166.ConcurrentHashMap8; import org.jsr166.LongAdder8; -import javax.cache.CacheException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicStampedReference; -import java.util.concurrent.locks.ReentrantLock; - import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; @@ -128,8 +116,8 @@ public class GridDhtLocalPartition implements Comparable, /** Group reservations. */ private final CopyOnWriteArrayList reservations = new CopyOnWriteArrayList<>(); - /** Continuous query update index. */ - private final AtomicLong contQryUpdIdx = new AtomicLong(); + /** Update index. */ + private final AtomicLong updIdx = new AtomicLong(); /** * @param cctx Context. @@ -628,28 +616,28 @@ public class GridDhtLocalPartition implements Comparable, /** * @return Next update index. */ - public long nextContinuousQueryUpdateIndex() { - return contQryUpdIdx.incrementAndGet(); + public long nextUpdateIndex() { + return updIdx.incrementAndGet(); } /** * @return Current update index. */ - public long continuousQueryUpdateIndex() { - return contQryUpdIdx.get(); + public long updateIndex() { + return updIdx.get(); } /** * @param val Update index value. */ - public void continuousQueryUpdateIndex(long val) { + public void updateIndex(long val) { while (true) { - long val0 = contQryUpdIdx.get(); + long val0 = updIdx.get(); if (val0 >= val) break; - if (contQryUpdIdx.compareAndSet(val0, val)) + if (updIdx.compareAndSet(val0, val)) break; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9c611143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 5d312b6..098a60d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -939,7 +939,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { Long cntr = cntrMap.get(part.id()); if (cntr != null) - part.continuousQueryUpdateIndex(cntr); + part.updateIndex(cntr); } } @@ -1053,7 +1053,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { Long cntr = cntrMap.get(part.id()); if (cntr != null) - part.continuousQueryUpdateIndex(cntr); + part.updateIndex(cntr); } } @@ -1317,7 +1317,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { for (GridDhtLocalPartition part : locParts.values()) { Long cntr0 = res.get(part.id()); - Long cntr1 = part.continuousQueryUpdateIndex(); + Long cntr1 = part.updateIndex(); if (cntr0 == null || cntr1 > cntr0) res.put(part.id(), cntr1); http://git-wip-us.apache.org/repos/asf/ignite/blob/9c611143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index 9e73142..eefbbae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -43,6 +43,15 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { private static final long serialVersionUID = 0L; /** */ + private static final byte BACKUP_ENTRY = 0b0001; + + /** */ + private static final byte ORDERED_ENTRY = 0b0010; + + /** */ + private static final byte FILTERED_ENTRY = 0b0100; + + /** */ private static final EventType[] EVT_TYPE_VALS = EventType.values(); /** @@ -82,8 +91,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { /** Update index. */ private long updateIdx; - /** */ - private boolean filtered; + /** Flags. */ + private byte flags; /** */ @GridToStringInclude @@ -91,7 +100,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { private AffinityTopologyVersion topVer; /** - * Required by {@link org.apache.ignite.plugin.extensions.communication.Message}. + * Required by {@link Message}. */ public CacheContinuousQueryEntry() { // No-op. @@ -134,13 +143,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { } /** - * @return Cache ID. - */ - int cacheId() { - return cacheId; - } - - /** * @return Event type. */ EventType eventType() { @@ -155,24 +157,52 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { } /** - * Mark this event as filtered. + * @return Update index. + */ + long updateIndex() { + return updateIdx; + } + + /** + * Mark that entry create on backup. + */ + void markBackup() { + flags |= BACKUP_ENTRY; + } + + /** + * Mark that entry ordered. + */ + void markOrdered() { + flags |= ORDERED_ENTRY; + } + + /** + * Mark that entry filtered. */ void markFiltered() { - filtered = true; + flags |= FILTERED_ENTRY; } /** - * @return Update index. + * @return {@code True} if entry sent by backup node. */ - long updateIndex() { - return updateIdx; + boolean isBackup() { + return (flags & BACKUP_ENTRY) != 0; + } + + /** + * @return {@code True} . + */ + boolean isOrdered() { + return (flags & ORDERED_ENTRY) != 0; } /** - * @return Filtered entry. + * @return {@code True} if entry was filtered. */ boolean filtered() { - return filtered; + return (flags & FILTERED_ENTRY) != 0; } /** @@ -297,7 +327,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { writer.incrementState(); case 7: - if (!writer.writeBoolean("filtered", filtered)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); @@ -376,7 +406,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 7: - filtered = reader.readBoolean("filtered"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -390,7 +420,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 8; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9c611143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index bb2558c..4734998 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -63,6 +64,7 @@ import org.apache.ignite.internal.processors.platform.cache.query.PlatformContin import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -84,9 +86,6 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { /** */ private static final int BACKUP_ACK_THRESHOLD = 100; - /** */ - private static final int QUERY_HOLE_THRESHOLD = 5; - /** Cache name. */ private String cacheName; @@ -291,20 +290,7 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { if (primary || skipPrimaryCheck) { if (loc) { if (!localCache) { - PartitionRecovery rcv = rcvs.get(entry.partition()); - - if (rcv == null) { - rcv = new PartitionRecovery(ctx.log(getClass())); - - PartitionRecovery oldRec = rcvs.putIfAbsent(entry.partition(), rcv); - - if (oldRec != null) - rcv = oldRec; - } - - rcv.add(entry); - - Collection entries = rcv.entries(); + Collection entries = handleEntry(ctx, entry); if (!entries.isEmpty()) { final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); @@ -342,8 +328,11 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); } } - else + else { + entry.markBackup(); + backupQueue.add(entry); + } } catch (ClusterTopologyCheckedException ex) { IgniteLogger log = ctx.log(getClass()); @@ -378,54 +367,6 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { } } - @Override public void partitionLost(int partId) { - assert rcvs != null; - - PartitionRecovery rcv = rcvs.get(partId); - - if (rcv != null) - rcv.reset(); - } - - @Override public void firePartitionLostEvent(String cacheName0, final int partId) { - GridCacheContext cctx = cacheContext(ctx); - - // Check that cache stopped. - if (cctx == null) - return; - - if ((cacheName == null && cacheName0 == null) || // Check default cache. - (cacheName0 != null && cacheName != null && cacheName0.equals(cacheName))) { - ctx.closure().runLocalSafe(new Runnable() { - @Override public void run() { - GridCacheContext cctx = cacheContext(ctx); - - CacheContinuousQueryLostPartition msg = new CacheContinuousQueryLostPartition( - routineId, - cctx.cacheId(), - partId); - - try { - cctx.io().send(nodeId, msg, GridIoPolicy.SYSTEM_POOL); - } - catch (ClusterTopologyCheckedException e) { - IgniteLogger log = ctx.log(getClass()); - - if (log.isDebugEnabled()) - log.debug("Failed to send lost partition message, node left " + - "[msg=" + msg + ", nodeId=" + routineId + ']'); - } - catch (IgniteCheckedException e) { - IgniteLogger log = ctx.log(getClass()); - - U.error(log, "Failed to send lost partition message " + - "[msg=" + msg + ", nodeId=" + routineId + ']', e); - } - } - }); - } - } - @Override public void onUnregister() { if (rmtFilter instanceof PlatformContinuousQueryFilter) ((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister(); @@ -574,31 +515,10 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); - Map parts = new HashMap<>(); - - for (CacheContinuousQueryEntry e : entries) { - PartitionRecovery rec = parts.containsKey(e.partition()) ? - parts.get(e.partition()) : rcvs.get(e.partition()); - - if (rec == null) { - rec = new PartitionRecovery(ctx.log(getClass())); - - PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec); - - if (oldRec != null) - rec = oldRec; - } - - rec.add(e); - - if (!parts.containsKey(e.partition())) - parts.put(e.partition(), rec); - } - Collection entries0 = new ArrayList<>(); - for (PartitionRecovery rec : parts.values()) - entries0.addAll(rec.entries()); + for (CacheContinuousQueryEntry e : entries) + entries0.addAll(handleEntry(ctx, e)); Iterable> evts = F.viewReadOnly(entries0, new C1>() { @@ -617,6 +537,39 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { } /** + * @param ctx Context. + * @param e entry. + * @return Entry collection. + */ + private Collection handleEntry(GridKernalContext ctx, CacheContinuousQueryEntry e) { + assert e != null; + + // Initial query entry or evicted entry. + // This events should be fired immediately. + if (e.updateIndex() == -1) + return F.asList(e); + + PartitionRecovery rec = rcvs.get(e.partition()); + + if (rec == null) { + rec = new PartitionRecovery(ctx.log(getClass())); + + PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec); + + if (oldRec != null) + rec = oldRec; + } + + Collection entries = rec.collectEntries(e); + + if (CacheContinuousQueryManager.SUPER_DEBUG) + ctx.log(getClass()).error("Fire the following event for partition : " + e.partition() + + " Entries: " + Arrays.toString(entries.toArray())); + + return entries; + } + + /** * */ private static class PartitionRecovery { @@ -624,11 +577,16 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { private IgniteLogger log; /** */ - private long lastFiredEvt = 0; + private static final long INIT_VALUE = -100; + + /** */ + private long lastFiredEvt = INIT_VALUE; /** */ private final Map pendingEnts = new TreeMap<>(); + private List> firedEvents = new ArrayList<>(); + /** * @param log Logger. */ @@ -639,99 +597,83 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { /** * Add continuous entry. * - * @param e Cache continuous qeury entry. + * @param entry Cache continuous query entry. + * @return Collection entries which will be fired. */ - public void add(CacheContinuousQueryEntry e) { - assert e != null; + public Collection collectEntries(CacheContinuousQueryEntry entry) { + assert entry != null; - synchronized (pendingEnts) { - if (!pendingEnts.containsKey(e.updateIndex()) && e.updateIndex() > lastFiredEvt) - pendingEnts.put(e.updateIndex(), e); - else if (log.isDebugEnabled()) - log.debug("Skip duplicate continuous query message: " + e); - } - } - - /** - * @return Ordered continuous query entries. - */ - public Collection entries() { - List entries = new ArrayList<>(); + List entries; synchronized (pendingEnts) { - if (pendingEnts.isEmpty()) - return Collections.emptyList(); + // Received first event. + if (lastFiredEvt == INIT_VALUE) { + if (CacheContinuousQueryManager.SUPER_DEBUG) + log.error("First event. " + entry); - Iterator> iter = pendingEnts.entrySet().iterator(); + lastFiredEvt = entry.updateIndex(); - boolean fired = false; + firedEvents.add(new T2<>(lastFiredEvt, entry)); - // The elements are consistently. - while (iter.hasNext()) { - Map.Entry e = iter.next(); - - if (e.getKey() == lastFiredEvt + 1) { - ++lastFiredEvt; - - entries.add(e.getValue()); - - iter.remove(); - - fired = true; - } + return F.asList(entry); } - if (!fired && lastFiredEvt == 0 && pendingEnts.size() >= QUERY_HOLE_THRESHOLD) { - Long prevCnt = null; + // Handle case when nodes owning partition left from topology. + if (entry.updateIndex() == 1 && !entry.isBackup()) { + pendingEnts.clear(); - int orderedCnt = 0; + lastFiredEvt = 1; - for (Long cnt : pendingEnts.keySet()) { - if (prevCnt != null) { - if (prevCnt + 1 != cnt) - break; - else - ++orderedCnt; - } + if (CacheContinuousQueryManager.SUPER_DEBUG) + log.error("Lost partition. Start from 1. Entry: " + entry); - prevCnt = cnt; - } + firedEvents.add(new T2<>(lastFiredEvt, entry)); - if (orderedCnt >= QUERY_HOLE_THRESHOLD) { - iter = pendingEnts.entrySet().iterator(); + return F.asList(entry); + } - while (entries.size() < orderedCnt) { - Map.Entry e = iter.next(); + // Check duplicate. + if (entry.updateIndex() > lastFiredEvt) { + if (CacheContinuousQueryManager.SUPER_DEBUG) + log.error("Put message to pending queue. Counter value: " + lastFiredEvt + " Entry: " + entry); - entries.add(e.getValue()); + pendingEnts.put(entry.updateIndex(), entry); + } + else { + if (log.isDebugEnabled()) + log.debug("Skip duplicate continuous query message: " + entry); - lastFiredEvt = e.getKey(); + if (CacheContinuousQueryManager.SUPER_DEBUG) + log.error("Received duplicate. Counter value: " + lastFiredEvt + " Entry: " + entry + + ", Proceed message " + Arrays.toString(firedEvents.toArray())); - iter.remove(); - } - } + return Collections.emptyList(); } - } - return entries; - } + if (pendingEnts.isEmpty()) + return Collections.emptyList(); - /** - * Reset internal state. - */ - public void reset() { - synchronized (pendingEnts) { Iterator> iter = pendingEnts.entrySet().iterator(); + entries = new ArrayList<>(); + + // Elements are consistently. while (iter.hasNext()) { Map.Entry e = iter.next(); - if (e.getKey() >= lastFiredEvt) + if (e.getKey() == lastFiredEvt + 1) { + ++lastFiredEvt; + + entries.add(e.getValue()); + + firedEvents.add(new T2<>(e.getKey(), e.getValue())); + iter.remove(); + } } - - lastFiredEvt = 0; } + + return entries; } } @@ -766,17 +708,6 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx); } - /** {@inheritDoc} */ - @Override public void partitionLost(String cacheName, int partId) { - if ((this.cacheName == null && cacheName == null) // Check default caches. - || (cacheName != null && this.cacheName != null && cacheName.equals(this.cacheName))) { - PartitionRecovery rcv = rcvs.get(partId); - - if (rcv != null) - rcv.reset(); - } - } - /** * @param t Acknowledge information. * @param routineId Routine ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/9c611143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index a706105..2f9e111 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -55,21 +55,6 @@ interface CacheContinuousQueryListener { public void cleanupBackupQueue(Map updateIdxs); /** - * Fire event that partition lost. - * - * @param cacheName Cache name. - * @param partId Partition ID. - */ - public void firePartitionLostEvent(String cacheName, int partId); - - /** - * Handle partition lost event. - * - * @param partId Partition ID. - */ - public void partitionLost(int partId); - - /** * Flushes backup queue. * * @param ctx Context. http://git-wip-us.apache.org/repos/asf/ignite/blob/9c611143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java deleted file mode 100644 index eeb20cc..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import java.nio.ByteBuffer; -import java.util.UUID; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * Continuous query entry. - */ -public class CacheContinuousQueryLostPartition extends GridCacheMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Routine ID. */ - private UUID routineId; - - /** Partition. */ - private int part; - - /** - * Required by {@link Message}. - */ - public CacheContinuousQueryLostPartition() { - // No-op. - } - - /** - * @param cacheId Cache ID. - * @param part Partition ID. - */ - CacheContinuousQueryLostPartition(UUID routineId, int cacheId, int part) { - this.routineId = routineId; - this.cacheId = cacheId; - this.part = part; - } - - /** - * @return Partition. - */ - int partition() { - return part; - } - - /** - * @return Routine ID. - */ - UUID routineId() { - return routineId; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 115; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 3: - if (!writer.writeInt("part", part)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeUuid("routineId", routineId)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 3: - part = reader.readInt("part"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - routineId = reader.readUuid("routineId"); - - if (!reader.isLastRead()) - return false; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 5; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheContinuousQueryLostPartition.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/9c611143/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index d0d877d..7c04053 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -46,9 +46,6 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.CacheRebalancingEvent; -import org.apache.ignite.events.Event; -import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -90,6 +87,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** */ private static final long BACKUP_ACK_FREQ = 5000; + public static final boolean SUPER_DEBUG = false; + /** Listeners. */ private final ConcurrentMap lsnrs = new ConcurrentHashMap8<>(); @@ -127,16 +126,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } }); - cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryLostPartition.class, - new CI2() { - @Override public void apply(UUID uuid, CacheContinuousQueryLostPartition msg) { - CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId()); - - if (lsnr != null) - lsnr.partitionLost(msg.partition()); - } - }); - cctx.time().schedule(new Runnable() { @Override public void run() { for (CacheContinuousQueryListener lsnr : lsnrs.values()) @@ -146,20 +135,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext()); } }, BACKUP_ACK_FREQ, BACKUP_ACK_FREQ); - - cctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - assert evt instanceof CacheRebalancingEvent; - - CacheRebalancingEvent evt0 = (CacheRebalancingEvent)evt; - - for (CacheContinuousQueryListener lsnr : lsnrs.values()) - lsnr.firePartitionLostEvent(evt0.cacheName(), evt0.partition()); - - for (CacheContinuousQueryListener lsnr : intLsnrs.values()) - lsnr.firePartitionLostEvent(evt0.cacheName(), evt0.partition()); - } - }, EVT_CACHE_REBALANCE_PART_DATA_LOST); } /** {@inheritDoc} */ @@ -319,7 +294,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { null, lsnr.oldValueRequired() ? oldVal : null, e.partition(), - 0, + -1, null); CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent( @@ -568,7 +543,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { e.rawGet(), null, 0, - 0, + -1, null); next = new CacheContinuousQueryEvent<>( http://git-wip-us.apache.org/repos/asf/ignite/blob/9c611143/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index 975cd2f..40fb12a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -114,12 +114,6 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx); /** - * @param cacheName Cache name. - * @param partId Partition ID. - */ - public void partitionLost(String cacheName, int partId); - - /** * @return Topic for ordered notifications. If {@code null}, notifications * will be sent in non-ordered messages. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9c611143/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java index 61fa6cd..ca754af 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -31,7 +32,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -39,6 +39,8 @@ import javax.cache.Cache; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; @@ -47,6 +49,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.ContinuousQuery; @@ -71,10 +74,10 @@ import org.apache.ignite.internal.util.typedef.PAX; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiException; -import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -279,6 +282,17 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo IgniteCache clnCache = qryClient.cache(null); + IgniteOutClosure> rndCache = + new IgniteOutClosure>() { + int cnt = 0; + + @Override public IgniteCache apply() { + ++cnt; + + return grid(cnt % SRV_NODES + 1).cache(null); + } + }; + Ignite igniteSrv = ignite(0); IgniteCache srvCache = igniteSrv.cache(null); @@ -290,16 +304,18 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo for (int j = 0; j < 50; ++j) { ContinuousQuery qry = new ContinuousQuery<>(); - final TestLocalListener lsnr = new TestLocalListener(); + final CacheEventListener3 lsnr = new CacheEventListener3(); qry.setLocalListener(lsnr); + qry.setRemoteFilter(lsnr); + int keyIter = 0; for (; keyIter < keyCnt / 2; keyIter++) { int key = keys.get(keyIter); - clnCache.put(key, key); + rndCache.apply().put(key, key); } assert lsnr.evts.isEmpty(); @@ -312,28 +328,40 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo Affinity aff = affinity(srvCache); + boolean filtered = false; + for (; keyIter < keys.size(); keyIter++) { int key = keys.get(keyIter); - log.info("Put [key=" + key + ", part=" + aff.partition(key) + ']'); + int val = filtered ? 1 : 2; + + log.info("Put [key=" + key + ", val=" + val + ", part=" + aff.partition(key) + ']'); T2 t = updates.get(key); if (t == null) { - updates.put(key, new T2<>((Object)key, null)); + // Check filtered. + if (!filtered) { + updates.put(key, new T2<>((Object)val, null)); - expEvts.add(new T3<>((Object)key, (Object)key, null)); + expEvts.add(new T3<>((Object)key, (Object)val, null)); + } } else { - updates.put(key, new T2<>((Object)key, (Object)key)); + // Check filtered. + if (!filtered) { + updates.put(key, new T2<>((Object)val, (Object)t.get1())); - expEvts.add(new T3<>((Object)key, (Object)key, (Object)key)); + expEvts.add(new T3<>((Object)key, (Object)val, (Object)t.get1())); + } } - srvCache.put(key, key); + rndCache.apply().put(key, val); + + filtered = !filtered; } - checkEvents(expEvts, lsnr); + checkEvents(expEvts, lsnr, false); query.close(); } @@ -357,7 +385,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo ContinuousQuery qry = new ContinuousQuery<>(); - final TestLocalListener lsnr = new TestLocalListener(); + final CacheEventListener3 lsnr = new CacheEventListener3(); qry.setLocalListener(lsnr); @@ -421,7 +449,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo filtered = !filtered; } - checkEvents(expEvts, lsnr); + checkEvents(expEvts, lsnr, false); List stopThreads = new ArrayList<>(3); @@ -488,7 +516,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo filtered = !filtered; } - checkEvents(expEvts, lsnr); + checkEvents(expEvts, lsnr, false); query.close(); } @@ -518,7 +546,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo ContinuousQuery qry = new ContinuousQuery<>(); - final TestLocalListener lsnr = new TestLocalListener(); + final CacheEventListener3 lsnr = new CacheEventListener3(); qry.setLocalListener(lsnr); @@ -599,43 +627,13 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + keys0.size() + ']'); } - checkEvents(expEvts, lsnr); + checkEvents(expEvts, lsnr, false); } cur.close(); } /** - * - */ - public static class TestLocalListener implements CacheEntryUpdatedListener, - CacheEntryEventSerializableFilter { - /** Keys. */ - GridConcurrentHashSet keys = new GridConcurrentHashSet<>(); - - /** Events. */ - private final ConcurrentHashMap> evts = new ConcurrentHashMap<>(); - - /** {@inheritDoc} */ - @Override public void onUpdated(Iterable> events) throws CacheEntryListenerException { - for (CacheEntryEvent e : events) { - System.err.println("Update entry: " + e); - - Integer key = (Integer)e.getKey(); - - keys.add(key); - - evts.put(key, e); - } - } - - /** {@inheritDoc} */ - @Override public boolean evaluate(CacheEntryEvent e) throws CacheEntryListenerException { - return (Integer)e.getValue() % 2 == 0; - } - } - - /** * @throws Exception If failed. */ public void testThreeBackups() throws Exception { @@ -822,22 +820,145 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo /** * @param expEvts Expected events. * @param lsnr Listener. + * @param lostAllow If {@code true} than won't assert on lost events. */ - private void checkEvents(final List> expEvts, final TestLocalListener lsnr) - throws Exception { - assert GridTestUtils.waitForCondition(new PA() { + private void checkEvents(final List> expEvts, final CacheEventListener2 lsnr, + boolean lostAllow) throws Exception { + boolean b = GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { - return lsnr.evts.size() == expEvts.size(); + return expEvts.size() == lsnr.size(); } }, 2000L); + List> lostEvents = new ArrayList<>(); + + for (T3 exp : expEvts) { + List> rcvdEvts = lsnr.evts.get(exp.get1()); + + if (rcvdEvts == null || rcvdEvts.isEmpty()) { + lostEvents.add(exp); + + continue; + } + + Iterator> iter = rcvdEvts.iterator(); + + boolean found = false; + + while (iter.hasNext()) { + CacheEntryEvent e = iter.next(); + + if ((exp.get2() != null && e.getValue() != null && exp.get2() == e.getValue()) + && equalOldValue(e, exp)) { + found = true; + + iter.remove(); + + break; + } + } + + // Lost event is acceptable. + if (!found) + lostEvents.add(exp); + } + + boolean dup = false; + + // Check duplicate. + if (!lsnr.evts.isEmpty()) { + for (List> evts : lsnr.evts.values()) { + if (!evts.isEmpty()) { + for (CacheEntryEvent e : evts) { + boolean found = false; + + for (T3 lostEvt : lostEvents) { + if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2()) + && equalOldValue(e, lostEvt)) { + found = true; + + lostEvents.remove(lostEvt); + + break; + } + } + + if (!found) { + dup = true; + + break; + } + } + } + } + + if (dup) { + for (T3 e : lostEvents) + log.error("Lost event: " + e); + + for (List> e : lsnr.evts.values()) + if (!e.isEmpty()) + log.error("Duplicate event: " + e); + } + + assertFalse("Received duplicate events, see log for details.", dup); + } + + if (!lostAllow && !lostEvents.isEmpty()) { + log.error("Lost event cnt: " + lostEvents.size()); + + for (T3 e : lostEvents) + log.error("Lost event: " + e); + + assertTrue("Lose events, see log for details.", false); + } + + log.error("Lost event cnt: " + lostEvents.size()); + + expEvts.clear(); + + lsnr.evts.clear(); + } + + /** + * @param e Event + * @param expVals expected value + * @return {@code True} if entries has the same key, value and oldValue. If cache start without backups + * than oldValue ignoring in comparison. + */ + private boolean equalOldValue(CacheEntryEvent e, T3 expVals) { + return (e.getOldValue() == null && expVals.get3() == null) // Both null + || (e.getOldValue() != null && expVals.get3() != null // Equals + && e.getOldValue().equals(expVals.get3())) + || (backups == 0); // If we start without backup than oldValue might be lose. + } + + /** + * @param expEvts Expected events. + * @param lsnr Listener. + */ + private void checkEvents(final List> expEvts, final CacheEventListener3 lsnr, + boolean allowLoseEvent) throws Exception { + if (!allowLoseEvent) + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lsnr.evts.size() == expEvts.size(); + } + }, 2000L); + for (T3 exp : expEvts) { CacheEntryEvent e = lsnr.evts.get(exp.get1()); assertNotNull("No event for key: " + exp.get1(), e); assertEquals("Unexpected value: " + e, exp.get2(), e.getValue()); + + if (allowLoseEvent) + lsnr.evts.remove(exp.get1()); } + if (allowLoseEvent) + assert lsnr.evts.isEmpty(); + expEvts.clear(); lsnr.evts.clear(); @@ -1058,6 +1179,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo final AtomicReference checkLatch = new AtomicReference<>(); + boolean processorPut = false; + IgniteInternalFuture restartFut = GridTestUtils.runAsync(new Callable() { @Override public Void call() throws Exception { final int idx = SRV_NODES + 1; @@ -1093,7 +1216,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo final Map>> expEvts = new HashMap<>(); try { - long stopTime = System.currentTimeMillis() + 3 * 60_000; + long stopTime = System.currentTimeMillis() + 1 * 60_000; final int PARTS = qryClient.affinity(null).partitions(); @@ -1110,7 +1233,20 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo else val = val + 1; - qryClientCache.put(key, val); + if (processorPut && prevVal != null) { + qryClientCache.invoke(key, new CacheEntryProcessor() { + @Override public Void process(MutableEntry entry, + Object... arguments) throws EntryProcessorException { + entry.setValue(arguments[0]); + + return null; + } + }, val); + } + else + qryClientCache.put(key, val); + + processorPut = !processorPut; vals.put(key, val); @@ -1187,8 +1323,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo /** * @throws Exception If failed. */ - public void testMultiThreaded() throws Exception { - final int SRV_NODES = 3; + public void testFailoverFilter() throws Exception { + final int SRV_NODES = 4; startGridsMultiThreaded(SRV_NODES); @@ -1196,142 +1332,633 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo Ignite qryClient = startGrid(SRV_NODES); - final IgniteCache cache = qryClient.cache(null); + client = false; - CacheEventListener1 lsnr = new CacheEventListener1(true); + IgniteCache qryClientCache = qryClient.cache(null); + + final CacheEventListener2 lsnr = new CacheEventListener2(); ContinuousQuery qry = new ContinuousQuery<>(); qry.setLocalListener(lsnr); - QueryCursor cur = cache.query(qry); - - client = false; - - final int SRV_IDX = SRV_NODES - 1; + qry.setRemoteFilter(new CacheEventFilter()); - List keys = primaryKeys(ignite(SRV_IDX).cache(null), 10); + QueryCursor cur = qryClientCache.query(qry); - final int THREADS = 10; + final AtomicBoolean stop = new AtomicBoolean(); - for (int i = 0; i < keys.size(); i++) { - log.info("Iteration: " + i); + final AtomicReference checkLatch = new AtomicReference<>(); - Ignite srv = ignite(SRV_IDX); + IgniteInternalFuture restartFut = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + final int idx = SRV_NODES + 1; - TestCommunicationSpi spi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi(); + while (!stop.get() && !err) { + log.info("Start node: " + idx); - spi.sndFirstOnly = new AtomicBoolean(false); + startGrid(idx); - final Integer key = keys.get(i); + Thread.sleep(3000); - final AtomicInteger val = new AtomicInteger(); + log.info("Stop node: " + idx); - CountDownLatch latch = new CountDownLatch(THREADS); + stopGrid(idx); - lsnr.latch = latch; + CountDownLatch latch = new CountDownLatch(1); - IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { - @Override public Object call() throws Exception { - Integer val0 = val.getAndIncrement(); + assertTrue(checkLatch.compareAndSet(null, latch)); - cache.put(key, val0); + if (!stop.get()) { + log.info("Wait for event check."); - return null; + assertTrue(latch.await(1, MINUTES)); + } } - }, THREADS, "update-thread"); - fut.get(); + return null; + } + }); - stopGrid(SRV_IDX); + final Map vals = new HashMap<>(); - if (!latch.await(5, SECONDS)) - fail("Failed to wait for notifications [exp=" + THREADS + ", left=" + lsnr.latch.getCount() + ']'); + final Map>> expEvts = new HashMap<>(); - assertEquals(THREADS, lsnr.allEvts.size()); + try { + long stopTime = System.currentTimeMillis() + 1 * 60_000; - Set vals = new HashSet<>(); + final int PARTS = qryClient.affinity(null).partitions(); - boolean err = false; + ThreadLocalRandom rnd = ThreadLocalRandom.current(); - for (CacheEntryEvent evt : lsnr.allEvts) { - assertEquals(key, evt.getKey()); - assertNotNull(evt.getValue()); + boolean filtered = false; - if (!vals.add((Integer)evt.getValue())) { - err = true; + boolean processorPut = false; - log.info("Extra event: " + evt); - } - } + while (System.currentTimeMillis() < stopTime) { + Integer key = rnd.nextInt(PARTS); - for (int v = 0; v < THREADS; v++) { - if (!vals.contains(v)) { - err = true; + Integer prevVal = vals.get(key); + Integer val = vals.get(key); - log.info("Event for value not received: " + v); - } - } + if (val == null) + val = 0; + else + val = Math.abs(val) + 1; - assertFalse("Invalid events, see log for details.", err); + if (filtered) + val = -val; - lsnr.allEvts.clear(); + if (processorPut && prevVal != null) { + qryClientCache.invoke(key, new CacheEntryProcessor() { + @Override public Void process(MutableEntry entry, + Object... arguments) throws EntryProcessorException { + entry.setValue(arguments[0]); - startGrid(SRV_IDX); - } + return null; + } + }, val); + } + else + qryClientCache.put(key, val); - cur.close(); - } + processorPut = !processorPut; - /** - * @param logAll If {@code true} logs all unexpected values. - * @param expEvts Expected values. - * @param lsnr Listener. - * @return Check status. - */ - @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") - private boolean checkEvents(boolean logAll, - Map>> expEvts, - CacheEventListener2 lsnr) { - assertTrue(!expEvts.isEmpty()); + vals.put(key, val); - boolean pass = true; + if (val >= 0) { + List> keyEvts = expEvts.get(key); - for (Map.Entry>> e : expEvts.entrySet()) { - Integer key = e.getKey(); - List> exp = e.getValue(); + if (keyEvts == null) { + keyEvts = new ArrayList<>(); - List> rcvdEvts = lsnr.evts.get(key); + expEvts.put(key, keyEvts); + } - if (rcvdEvts == null) { - pass = false; + keyEvts.add(new T2<>(val, prevVal)); + } - log.info("No events for key [key=" + key + ", exp=" + e.getValue() + ']'); + filtered = !filtered; - if (!logAll) - return false; - } - else { - synchronized (rcvdEvts) { - if (rcvdEvts.size() != exp.size()) { - pass = false; + CountDownLatch latch = checkLatch.get(); - log.info("Missed or extra events for key [key=" + key + - ", exp=" + e.getValue() + - ", rcvd=" + rcvdEvts + ']'); + if (latch != null) { + log.info("Check events."); - if (!logAll) - return false; - } + checkLatch.set(null); - int cnt = Math.min(rcvdEvts.size(), exp.size()); + boolean success = false; - for (int i = 0; i < cnt; i++) { - T2 expEvt = exp.get(i); - CacheEntryEvent rcvdEvt = rcvdEvts.get(i); + try { + if (err) + break; - assertEquals(key, rcvdEvt.getKey()); - assertEquals(expEvt.get1(), rcvdEvt.getValue()); + boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return checkEvents(false, expEvts, lsnr); + } + }, 10_000); + + if (!check) + assertTrue(checkEvents(true, expEvts, lsnr)); + + success = true; + + log.info("Events checked."); + } + finally { + if (!success) + err = true; + + latch.countDown(); + } + } + } + } + finally { + stop.set(true); + } + + CountDownLatch latch = checkLatch.get(); + + if (latch != null) + latch.countDown(); + + restartFut.get(); + + boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return checkEvents(false, expEvts, lsnr); + } + }, 10_000); + + if (!check) + assertTrue(checkEvents(true, expEvts, lsnr)); + + cur.close(); + + assertFalse("Unexpected error during test, see log for details.", err); + } + + /** + * @throws Exception If failed. + */ + public void testFailoverStartStopWithoutBackup() throws Exception { + failoverStartStopFilter(0); + } + + /** + * @throws Exception If failed. + */ + public void testFailoverStartStopOneBackup() throws Exception { + failoverStartStopFilter(1); + } + + /** + * @throws Exception If failed. + */ + public void _testStartStop() throws Exception { + this.backups = 0; + + final int SRV_NODES = 4; + + startGridsMultiThreaded(SRV_NODES); + + client = true; + + Ignite qryClient = startGrid(SRV_NODES); + + client = false; + + IgniteCache qryClnCache = qryClient.cache(null); + + final CacheEventListener2 lsnr = new CacheEventListener2(); + + ContinuousQuery qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + qry.setRemoteFilter(new CacheEventFilter()); + + QueryCursor cur = qryClnCache.query(qry); + + for (int i = 0; i < 100; i++) { + final int idx = i % (SRV_NODES - 1); + + log.info("Stop node: " + idx); + + stopGrid(idx); + + Thread.sleep(200); + + List> afterRestEvents = new ArrayList<>(); + + for (int j = 0; j < 10; j++) { + Integer oldVal = (Integer)qryClnCache.get(j); + + qryClnCache.put(j, i); + + afterRestEvents.add(new T3<>((Object)j, (Object)i, (Object)oldVal)); + } + + checkEvents(new ArrayList<>(afterRestEvents), lsnr, false); + + log.info("Start node: " + idx); + + startGrid(idx); + } + + cur.close(); + } + + /** + * @throws Exception If failed. + */ + public void failoverStartStopFilter(int backups) throws Exception { + this.backups = backups; + + final int SRV_NODES = 4; + + startGridsMultiThreaded(SRV_NODES); + + client = true; + + Ignite qryClient = startGrid(SRV_NODES); + + client = false; + + IgniteCache qryClnCache = qryClient.cache(null); + + final CacheEventListener2 lsnr = new CacheEventListener2(); + + ContinuousQuery qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + qry.setRemoteFilter(new CacheEventFilter()); + + QueryCursor cur = qryClnCache.query(qry); + + CacheEventListener2 dinLsnr = null; + + QueryCursor dinQry = null; + + final AtomicBoolean stop = new AtomicBoolean(); + + final AtomicReference checkLatch = new AtomicReference<>(); + + IgniteInternalFuture restartFut = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + while (!stop.get() && !err) { + final int idx = ThreadLocalRandom.current().nextInt(SRV_NODES - 1); + + log.info("Stop node: " + idx); + + stopGrid(idx); + + Thread.sleep(100); + + log.info("Start node: " + idx); + + startGrid(idx); + + CountDownLatch latch = new CountDownLatch(1); + + assertTrue(checkLatch.compareAndSet(null, latch)); + + if (!stop.get()) { + log.info("Wait for event check."); + + assertTrue(latch.await(1, MINUTES)); + } + } + + return null; + } + }); + + final Map vals = new HashMap<>(); + + final Map>> expEvts = new HashMap<>(); + + final List> expEvtsNewLsnr = new ArrayList<>(); + + final List> expEvtsLsnr = new ArrayList<>(); + + try { + long stopTime = System.currentTimeMillis() + 60_000; + + // Start new filter each 5 sec. + long startFilterTime = System.currentTimeMillis() + 5_000; + + final int PARTS = qryClient.affinity(null).partitions(); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + boolean filtered = false; + + boolean processorPut = false; + + while (System.currentTimeMillis() < stopTime) { + Integer key = rnd.nextInt(PARTS); + + Integer prevVal = vals.get(key); + Integer val = vals.get(key); + + if (System.currentTimeMillis() > startFilterTime) { + // Stop filter and check events. + if (dinQry != null) { + dinQry.close(); + + log.error("Continuous query listener closed."); + + checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0); + } + + dinLsnr = new CacheEventListener2(); + + ContinuousQuery newQry = new ContinuousQuery<>(); + + newQry.setLocalListener(dinLsnr); + + newQry.setRemoteFilter(new CacheEventFilter()); + + dinQry = qryClnCache.query(newQry); + + log.error("Continuous query listener started."); + + startFilterTime = System.currentTimeMillis() + 5_000; + } + + if (val == null) + val = 0; + else + val = Math.abs(val) + 1; + + if (filtered) + val = -val; + + if (processorPut && prevVal != null) { + qryClnCache.invoke(key, new CacheEntryProcessor() { + @Override public Void process(MutableEntry entry, + Object... arguments) throws EntryProcessorException { + entry.setValue(arguments[0]); + + return null; + } + }, val); + } + else + qryClnCache.put(key, val); + + processorPut = !processorPut; + + vals.put(key, val); + + if (val >= 0) { + List> keyEvts = expEvts.get(key); + + if (keyEvts == null) { + keyEvts = new ArrayList<>(); + + expEvts.put(key, keyEvts); + } + + keyEvts.add(new T2<>(val, prevVal)); + + T3 tupVal = new T3<>((Object)key, (Object)val, (Object)prevVal); + + expEvtsLsnr.add(tupVal); + + if (dinQry != null) + expEvtsNewLsnr.add(tupVal); + } + + filtered = !filtered; + + CountDownLatch latch = checkLatch.get(); + + if (latch != null) { + log.info("Check events."); + + checkLatch.set(null); + + boolean success = false; + + try { + if (err) + break; + + checkEvents(expEvtsLsnr, lsnr, backups == 0); + + success = true; + + log.info("Events checked."); + } + finally { + if (!success) + err = true; + + latch.countDown(); + } + } + } + } + finally { + stop.set(true); + } + + CountDownLatch latch = checkLatch.get(); + + if (latch != null) + latch.countDown(); + + restartFut.get(); + + checkEvents(expEvtsLsnr, lsnr, backups == 0); + + lsnr.evts.clear(); + lsnr.vals.clear(); + + if (dinQry != null) { + checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0); + + dinLsnr.evts.clear(); + dinLsnr.vals.clear(); + + dinQry.close(); + } + + List> afterRestEvents = new ArrayList<>(); + + for (int i = 0; i < 1024; i++) { + Integer oldVal = (Integer)qryClnCache.get(i); + + qryClnCache.put(i, i); + + afterRestEvents.add(new T3<>((Object)i, (Object)i, (Object)oldVal)); + } + + checkEvents(new ArrayList<>(afterRestEvents), lsnr, false); + + //checkEvents(new ArrayList<>(afterRestEvents), dinLsnr, false); + + cur.close(); + + if (dinQry != null) + dinQry.close(); + + assertFalse("Unexpected error during test, see log for details.", err); + } + + /** + * @throws Exception If failed. + */ + public void testMultiThreaded() throws Exception { + final int SRV_NODES = 3; + + startGridsMultiThreaded(SRV_NODES); + + client = true; + + Ignite qryClient = startGrid(SRV_NODES); + + final IgniteCache cache = qryClient.cache(null); + + CacheEventListener1 lsnr = new CacheEventListener1(true); + + ContinuousQuery qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + QueryCursor cur = cache.query(qry); + + client = false; + + final int SRV_IDX = SRV_NODES - 1; + + List keys = primaryKeys(ignite(SRV_IDX).cache(null), 10); + + final int THREADS = 10; + + for (int i = 0; i < keys.size(); i++) { + log.info("Iteration: " + i); + + Ignite srv = ignite(SRV_IDX); + + TestCommunicationSpi spi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi(); + + spi.sndFirstOnly = new AtomicBoolean(false); + + final Integer key = keys.get(i); + + final AtomicInteger val = new AtomicInteger(); + + CountDownLatch latch = new CountDownLatch(THREADS); + + lsnr.latch = latch; + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Object call() throws Exception { + Integer val0 = val.getAndIncrement(); + + cache.put(key, val0); + + return null; + } + }, THREADS, "update-thread"); + + fut.get(); + + stopGrid(SRV_IDX); + + if (!latch.await(5, SECONDS)) + fail("Failed to wait for notifications [exp=" + THREADS + ", left=" + lsnr.latch.getCount() + ']'); + + assertEquals(THREADS, lsnr.allEvts.size()); + + Set vals = new HashSet<>(); + + boolean err = false; + + for (CacheEntryEvent evt : lsnr.allEvts) { + assertEquals(key, evt.getKey()); + assertNotNull(evt.getValue()); + + if (!vals.add((Integer)evt.getValue())) { + err = true; + + log.info("Extra event: " + evt); + } + } + + for (int v = 0; v < THREADS; v++) { + if (!vals.contains(v)) { + err = true; + + log.info("Event for value not received: " + v); + } + } + + assertFalse("Invalid events, see log for details.", err); + + lsnr.allEvts.clear(); + + startGrid(SRV_IDX); + } + + cur.close(); + } + + /** + * @param logAll If {@code true} logs all unexpected values. + * @param expEvts Expected values. + * @param lsnr Listener. + * @return Check status. + */ + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + private boolean checkEvents(boolean logAll, + Map>> expEvts, + CacheEventListener2 lsnr) { + assertTrue(!expEvts.isEmpty()); + + boolean pass = true; + + for (Map.Entry>> e : expEvts.entrySet()) { + Integer key = e.getKey(); + List> exp = e.getValue(); + + List> rcvdEvts = lsnr.evts.get(key); + + if (rcvdEvts == null) { + pass = false; + + log.info("No events for key [key=" + key + ", exp=" + e.getValue() + ']'); + + if (!logAll) + return false; + } + else { + synchronized (rcvdEvts) { + if (rcvdEvts.size() != exp.size()) { + pass = false; + + log.info("Missed or extra events for key [key=" + key + + ", exp=" + e.getValue() + + ", rcvd=" + rcvdEvts + ']'); + + if (!logAll) + return false; + } + + int cnt = Math.min(rcvdEvts.size(), exp.size()); + + for (int i = 0; i < cnt; i++) { + T2 expEvt = exp.get(i); + CacheEntryEvent rcvdEvt = rcvdEvts.get(i); + + assertEquals(key, rcvdEvt.getKey()); + assertEquals(expEvt.get1(), rcvdEvt.getValue()); } } } @@ -1384,7 +2011,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo this.evts.put(evt.getKey(), evt); - keys.add((Integer) evt.getKey()); + keys.add((Integer)evt.getKey()); if (allEvts != null) allEvts.add(evt); @@ -1423,6 +2050,18 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo /** */ private final ConcurrentHashMap>> evts = new ConcurrentHashMap<>(); + /** + * @return Count events. + */ + public int size() { + int size = 0; + + for (List> e : evts.values()) + size += e.size(); + + return size; + } + /** {@inheritDoc} */ @Override public synchronized void onUpdated(Iterable> evts) throws CacheEntryListenerException { @@ -1467,6 +2106,44 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo /** * */ + public static class CacheEventListener3 implements CacheEntryUpdatedListener, + CacheEntryEventSerializableFilter { + /** Keys. */ + GridConcurrentHashSet keys = new GridConcurrentHashSet<>(); + + /** Events. */ + private final ConcurrentHashMap> evts = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable> events) throws CacheEntryListenerException { + for (CacheEntryEvent e : events) { + Integer key = (Integer)e.getKey(); + + keys.add(key); + + assert evts.put(key, e) == null; + } + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent e) throws CacheEntryListenerException { + return (Integer)e.getValue() % 2 == 0; + } + } + + /** + * + */ + public static class CacheEventFilter implements CacheEntryEventSerializableFilter { + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent event) throws CacheEntryListenerException { + return ((Integer)event.getValue()) >= 0; + } + } + + /** + * + */ private static class TestCommunicationSpi extends TcpCommunicationSpi { /** */ @LoggerResource