Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1EADD200C81 for ; Fri, 26 May 2017 10:28:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1D501160BC8; Fri, 26 May 2017 08:28:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B3A53160BB8 for ; Fri, 26 May 2017 10:28:44 +0200 (CEST) Received: (qmail 26810 invoked by uid 500); 26 May 2017 08:28:43 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 26789 invoked by uid 99); 26 May 2017 08:28:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 May 2017 08:28:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3D72BDFB01; Fri, 26 May 2017 08:28:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: cc Date: Fri, 26 May 2017 08:28:43 +0000 (UTC) archived-at: Fri, 26 May 2017 08:28:46 -0000 Repository: ignite Updated Branches: refs/heads/ignite-5075-cc 01f45c1b8 -> e3500de9f cc Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e3500de9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e3500de9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e3500de9 Branch: refs/heads/ignite-5075-cc Commit: e3500de9f775b091635b06108efc4937f524b6a1 Parents: 01f45c1 Author: sboikov Authored: Fri May 26 11:21:25 2017 +0300 Committer: sboikov Committed: Fri May 26 11:28:35 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 6 +- .../GridCachePartitionExchangeManager.java | 2 +- .../dht/GridClientPartitionTopology.java | 31 ++++--- .../dht/GridDhtPartitionTopology.java | 9 +- .../dht/GridDhtPartitionTopologyImpl.java | 59 ++++++++----- .../GridDhtPartitionsExchangeFuture.java | 34 ++++++-- .../CacheContinuousQueryEventBuffer.java | 91 ++++++++++++-------- .../continuous/CacheContinuousQueryManager.java | 4 +- 8 files changed, 151 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e3500de9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 80f872c..87fe18e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -28,7 +28,6 @@ import javax.cache.Cache; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorResult; - import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -61,8 +60,8 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.dr.GridDrType; -import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; +import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.lang.GridTuple; @@ -75,7 +74,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED; @@ -1838,8 +1836,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (updateRes.success()) updateMetrics(c.op, metrics); - lsnrs = cctx.continuousQueries().updateListeners(internal, false); - // Continuous query filter should be perform under lock. if (lsnrs != null) { CacheObject evtVal = cctx.unwrapTemporary(updateVal); http://git-wip-us.apache.org/repos/asf/ignite/blob/e3500de9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 5314088..2eec8f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1312,7 +1312,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana top = cacheCtx.topology(); if (top != null) { - updated |= top.update(null, entry.getValue(), null) != null; + updated |= top.update(null, entry.getValue()) != null; cctx.affinity().checkRebalanceState(top, cacheId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e3500de9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 1de64c5..43bc609 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -650,11 +650,29 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public void applyUpdateCounters(Map> cntrMap) { + assert cntrMap != null; + + lock.writeLock().lock(); + + try { + for (Map.Entry> e : cntrMap.entrySet()) { + T2 cntr = this.cntrMap.get(e.getKey()); + + if (cntr == null || cntr.get2() < e.getValue().get2()) + this.cntrMap.put(e.getKey(), e.getValue()); + } + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update( @Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts, - Map> cntrMap + GridDhtPartitionMap parts ) { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); @@ -733,15 +751,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } } - if (cntrMap != null) { - for (Map.Entry> e : cntrMap.entrySet()) { - T2 cntr = this.cntrMap.get(e.getKey()); - - if (cntr == null || cntr.get2() < e.getValue().get2()) - this.cntrMap.put(e.getKey(), e.getValue()); - } - } - consistencyCheck(); if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/e3500de9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index f9fd852..ffc1d63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -234,12 +234,15 @@ public interface GridDhtPartitionTopology { /** * @param exchId Exchange ID. * @param parts Partitions. - * @param cntrMap Partition update counters. * @return Local partition map if there were evictions or {@code null} otherwise. */ @Nullable public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts, - @Nullable Map> cntrMap); + GridDhtPartitionMap parts); + + /** + * @param cntrMap Counters map. + */ + public void applyUpdateCounters(Map> cntrMap); /** * Checks if there is at least one owner for each partition in the cache topology. http://git-wip-us.apache.org/repos/asf/ignite/blob/e3500de9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 8e79eda..7adce6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1256,11 +1256,45 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public void applyUpdateCounters(Map> cntrMap) { + assert cntrMap != null; + + lock.writeLock().lock(); + + try { + if (stopping) + return; + + for (Map.Entry> e : cntrMap.entrySet()) { + T2 cntr = this.cntrMap.get(e.getKey()); + + if (cntr == null || cntr.get2() < e.getValue().get2()) + this.cntrMap.put(e.getKey(), e.getValue()); + } + + for (int i = 0; i < locParts.length(); i++) { + GridDhtLocalPartition part = locParts.get(i); + + if (part == null) + continue; + + T2 cntr = cntrMap.get(part.id()); + + if (cntr != null && cntr.get2() > part.updateCounter()) + part.updateCounter(cntr.get2()); + } + } + finally { + lock.writeLock().unlock(); + + } + } + + /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update( @Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts, - @Nullable Map> cntrMap + GridDhtPartitionMap parts ) { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); @@ -1279,27 +1313,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (stopping) return null; - if (cntrMap != null) { - for (Map.Entry> e : cntrMap.entrySet()) { - T2 cntr = this.cntrMap.get(e.getKey()); - - if (cntr == null || cntr.get2() < e.getValue().get2()) - this.cntrMap.put(e.getKey(), e.getValue()); - } - - for (int i = 0; i < locParts.length(); i++) { - GridDhtLocalPartition part = locParts.get(i); - - if (part == null) - continue; - - T2 cntr = cntrMap.get(part.id()); - - if (cntr != null && cntr.get2() > part.updateCounter()) - part.updateCounter(cntr.get2()); - } - } - if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { if (log.isDebugEnabled()) log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + http://git-wip-us.apache.org/repos/asf/ignite/blob/e3500de9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 8b8c87c..dfea951 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.PartitionLossPolicy; @@ -47,18 +46,19 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.ClusterState; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; -import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -1098,6 +1098,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter entry : msg0.partitions().entrySet()) { + Integer cacheId = entry.getKey(); + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + + GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() : + cctx.exchange().clientTopology(cacheId, this); + + Map> cntrs = msg0.partitionUpdateCounters(cacheId); + + if (cntrs != null) + top.applyUpdateCounters(cntrs); + } + } + } + if (discoEvt.type() == EVT_NODE_JOINED) { if (cctx.kernalContext().state().active()) assignPartitionsStates(); @@ -1795,7 +1817,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter p : pending.headMap(batch.endCntr, true).entrySet()) { long cntr = p.getKey(); - assert cntr >= batch.startCntr && cntr <= batch.endCntr : cntr; + assert cntr <= batch.endCntr; - if (pending.remove(p.getKey()) != null) - res = batch.processEntry0(res, p.getKey(), p.getValue(), backup); + if (pending.remove(p.getKey()) != null) { + if (cntr < batch.startCntr) + res = addResult(res, p.getValue(), backup); + else + res = batch.processEntry0(res, p.getKey(), p.getValue(), backup); + } } } @@ -246,6 +251,43 @@ public class CacheContinuousQueryEventBuffer { } /** + * @param res Current result. + * @param entry Entry to add. + * @param backup Backup entry flag. + * @return Updated result. + */ + @Nullable private Object addResult(@Nullable Object res, CacheContinuousQueryEntry entry, boolean backup) { + if (res == null) { + if (backup) + backupQ.add(entry); + else + res = entry; + } + else { + assert !backup; + + List resList; + + if (res instanceof CacheContinuousQueryEntry) { + resList = new ArrayList<>(); + + resList.add((CacheContinuousQueryEntry)res); + } + else { + assert res instanceof List : res; + + resList = (List)res; + } + + resList.add(entry); + + res = resList; + } + + return res; + } + + /** * */ private class Batch { @@ -313,7 +355,15 @@ public class CacheContinuousQueryEventBuffer { if (e.isFiltered()) filtered++; else { - flushEntry = e; + flushEntry = new CacheContinuousQueryEntry(e.cacheId(), + e.eventType(), + e.key(), + e.value(), + e.oldValue(), + e.isKeepBinary(), + e.partition(), + e.updateCounter(), + e.topologyVersion()); flushEntry.filteredCount(filtered); @@ -338,8 +388,6 @@ public class CacheContinuousQueryEventBuffer { res.add(filteredEntry(cntr - 1, filtered - 1)); } - entries = null; - return res; } @@ -399,32 +447,7 @@ public class CacheContinuousQueryEventBuffer { filtered = 0; - if (res == null) { - if (backup) - backupQ.add(entry0); - else - res = entry0; - } - else { - assert !backup; - - List resList; - - if (res instanceof CacheContinuousQueryEntry) { - resList = new ArrayList<>(); - - resList.add((CacheContinuousQueryEntry)res); - } - else { - assert res instanceof List : res; - - resList = (List)res; - } - - resList.add(entry0); - - res = resList; - } + res = addResult(res, entry0, backup); } else filtered++; http://git-wip-us.apache.org/repos/asf/ignite/blob/e3500de9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index acf351f..7cbb1e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -568,9 +568,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** - * @param topVer Topology version. + * @param topVer Finished exchange topology version. */ - public void beforeExchange(AffinityTopologyVersion topVer) { + public void flushBackupQueue(AffinityTopologyVersion topVer) { for (CacheContinuousQueryListener lsnr : lsnrs.values()) lsnr.flushBackupQueue(cctx.kernalContext(), topVer); }