Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7841E18AE9 for ; Fri, 20 Nov 2015 11:05:41 +0000 (UTC) Received: (qmail 61323 invoked by uid 500); 20 Nov 2015 11:05:41 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 61215 invoked by uid 500); 20 Nov 2015 11:05:41 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 59964 invoked by uid 99); 20 Nov 2015 11:05:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Nov 2015 11:05:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 48525DFCE4; Fri, 20 Nov 2015 11:05:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Fri, 20 Nov 2015 11:06:09 -0000 Message-Id: <9f327c2e711f478597d71462fc1d5361@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [32/37] ignite git commit: IGNITE-426 Implemented failover for Continuous query. http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 1219f2f..72a60d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -78,6 +78,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid @GridDirectCollection(CacheObject.class) private List vals; + /** Previous values. */ + @GridToStringInclude + @GridDirectCollection(CacheObject.class) + private List prevVals; + /** Conflict versions. */ @GridDirectCollection(GridCacheVersion.class) private List conflictVers; @@ -139,10 +144,19 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** Task name hash. */ private int taskNameHash; + /** Partition. */ + private GridLongList updateCntrs; + /** On response flag. Access should be synced on future. */ @GridDirectTransient private boolean onRes; + @GridDirectTransient + private List partIds; + + @GridDirectTransient + private List localPrevVals; + /** * Empty constructor required by {@link Externalizable}. */ @@ -193,6 +207,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid this.addDepInfo = addDepInfo; keys = new ArrayList<>(); + partIds = new ArrayList<>(); + localPrevVals = new ArrayList<>(); if (forceTransformBackups) { entryProcessors = new ArrayList<>(); @@ -216,15 +232,25 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid * @param ttl TTL (optional). * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). + * @param addPrevVal If {@code true} adds previous value. + * @param prevVal Previous value. */ public void addWriteValue(KeyCacheObject key, @Nullable CacheObject val, EntryProcessor entryProcessor, long ttl, long conflictExpireTime, - @Nullable GridCacheVersion conflictVer) { + @Nullable GridCacheVersion conflictVer, + boolean addPrevVal, + int partId, + @Nullable CacheObject prevVal, + @Nullable Long updateIdx) { keys.add(key); + partIds.add(partId); + + localPrevVals.add(prevVal); + if (forceTransformBackups) { assert entryProcessor != null; @@ -233,6 +259,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid else vals.add(val); + if (addPrevVal) { + if (prevVals == null) + prevVals = new ArrayList<>(); + + prevVals.add(prevVal); + } + + if (updateIdx != null) { + if (updateCntrs == null) + updateCntrs = new GridLongList(); + + updateCntrs.add(updateIdx); + } + // In case there is no conflict, do not create the list. if (conflictVer != null) { if (conflictVers == null) { @@ -283,8 +323,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid @Nullable CacheObject val, EntryProcessor entryProcessor, long ttl, - long expireTime) - { + long expireTime) { if (nearKeys == null) { nearKeys = new ArrayList<>(); @@ -415,6 +454,25 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } /** + * @param idx Partition index. + * @return Partition id. + */ + public int partitionId(int idx) { + return partIds.get(idx); + } + + /** + * @param updCntr Update counter. + * @return Update counter. + */ + public Long updateCounter(int updCntr) { + if (updateCntrs != null && updCntr < updateCntrs.size()) + return updateCntrs.get(updCntr); + + return null; + } + + /** * @param idx Near key index. * @return Key. */ @@ -435,6 +493,25 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** * @param idx Key index. + * @return Value. + */ + @Nullable public CacheObject previousValue(int idx) { + if (prevVals != null) + return prevVals.get(idx); + + return null; + } + + /** + * @param idx Key index. + * @return Value. + */ + @Nullable public CacheObject localPreviousValue(int idx) { + return localPrevVals.get(idx); + } + + /** + * @param idx Key index. * @return Entry processor. */ @Nullable public EntryProcessor entryProcessor(int idx) { @@ -544,8 +621,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid return invokeArgs; } - /** {@inheritDoc} - * @param ctx*/ + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); @@ -695,42 +771,54 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid writer.incrementState(); case 16: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 17: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 18: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 19: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 20: - if (!writer.writeMessage("ttls", ttls)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 21: - if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("ttls", ttls)) return false; writer.incrementState(); case 22: + if (!writer.writeMessage("updateCntrs", updateCntrs)) + return false; + + writer.incrementState(); + + case 23: + if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 24: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -857,7 +945,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 16: - subjId = reader.readUuid("subjId"); + prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -865,6 +953,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 17: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 18: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -876,7 +972,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 18: + case 19: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -884,7 +980,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 19: + case 20: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -892,7 +988,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 20: + case 21: ttls = reader.readMessage("ttls"); if (!reader.isLastRead()) @@ -900,7 +996,15 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 21: + case 22: + updateCntrs = reader.readMessage("updateCntrs"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 23: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -908,7 +1012,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 22: + case 24: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -928,7 +1032,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 23; + return 25; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 2f2944d..43f34c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -613,7 +613,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter> partCntrs; + + /** Serialized partitions counters. */ + private byte[] partCntrsBytes; + /** */ private boolean client; @@ -90,6 +99,31 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } /** + * @param cacheId Cache ID. + * @param cntrMap Partition update counters. + */ + public void partitionUpdateCounters(int cacheId, Map cntrMap) { + if (partCntrs == null) + partCntrs = new HashMap<>(); + + partCntrs.put(cacheId, cntrMap); + } + + /** + * @param cacheId Cache ID. + * @return Partition update counters. + */ + public Map partitionUpdateCounters(int cacheId) { + if (partCntrs != null) { + Map res = partCntrs.get(cacheId); + + return res != null ? res : Collections.emptyMap(); + } + + return Collections.emptyMap(); + } + + /** * @return Local partitions. */ public Map partitions() { @@ -103,6 +137,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (partsBytes == null && parts != null) partsBytes = ctx.marshaller().marshal(parts); + + if (partCntrs != null) + partCntrsBytes = ctx.marshaller().marshal(partCntrs); } /** {@inheritDoc} */ @@ -111,6 +148,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (partsBytes != null && parts == null) parts = ctx.marshaller().unmarshal(partsBytes, ldr); + + if (partCntrsBytes != null) + partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr); } /** {@inheritDoc} */ @@ -135,6 +175,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes writer.incrementState(); case 6: + if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) + return false; + + writer.incrementState(); + + case 7: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -165,6 +211,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 6: + partCntrsBytes = reader.readByteArray("partCntrsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -184,7 +238,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 8; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 1bf03a9..706655b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -249,7 +249,7 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { /*write-through*/false, /*read-through*/false, /*retval*/false, - /**expiry policy*/null, + /*expiry policy*/null, /*event*/true, /*metrics*/true, /*primary*/false, @@ -263,7 +263,9 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { false, false, subjId, - taskName); + taskName, + null, + null); if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); @@ -361,7 +363,9 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { false, intercept, req.subjectId(), - taskName); + taskName, + null, + null); if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index d078df4..ba58f57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@ -226,6 +226,13 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { } /** + * @param cntrs Partition indexes. + */ + @Override public void setPartitionUpdateCounters(long[] cntrs) { + // No-op. + } + + /** * Adds owned versions to map. * * @param vers Map of owned versions. http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java new file mode 100644 index 0000000..7db9026 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.GridDirectMap; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Batch acknowledgement. + */ +public class CacheContinuousQueryBatchAck extends GridCacheMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Routine ID. */ + private UUID routineId; + + /** Update counters. */ + @GridToStringInclude + @GridDirectMap(keyType = Integer.class, valueType = Long.class) + private Map updateCntrs; + + /** + * Default constructor. + */ + public CacheContinuousQueryBatchAck() { + // No-op. + } + + /** + * @param cacheId Cache ID. + * @param routineId Routine ID. + * @param updateCntrs Update counters. + */ + CacheContinuousQueryBatchAck(int cacheId, UUID routineId, Map updateCntrs) { + this.cacheId = cacheId; + this.routineId = routineId; + this.updateCntrs = updateCntrs; + } + + /** + * @return Routine ID. + */ + UUID routineId() { + return routineId; + } + + /** + * @return Update counters. + */ + Map updateCntrs() { + return updateCntrs; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeUuid("routineId", routineId)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeMap("updateCntrs", updateCntrs, MessageCollectionItemType.INT, + MessageCollectionItemType.LONG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + routineId = reader.readUuid("routineId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + updateCntrs = reader.readMap("updateCntrs", MessageCollectionItemType.INT, + MessageCollectionItemType.LONG, false); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CacheContinuousQueryBatchAck.class); + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 118; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheContinuousQueryBatchAck.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index a4b35eb..0495e6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -22,10 +22,12 @@ import javax.cache.event.EventType; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -42,6 +44,12 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { private static final long serialVersionUID = 0L; /** */ + private static final byte BACKUP_ENTRY = 0b0001; + + /** */ + private static final byte FILTERED_ENTRY = 0b0010; + + /** */ private static final EventType[] EVT_TYPE_VALS = EventType.values(); /** @@ -75,8 +83,24 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { @GridDirectTransient private GridDeploymentInfo depInfo; + /** Partition. */ + private int part; + + /** Update counter. */ + private long updateCntr; + + /** Flags. */ + private byte flags; + + /** */ + @GridToStringInclude + private AffinityTopologyVersion topVer; + + /** Filtered events. */ + private GridLongList filteredEvts; + /** - * Required by {@link org.apache.ignite.plugin.extensions.communication.Message}. + * Required by {@link Message}. */ public CacheContinuousQueryEntry() { // No-op. @@ -88,18 +112,34 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @param key Key. * @param newVal New value. * @param oldVal Old value. + * @param part Partition. + * @param updateCntr Update partition counter. + * @param topVer Topology version if applicable. */ CacheContinuousQueryEntry( int cacheId, EventType evtType, KeyCacheObject key, @Nullable CacheObject newVal, - @Nullable CacheObject oldVal) { + @Nullable CacheObject oldVal, + int part, + long updateCntr, + @Nullable AffinityTopologyVersion topVer) { this.cacheId = cacheId; this.evtType = evtType; this.key = key; this.newVal = newVal; this.oldVal = oldVal; + this.part = part; + this.updateCntr = updateCntr; + this.topVer = topVer; + } + + /** + * @return Topology version if applicable. + */ + @Nullable AffinityTopologyVersion topologyVersion() { + return topVer; } /** @@ -117,6 +157,66 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { } /** + * @return Partition. + */ + int partition() { + return part; + } + + /** + * @return Update counter. + */ + long updateCounter() { + return updateCntr; + } + + /** + * Mark that entry create on backup. + */ + void markBackup() { + flags |= BACKUP_ENTRY; + } + + /** + * Mark that entry filtered. + */ + void markFiltered() { + flags |= FILTERED_ENTRY; + newVal = null; + oldVal = null; + key = null; + depInfo = null; + } + + /** + * @return {@code True} if entry sent by backup node. + */ + boolean isBackup() { + return (flags & BACKUP_ENTRY) != 0; + } + + /** + * @return {@code True} if entry was filtered. + */ + boolean isFiltered() { + return (flags & FILTERED_ENTRY) != 0; + } + + /** + * @param cntrs Filtered events. + */ + void filteredEvents(GridLongList cntrs) { + filteredEvts = cntrs; + } + + /** + * @return previous filtered events. + */ + long[] filteredEvents() { + return filteredEvts == null ? null : filteredEvts.array(); + } + + /** * @param cctx Cache context. * @throws IgniteCheckedException In case of error. */ @@ -138,13 +238,15 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @throws IgniteCheckedException In case of error. */ void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException { - key.finishUnmarshal(cctx.cacheObjectContext(), ldr); + if (!isFiltered()) { + key.finishUnmarshal(cctx.cacheObjectContext(), ldr); - if (newVal != null) - newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr); + if (newVal != null) + newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr); - if (oldVal != null) - oldVal.finishUnmarshal(cctx.cacheObjectContext(), ldr); + if (oldVal != null) + oldVal.finishUnmarshal(cctx.cacheObjectContext(), ldr); + } } /** @@ -208,23 +310,53 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { writer.incrementState(); case 2: - if (!writer.writeMessage("key", key)) + if (!writer.writeMessage("filteredEvts", filteredEvts)) return false; writer.incrementState(); case 3: - if (!writer.writeMessage("newVal", newVal)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); case 4: + if (!writer.writeMessage("key", key)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeMessage("newVal", newVal)) + return false; + + writer.incrementState(); + + case 6: if (!writer.writeMessage("oldVal", oldVal)) return false; writer.incrementState(); + case 7: + if (!writer.writeInt("part", part)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeLong("updateCntr", updateCntr)) + return false; + + writer.incrementState(); + } return true; @@ -259,7 +391,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 2: - key = reader.readMessage("key"); + filteredEvts = reader.readMessage("filteredEvts"); if (!reader.isLastRead()) return false; @@ -267,7 +399,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 3: - newVal = reader.readMessage("newVal"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -275,6 +407,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); case 4: + key = reader.readMessage("key"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + newVal = reader.readMessage("newVal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: oldVal = reader.readMessage("oldVal"); if (!reader.isLastRead()) @@ -282,6 +430,30 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); + case 7: + part = reader.readInt("part"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + updateCntr = reader.readLong("updateCntr"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(CacheContinuousQueryEntry.class); @@ -289,7 +461,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 5; + return 10; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java index 7417138..a1ebe39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java @@ -58,8 +58,7 @@ class CacheContinuousQueryEvent extends CacheEntryEvent { } /** {@inheritDoc} */ - @Override - public K getKey() { + @Override public K getKey() { return e.key().value(cctx.cacheObjectContext(), false); } http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index e517c70..b69d4cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -21,8 +21,21 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.event.EventType; @@ -30,26 +43,37 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheQueryExecutedEvent; import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteDeploymentCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter; +import org.apache.ignite.internal.util.GridConcurrentSkipListSet; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; @@ -61,6 +85,9 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { /** */ private static final long serialVersionUID = 0L; + /** */ + private static final int BACKUP_ACK_THRESHOLD = 100; + /** Cache name. */ private String cacheName; @@ -97,9 +124,27 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { /** Whether to skip primary check for REPLICATED cache. */ private transient boolean skipPrimaryCheck; + /** Backup queue. */ + private transient Collection backupQueue; + + /** */ + private boolean localCache; + + /** */ + private transient ConcurrentMap rcvs; + + /** */ + private transient ConcurrentMap entryBufs; + + /** */ + private transient AcknowledgeBuffer ackBuf; + /** */ private transient int cacheId; + /** */ + private Map initUpdCntrs; + /** * Required by {@link Externalizable}. */ @@ -121,6 +166,7 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { * @param ignoreExpired Ignore expired events flag. * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. * @param taskHash Task name hash code. + * @param locCache {@code True} if local cache. */ public CacheContinuousQueryHandler( String cacheName, @@ -133,7 +179,8 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { boolean sync, boolean ignoreExpired, int taskHash, - boolean skipPrimaryCheck) { + boolean skipPrimaryCheck, + boolean locCache) { assert topic != null; assert locLsnr != null; @@ -148,6 +195,7 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { this.ignoreExpired = ignoreExpired; this.taskHash = taskHash; this.skipPrimaryCheck = skipPrimaryCheck; + this.localCache = locCache; cacheId = CU.cacheId(cacheName); } @@ -173,6 +221,11 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public void updateCounters(Map cntrs) { + this.initUpdCntrs = cntrs; + } + + /** {@inheritDoc} */ @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { assert nodeId != null; @@ -185,8 +238,32 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { if (rmtFilter != null) ctx.resource().injectGeneric(rmtFilter); + entryBufs = new ConcurrentHashMap<>(); + + backupQueue = new ConcurrentLinkedDeque8<>(); + + ackBuf = new AcknowledgeBuffer(); + + rcvs = new ConcurrentHashMap<>(); + final boolean loc = nodeId.equals(ctx.localNodeId()); + assert !skipPrimaryCheck || loc; + + final GridCacheContext cctx = cacheContext(ctx); + + if (!internal && cctx != null && initUpdCntrs != null) { + Map map = cctx.topology().updateCounters(); + + for (Map.Entry e : map.entrySet()) { + Long cntr0 = initUpdCntrs.get(e.getKey()); + Long cntr1 = e.getValue(); + + if (cntr0 == null || cntr1 > cntr0) + initUpdCntrs.put(e.getKey(), cntr1); + } + } + CacheContinuousQueryListener lsnr = new CacheContinuousQueryListener() { @Override public void onExecution() { if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { @@ -212,11 +289,15 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) return; - GridCacheContext cctx = cacheContext(ctx); + final GridCacheContext cctx = cacheContext(ctx); - if (cctx.isReplicated() && !skipPrimaryCheck && !primary) + // Check that cache stopped. + if (cctx == null) return; + // skipPrimaryCheck is set only when listen locally for replicated cache events. + assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId)); + boolean notify = true; if (rmtFilter != null) { @@ -228,54 +309,94 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { } } - if (notify) { - if (loc) - locLsnr.onUpdated(F.>asList(evt)); - else { - try { - if (cctx.deploymentEnabled() && ctx.discovery().node(nodeId) != null) { - evt.entry().prepareMarshal(cctx); - - cctx.deploy().prepare(evt.entry()); + try { + final CacheContinuousQueryEntry entry = evt.entry(); + + if (!notify) + entry.markFiltered(); + + if (primary || skipPrimaryCheck) { + if (loc) { + if (!localCache) { + Collection entries = handleEvent(ctx, entry); + + if (!entries.isEmpty()) { + final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); + + Iterable> evts = F.viewReadOnly(entries, + new C1>() { + @Override public CacheEntryEvent apply( + CacheContinuousQueryEntry e) { + return new CacheContinuousQueryEvent<>(cache, cctx, e); + } + }, + new IgnitePredicate() { + @Override public boolean apply(CacheContinuousQueryEntry entry) { + return !entry.isFiltered(); + } + } + ); + + locLsnr.onUpdated(evts); + + if (!internal && !skipPrimaryCheck) + sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx); + } + } + else { + if (!entry.isFiltered()) + locLsnr.onUpdated(F.>asList(evt)); } - else - evt.entry().prepareMarshal(cctx); - - ctx.continuous().addNotification(nodeId, routineId, evt.entry(), topic, sync, true); } - catch (ClusterTopologyCheckedException ex) { - IgniteLogger log = ctx.log(getClass()); + else { + if (!entry.isFiltered()) + prepareEntry(cctx, nodeId, entry); - if (log.isDebugEnabled()) - log.debug("Failed to send event notification to node, node left cluster " + - "[node=" + nodeId + ", err=" + ex + ']'); - } - catch (IgniteCheckedException ex) { - U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); + CacheContinuousQueryEntry e = handleEntry(entry); + + if (e != null) + ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); } } + else { + if (!internal) { + entry.markBackup(); - if (recordIgniteEvt) { - ctx.event().record(new CacheQueryReadEvent<>( - ctx.discovery().localNode(), - "Continuous query executed.", - EVT_CACHE_QUERY_OBJECT_READ, - CacheQueryType.CONTINUOUS.name(), - cacheName, - null, - null, - null, - rmtFilter, - null, - nodeId, - taskName(), - evt.getKey(), - evt.getValue(), - evt.getOldValue(), - null - )); + backupQueue.add(entry); + } } } + catch (ClusterTopologyCheckedException ex) { + IgniteLogger log = ctx.log(getClass()); + + if (log.isDebugEnabled()) + log.debug("Failed to send event notification to node, node left cluster " + + "[node=" + nodeId + ", err=" + ex + ']'); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); + } + + if (recordIgniteEvt && notify) { + ctx.event().record(new CacheQueryReadEvent<>( + ctx.discovery().localNode(), + "Continuous query executed.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.CONTINUOUS.name(), + cacheName, + null, + null, + null, + rmtFilter, + null, + nodeId, + taskName(), + evt.getKey(), + evt.getValue(), + evt.getOldValue(), + null + )); + } } @Override public void onUnregister() { @@ -283,6 +404,85 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { ((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister(); } + @Override public void cleanupBackupQueue(Map updateCntrs) { + Iterator it = backupQueue.iterator(); + + while (it.hasNext()) { + CacheContinuousQueryEntry backupEntry = it.next(); + + Long updateCntr = updateCntrs.get(backupEntry.partition()); + + if (updateCntr != null && backupEntry.updateCounter() <= updateCntr) + it.remove(); + } + } + + @Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) { + if (backupQueue.isEmpty()) + return; + + try { + GridCacheContext cctx = cacheContext(ctx); + + for (CacheContinuousQueryEntry e : backupQueue) { + if (!e.isFiltered()) + prepareEntry(cctx, nodeId, e); + } + + ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic); + + backupQueue.clear(); + } + catch (IgniteCheckedException e) { + U.error(ctx.log(getClass()), "Failed to send backup event notification to node: " + nodeId, e); + } + } + + @Override public void acknowledgeBackupOnTimeout(GridKernalContext ctx) { + sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx); + } + + @Override public void skipUpdateEvent(CacheContinuousQueryEvent evt, AffinityTopologyVersion topVer) { + try { + assert evt != null; + + CacheContinuousQueryEntry e = evt.entry(); + + EntryBuffer buf = entryBufs.get(e.partition()); + + if (buf == null) { + buf = new EntryBuffer(); + + EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf); + + if (oldRec != null) + buf = oldRec; + } + + e = buf.skipEntry(e); + + if (e != null) + ctx.continuous().addNotification(nodeId, routineId, e, topic, sync, true); + } + catch (ClusterTopologyCheckedException ex) { + IgniteLogger log = ctx.log(getClass()); + + if (log.isDebugEnabled()) + log.debug("Failed to send event notification to node, node left cluster " + + "[node=" + nodeId + ", err=" + ex + ']'); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); + } + } + + @Override public void onPartitionEvicted(int part) { + for (Iterator it = backupQueue.iterator(); it.hasNext();) { + if (it.next().partition() == part) + it.remove(); + } + } + @Override public boolean oldValueRequired() { return oldValRequired; } @@ -304,6 +504,23 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { return mgr.registerListener(routineId, lsnr, internal); } + /** + * @param cctx Context. + * @param nodeId ID of the node that started routine. + * @param entry Entry. + * @throws IgniteCheckedException In case of error. + */ + private void prepareEntry(GridCacheContext cctx, UUID nodeId, CacheContinuousQueryEntry entry) + throws IgniteCheckedException { + if (cctx.kernalContext().config().isPeerClassLoadingEnabled() && cctx.discovery().node(nodeId) != null) { + entry.prepareMarshal(cctx); + + cctx.deploy().prepare(entry); + } + else + entry.prepareMarshal(cctx); + } + /** {@inheritDoc} */ @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { // No-op. @@ -366,17 +583,377 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); - Iterable> evts = F.viewReadOnly(entries, + Collection entries0 = new ArrayList<>(); + + for (CacheContinuousQueryEntry e : entries) + entries0.addAll(handleEvent(ctx, e)); + + Iterable> evts = F.viewReadOnly(entries0, new C1>() { @Override public CacheEntryEvent apply(CacheContinuousQueryEntry e) { return new CacheContinuousQueryEvent<>(cache, cctx, e); } + }, + new IgnitePredicate() { + @Override public boolean apply(CacheContinuousQueryEntry entry) { + return !entry.isFiltered(); + } } ); locLsnr.onUpdated(evts); } + /** + * @param ctx Context. + * @param e entry. + * @return Entry collection. + */ + private Collection handleEvent(GridKernalContext ctx, + CacheContinuousQueryEntry e) { + assert e != null; + + if (internal) { + if (e.isFiltered()) + return Collections.emptyList(); + else + return F.asList(e); + } + + // Initial query entry or evicted entry. + // This events should be fired immediately. + if (e.updateCounter() == -1) + return F.asList(e); + + PartitionRecovery rec = rcvs.get(e.partition()); + + if (rec == null) { + rec = new PartitionRecovery(ctx.log(getClass()), cacheContext(ctx).topology().topologyVersion(), + initUpdCntrs == null ? null : initUpdCntrs.get(e.partition())); + + PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec); + + if (oldRec != null) + rec = oldRec; + } + + return rec.collectEntries(e); + } + + /** + * @param e Entry. + * @return Entry. + */ + private CacheContinuousQueryEntry handleEntry(CacheContinuousQueryEntry e) { + assert e != null; + assert entryBufs != null; + + if (internal) { + if (e.isFiltered()) + return null; + else + return e; + } + + // Initial query entry. + // This events should be fired immediately. + if (e.updateCounter() == -1) + return e; + + EntryBuffer buf = entryBufs.get(e.partition()); + + if (buf == null) { + buf = new EntryBuffer(); + + EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf); + + if (oldRec != null) + buf = oldRec; + } + + return buf.handle(e); + } + + /** + * + */ + private static class PartitionRecovery { + /** Event which means hole in sequence. */ + private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry(); + + /** */ + private final static int MAX_BUFF_SIZE = 100; + + /** */ + private IgniteLogger log; + + /** */ + private long lastFiredEvt; + + /** */ + private AffinityTopologyVersion curTop = AffinityTopologyVersion.NONE; + + /** */ + private final Map pendingEvts = new TreeMap<>(); + + /** + * @param log Logger. + * @param topVer Topology version. + * @param initCntr Update counters. + */ + public PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) { + this.log = log; + + if (initCntr != null) { + this.lastFiredEvt = initCntr; + + curTop = topVer; + } + } + + /** + * Add continuous entry. + * + * @param entry Cache continuous query entry. + * @return Collection entries which will be fired. + */ + public Collection collectEntries(CacheContinuousQueryEntry entry) { + assert entry != null; + + List entries; + + synchronized (pendingEvts) { + // Received first event. + if (curTop == AffinityTopologyVersion.NONE) { + lastFiredEvt = entry.updateCounter(); + + curTop = entry.topologyVersion(); + + return F.asList(entry); + } + + if (curTop.compareTo(entry.topologyVersion()) < 0) { + if (entry.updateCounter() == 1 && !entry.isBackup()) { + entries = new ArrayList<>(pendingEvts.size()); + + for (CacheContinuousQueryEntry evt : pendingEvts.values()) { + if (evt != HOLE && !evt.isFiltered()) + entries.add(evt); + } + + pendingEvts.clear(); + + curTop = entry.topologyVersion(); + + lastFiredEvt = entry.updateCounter(); + + entries.add(entry); + + return entries; + } + + curTop = entry.topologyVersion(); + } + + // Check duplicate. + if (entry.updateCounter() > lastFiredEvt) { + pendingEvts.put(entry.updateCounter(), entry); + + // Put filtered events. + if (entry.filteredEvents() != null) { + for (long cnrt : entry.filteredEvents()) { + if (cnrt > lastFiredEvt) + pendingEvts.put(cnrt, HOLE); + } + } + } + else { + if (log.isDebugEnabled()) + log.debug("Skip duplicate continuous query message: " + entry); + + return Collections.emptyList(); + } + + if (pendingEvts.isEmpty()) + return Collections.emptyList(); + + Iterator> iter = pendingEvts.entrySet().iterator(); + + entries = new ArrayList<>(); + + if (pendingEvts.size() >= MAX_BUFF_SIZE) { + for (int i = 0; i < MAX_BUFF_SIZE - (MAX_BUFF_SIZE / 10); i++) { + Map.Entry e = iter.next(); + + if (e.getValue() != HOLE && !e.getValue().isFiltered()) + entries.add(e.getValue()); + + lastFiredEvt = e.getKey(); + + iter.remove(); + } + } + else { + // Elements are consistently. + while (iter.hasNext()) { + Map.Entry e = iter.next(); + + if (e.getKey() == lastFiredEvt + 1) { + ++lastFiredEvt; + + if (e.getValue() != HOLE && !e.getValue().isFiltered()) + entries.add(e.getValue()); + + iter.remove(); + } + else + break; + } + } + } + + return entries; + } + } + + /** + * + */ + private static class EntryBuffer { + /** */ + private final static int MAX_BUFF_SIZE = 100; + + /** */ + private final GridConcurrentSkipListSet buf = new GridConcurrentSkipListSet<>(); + + /** */ + private AtomicLong lastFiredCntr = new AtomicLong(); + + /** + * @param newVal New value. + * @return Old value if previous value less than new value otherwise {@code -1}. + */ + private long updateFiredCounter(long newVal) { + long prevVal = lastFiredCntr.get(); + + while (prevVal < newVal) { + if (lastFiredCntr.compareAndSet(prevVal, newVal)) + return prevVal; + else + prevVal = lastFiredCntr.get(); + } + + return prevVal >= newVal ? -1 : prevVal; + } + + /** + * @param e Entry. + * @param topVer Topology version. + * @return Continuous query entry. + */ + private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) { + if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) { + + e.markFiltered(); + + return e; + } + else { + buf.add(e.updateCounter()); + + // Double check. If another thread sent a event with counter higher than this event. + if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) { + buf.remove(e.updateCounter()); + + e.markFiltered(); + + return e; + } + else + return null; + } + } + + /** + * Add continuous entry. + * + * @param e Cache continuous query entry. + * @return Collection entries which will be fired. + */ + public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) { + assert e != null; + + if (e.isFiltered()) { + Long last = buf.lastx(); + Long first = buf.firstx(); + + if (last != null && first != null && last - first >= MAX_BUFF_SIZE) { + NavigableSet prevHoles = buf.subSet(first, true, last, true); + + GridLongList filteredEvts = new GridLongList((int)(last - first)); + + int size = 0; + + Long cntr; + + while ((cntr = prevHoles.pollFirst()) != null) { + filteredEvts.add(cntr); + + ++size; + } + + filteredEvts.truncate(size, true); + + e.filteredEvents(filteredEvts); + + return e; + } + + if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) + return e; + else { + buf.add(e.updateCounter()); + + // Double check. If another thread sent a event with counter higher than this event. + if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) { + buf.remove(e.updateCounter()); + + return e; + } + else + return null; + } + } + else { + long prevVal = updateFiredCounter(e.updateCounter()); + + if (prevVal == -1) + return e; + else { + NavigableSet prevHoles = buf.subSet(prevVal, true, e.updateCounter(), true); + + GridLongList filteredEvts = new GridLongList((int)(e.updateCounter() - prevVal)); + + int size = 0; + + Long cntr; + + while ((cntr = prevHoles.pollFirst()) != null) { + filteredEvts.add(cntr); + + ++size; + } + + filteredEvts.truncate(size, true); + + e.filteredEvents(filteredEvts); + + return e; + } + } + } + } + /** {@inheritDoc} */ @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException { assert ctx != null; @@ -397,6 +974,65 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public GridContinuousBatch createBatch() { + return new GridContinuousBatchAdapter(); + } + + /** {@inheritDoc} */ + @Override public void onBatchAcknowledged(final UUID routineId, + GridContinuousBatch batch, + final GridKernalContext ctx) { + sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx); + } + + /** + * @param t Acknowledge information. + * @param routineId Routine ID. + * @param ctx Context. + */ + private void sendBackupAcknowledge(final IgniteBiTuple, Set> t, + final UUID routineId, + final GridKernalContext ctx) { + if (t != null) { + ctx.closure().runLocalSafe(new Runnable() { + @Override public void run() { + GridCacheContext cctx = cacheContext(ctx); + + CacheContinuousQueryBatchAck msg = new CacheContinuousQueryBatchAck(cctx.cacheId(), + routineId, + t.get1()); + + Collection nodes = new HashSet<>(); + + for (AffinityTopologyVersion topVer : t.get2()) + nodes.addAll(ctx.discovery().cacheNodes(topVer)); + + for (ClusterNode node : nodes) { + if (!node.id().equals(ctx.localNodeId())) { + try { + cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL); + } + catch (ClusterTopologyCheckedException e) { + IgniteLogger log = ctx.log(getClass()); + + if (log.isDebugEnabled()) + log.debug("Failed to send acknowledge message, node left " + + "[msg=" + msg + ", node=" + node + ']'); + } + catch (IgniteCheckedException e) { + IgniteLogger log = ctx.log(getClass()); + + U.error(log, "Failed to send acknowledge message " + + "[msg=" + msg + ", node=" + node + ']', e); + } + } + } + } + }); + } + } + + /** {@inheritDoc} */ @Nullable @Override public Object orderedTopic() { return topic; } @@ -471,6 +1107,93 @@ class CacheContinuousQueryHandler implements GridContinuousHandler { return ctx.cache().context().cacheContext(cacheId); } + /** */ + private static class AcknowledgeBuffer { + /** */ + private int size; + + /** */ + @GridToStringInclude + private Map updateCntrs = new HashMap<>(); + + /** */ + @GridToStringInclude + private Set topVers = U.newHashSet(1); + + /** + * @param batch Batch. + * @return Non-null tuple if acknowledge should be sent to backups. + */ + @SuppressWarnings("unchecked") + @Nullable synchronized IgniteBiTuple, Set> + onAcknowledged(GridContinuousBatch batch) { + size += batch.size(); + + Collection entries = (Collection)batch.collect(); + + for (CacheContinuousQueryEntry e : entries) + addEntry(e); + + return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null; + } + + /** + * @param e Entry. + * @return Non-null tuple if acknowledge should be sent to backups. + */ + @Nullable synchronized IgniteBiTuple, Set> + onAcknowledged(CacheContinuousQueryEntry e) { + size++; + + addEntry(e); + + return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null; + } + + /** + * @param e Entry. + */ + private void addEntry(CacheContinuousQueryEntry e) { + topVers.add(e.topologyVersion()); + + Long cntr0 = updateCntrs.get(e.partition()); + + if (cntr0 == null || e.updateCounter() > cntr0) + updateCntrs.put(e.partition(), e.updateCounter()); + } + + /** + * @return Non-null tuple if acknowledge should be sent to backups. + */ + @Nullable synchronized IgniteBiTuple, Set> + acknowledgeOnTimeout() { + return size > 0 ? acknowledgeData() : null; + } + + /** + * @return Tuple with acknowledge information. + */ + private IgniteBiTuple, Set> acknowledgeData() { + assert size > 0; + + Map cntrs = new HashMap<>(updateCntrs); + + IgniteBiTuple, Set> res = + new IgniteBiTuple<>(cntrs, topVers); + + topVers = U.newHashSet(1); + + size = 0; + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AcknowledgeBuffer.class, this); + } + } + /** * Deployable object. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index a3c19a9..8342acf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -17,6 +17,10 @@ package org.apache.ignite.internal.processors.cache.query.continuous; +import java.util.Map; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; + /** * Continuous query listener. */ @@ -41,6 +45,37 @@ interface CacheContinuousQueryListener { public void onUnregister(); /** + * Cleans backup queue. + * + * @param updateCntrs Update indexes map. + */ + public void cleanupBackupQueue(Map updateCntrs); + + /** + * Flushes backup queue. + * + * @param ctx Context. + * @param topVer Topology version. + */ + public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer); + + /** + * @param ctx Context. + */ + public void acknowledgeBackupOnTimeout(GridKernalContext ctx); + + /** + * @param evt Event + * @param topVer Topology version. + */ + public void skipUpdateEvent(CacheContinuousQueryEvent evt, AffinityTopologyVersion topVer); + + /** + * @param part Partition. + */ + public void onPartitionEvicted(int part); + + /** * @return Whether old value is required. */ public boolean oldValueRequired();