Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 84700200C81 for ; Fri, 26 May 2017 08:31:25 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 82E86160BC8; Fri, 26 May 2017 06:31:25 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 259D0160BB8 for ; Fri, 26 May 2017 08:31:23 +0200 (CEST) Received: (qmail 29370 invoked by uid 500); 26 May 2017 06:31:23 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 29361 invoked by uid 99); 26 May 2017 06:31:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 May 2017 06:31:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1CB8FDFFB7; Fri, 26 May 2017 06:31:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: cc Date: Fri, 26 May 2017 06:31:23 +0000 (UTC) archived-at: Fri, 26 May 2017 06:31:25 -0000 Repository: ignite Updated Branches: refs/heads/ignite-5075-cc1 [created] 13d378534 cc Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/13d37853 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/13d37853 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/13d37853 Branch: refs/heads/ignite-5075-cc1 Commit: 13d378534013b84a55656b8873bd7092544e302c Parents: 01f45c1 Author: sboikov Authored: Fri May 26 09:31:12 2017 +0300 Committer: sboikov Committed: Fri May 26 09:31:12 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 12 +++- .../dht/GridClientPartitionTopology.java | 5 ++ .../distributed/dht/GridDhtLocalPartition.java | 8 ++- .../dht/GridDhtPartitionTopology.java | 2 + .../dht/GridDhtPartitionTopologyImpl.java | 62 +++++++++++------ .../GridNearAtomicSingleUpdateFuture.java | 2 + .../GridDhtPartitionsExchangeFuture.java | 27 ++++++-- .../CacheContinuousQueryEventBuffer.java | 70 ++++++++++++-------- .../CacheContinuousQueryPartitionRecovery.java | 5 +- 9 files changed, 134 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 80f872c..ba21964 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -28,7 +28,6 @@ import javax.cache.Cache; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorResult; - import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -61,8 +60,8 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.dr.GridDrType; -import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; +import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.lang.GridTuple; @@ -75,7 +74,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED; @@ -1726,6 +1724,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme long updateCntr0 = nextPartCounter(); + //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(partition(), updateCntr0, "set new counter conflict on " + (primary ? "primary" : "backup")); + if (updateCntr != null) updateCntr0 = updateCntr; @@ -2615,6 +2615,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme long updateCntr = 0; + assert preload; + if (!preload) updateCntr = nextPartCounter(topVer); @@ -4410,6 +4412,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme long updateCntr0 = entry.nextPartCounter(); + //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(entry.partition(), updateCntr0, "new counter for update on " + (primary ? "primary" : "backup")); + if (updateCntr != null) updateCntr0 = updateCntr; @@ -4490,6 +4494,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Must persist inside synchronization in non-tx mode. cctx.store().remove(null, entry.key); + System.exit(11); + long updateCntr0 = entry.nextPartCounter(); if (updateCntr != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 1de64c5..2eebda3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -649,6 +649,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } } + @Override + public void updateCounters(Map> cntrMap) { + throw new UnsupportedOperationException(); + } + /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update( http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 6fb557a..fd76d3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -799,7 +799,11 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @return Current update index. */ public long updateCounter() { - return store.updateCounter(); + long cntr0 = store.updateCounter(); + + //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(id, cntr0, "provide counter " + cctx.shared().exchange().lastTopologyFuture().topologyVersion()); + + return cntr0; } /** @@ -813,6 +817,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @param val Update index value. */ public void updateCounter(long val) { + //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(id, val, "set new counter"); + store.updateCounter(val); } http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index f9fd852..91289a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -241,6 +241,8 @@ public interface GridDhtPartitionTopology { GridDhtPartitionMap parts, @Nullable Map> cntrMap); + public void updateCounters(Map> cntrMap); + /** * Checks if there is at least one owner for each partition in the cache topology. * If not, marks such a partition as LOST. http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 8e79eda..d93fedf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1122,8 +1122,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { T2 cntr = cntrMap.get(part.id()); - if (cntr != null) + if (cntr != null) { + //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(part.id(), cntr.get2(), "set new counter from full " + exchId.topologyVersion()); + part.updateCounter(cntr.get2()); + } } } @@ -1255,29 +1258,13 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - /** {@inheritDoc} */ - @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap update( - @Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts, - @Nullable Map> cntrMap - ) { - if (log.isDebugEnabled()) - log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); - - if (!cctx.discovery().alive(parts.nodeId())) { - if (log.isDebugEnabled()) - log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId + - ", parts=" + parts + ']'); - - return null; - } - + @Override + public void updateCounters(Map> cntrMap) { lock.writeLock().lock(); try { if (stopping) - return null; + return; if (cntrMap != null) { for (Map.Entry> e : cntrMap.entrySet()) { @@ -1295,10 +1282,43 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { T2 cntr = cntrMap.get(part.id()); - if (cntr != null && cntr.get2() > part.updateCounter()) + if (cntr != null && cntr.get2() > part.updateCounter()) { + //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(part.id(), cntr.get2(), "set new counter from single"); + part.updateCounter(cntr.get2()); + } } } + } + finally { + lock.writeLock().unlock(); + + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) + @Nullable @Override public GridDhtPartitionMap update( + @Nullable GridDhtPartitionExchangeId exchId, + GridDhtPartitionMap parts, + @Nullable Map> cntrMap + ) { + if (log.isDebugEnabled()) + log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); + + if (!cctx.discovery().alive(parts.nodeId())) { + if (log.isDebugEnabled()) + log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId + + ", parts=" + parts + ']'); + + return null; + } + + lock.writeLock().lock(); + + try { + if (stopping) + return null; if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 6ffa373..16301e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -558,6 +558,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda ClusterNode primary = nodes.get(0); + //TestDebugLog.addEntryMessage(cacheKey.partition(), primary.id(), "mapped primary"); + boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nodes.size() == 1; GridNearAtomicAbstractUpdateRequest req; http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 8b8c87c..4b25ac4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.PartitionLossPolicy; @@ -47,18 +46,19 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteNeedReconnectException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.ClusterState; import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; -import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -494,6 +494,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter p : pending.headMap(batch.endCntr, true).entrySet()) { long cntr = p.getKey(); - assert cntr >= batch.startCntr && cntr <= batch.endCntr : cntr; + assert cntr <= batch.endCntr; + + if (pending.remove(p.getKey()) != null) { + if (cntr < batch.startCntr) + res = addResult(res, p.getValue(), backup); + else + res = batch.processEntry0(res, p.getKey(), p.getValue(), backup); + } + } + } + + return res; + } + + private Object addResult(Object res, CacheContinuousQueryEntry entry, boolean backup) { + if (res == null) { + if (backup) + backupQ.add(entry); + else + res = entry; + } + else { + assert !backup; + + List resList; + + if (res instanceof CacheContinuousQueryEntry) { + resList = new ArrayList<>(); + + resList.add((CacheContinuousQueryEntry)res); + } + else { + assert res instanceof List : res; - if (pending.remove(p.getKey()) != null) - res = batch.processEntry0(res, p.getKey(), p.getValue(), backup); + resList = (List)res; } + + resList.add(entry); + + res = resList; } return res; @@ -399,32 +436,7 @@ public class CacheContinuousQueryEventBuffer { filtered = 0; - if (res == null) { - if (backup) - backupQ.add(entry0); - else - res = entry0; - } - else { - assert !backup; - - List resList; - - if (res instanceof CacheContinuousQueryEntry) { - resList = new ArrayList<>(); - - resList.add((CacheContinuousQueryEntry)res); - } - else { - assert res instanceof List : res; - - resList = (List)res; - } - - resList.add(entry0); - - res = resList; - } + res = addResult(res, entry0, backup); } else filtered++; http://git-wip-us.apache.org/repos/asf/ignite/blob/13d37853/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java index 59252d2..e1bc49a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryPartitionRecovery.java @@ -243,8 +243,11 @@ class CacheContinuousQueryPartitionRecovery { else { if (pending.isFiltered()) skippedFiltered = true; - else + else { + //org.apache.ignite.spi.collision.TestDebugLog.addEntryMessage(pending.partition(), pending.updateCounter(), " stop process last=" + lastFiredEvt); + break; + } } }