Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3F16718AC5 for ; Fri, 21 Aug 2015 11:52:30 +0000 (UTC) Received: (qmail 99694 invoked by uid 500); 21 Aug 2015 11:52:30 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 99662 invoked by uid 500); 21 Aug 2015 11:52:30 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 99652 invoked by uid 99); 21 Aug 2015 11:52:30 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 11:52:30 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 8BAE61AA909 for ; Fri, 21 Aug 2015 11:52:29 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.795 X-Spam-Level: * X-Spam-Status: No, score=1.795 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.006, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id 0Td8ywXPTfij for ; Fri, 21 Aug 2015 11:52:14 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 393B6254E3 for ; Fri, 21 Aug 2015 11:52:04 +0000 (UTC) Received: (qmail 97624 invoked by uid 99); 21 Aug 2015 11:52:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Aug 2015 11:52:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 16284E107B; Fri, 21 Aug 2015 11:52:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Fri, 21 Aug 2015 11:52:29 -0000 Message-Id: In-Reply-To: <686106f7db0742ffb55f415a7e7c65ce@git.apache.org> References: <686106f7db0742ffb55f415a7e7c65ce@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [28/40] incubator-ignite git commit: IGNITE-1275 - Use topology-safe method in marshaller context to prevent deadlocks. IGNITE-1275 - Use topology-safe method in marshaller context to prevent deadlocks. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6b93ee7a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6b93ee7a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6b93ee7a Branch: refs/heads/ignite-1258 Commit: 6b93ee7a39b94b6edb52de7543fb222ef44a1bd3 Parents: abbd308 Author: Alexey Goncharuk Authored: Thu Aug 20 16:19:01 2015 -0700 Committer: Alexey Goncharuk Committed: Thu Aug 20 16:19:01 2015 -0700 ---------------------------------------------------------------------- .../ignite/internal/MarshallerContextImpl.java | 2 +- .../processors/cache/GridCacheAdapter.java | 82 +++++-- .../distributed/dht/GridDhtCacheAdapter.java | 12 +- .../cache/distributed/dht/GridDhtGetFuture.java | 12 +- .../dht/GridPartitionedGetFuture.java | 86 +++++--- .../dht/atomic/GridDhtAtomicCache.java | 16 +- .../dht/colocated/GridDhtColocatedCache.java | 19 +- .../distributed/near/GridNearAtomicCache.java | 6 +- .../distributed/near/GridNearCacheAdapter.java | 15 +- .../distributed/near/GridNearCacheEntry.java | 4 +- .../distributed/near/GridNearGetFuture.java | 101 ++++++--- .../near/GridNearTransactionalCache.java | 9 +- .../cache/distributed/near/GridNearTxLocal.java | 7 +- .../local/atomic/GridLocalAtomicCache.java | 17 +- .../IgniteCacheTopologySafeGetSelfTest.java | 215 +++++++++++++++++++ ...gniteCachePutRetryTransactionalSelfTest.java | 2 + .../IgniteCacheFailoverTestSuite.java | 2 + 17 files changed, 494 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index 87bd3b6..dc0fd57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -136,7 +136,7 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { throw new IllegalStateException("Failed to initialize marshaller context (grid is stopping)."); } - String clsName = cache0.get(id); + String clsName = cache0.getTopologySafe(id); if (clsName == null) { File file = new File(workDir, id + ".classname"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 992edd8..c7fbbfc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -526,7 +526,8 @@ public abstract class GridCacheAdapter implements IgniteInternalCache>, Boolean>() { @Override public Boolean applyx(IgniteInternalFuture> fut) throws IgniteCheckedException { Map map = fut.get(); @@ -560,7 +561,8 @@ public abstract class GridCacheAdapter implements IgniteInternalCache>, Boolean>() { @Override public Boolean applyx(IgniteInternalFuture> fut) throws IgniteCheckedException { Map kvMap = fut.get(); @@ -894,7 +896,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache> entrySet() { - return entrySet((CacheEntryPredicate[]) null); + return entrySet((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @@ -919,12 +921,12 @@ public abstract class GridCacheAdapter implements IgniteInternalCache primaryKeySet() { - return primaryKeySet((CacheEntryPredicate[]) null); + return primaryKeySet((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @Override public Collection values() { - return values((CacheEntryPredicate[]) null); + return values((CacheEntryPredicate[])null); } /** @@ -1210,22 +1212,57 @@ public abstract class GridCacheAdapter implements IgniteInternalCache getForcePrimaryAsync(final K key) { String taskName = ctx.kernalContext().job().currentTaskName(); - return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null, - taskName, true, false).chain(new CX1>, V>() { - @Override public V applyx(IgniteInternalFuture> e) throws IgniteCheckedException { + return getAllAsync( + Collections.singletonList(key), + /*force primary*/true, + /*skip tx*/false, + null, + null, + taskName, + true, + false, + /*can remap*/true + ).chain(new CX1>, V>() { + @Override + public V applyx(IgniteInternalFuture> e) throws IgniteCheckedException { return e.get().get(key); } }); } + public V getTopologySafe(K key) throws IgniteCheckedException { + String taskName = ctx.kernalContext().job().currentTaskName(); + + return getAllAsync( + F.asList(key), + /*force primary*/false, + /*skip tx*/false, + /*cached entry*/null, + /*subject id*/null, + taskName, + /*deserialize cache objects*/true, + /*skip values*/false, + /*can remap*/false + ).get().get(key); + } + /** {@inheritDoc} */ @Nullable @Override public Map getAllOutTx(Set keys) throws IgniteCheckedException { return getAllOutTxAsync(keys).get(); @@ -1242,7 +1279,8 @@ public abstract class GridCacheAdapter implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache implements IgniteInternalCache(Collections.emptyMap()); @@ -1684,7 +1727,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache map = new GridLeanMap<>(keys.size()); @@ -4461,7 +4504,8 @@ public abstract class GridCacheAdapter implements IgniteInternalCache implements IgniteInternalCache extends GridDistributedCacheAdap @Nullable UUID subjId, String taskName, boolean deserializePortable, - boolean skipVals + boolean skipVals, + boolean canRemap ) { CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -540,7 +541,8 @@ public abstract class GridDhtCacheAdapter extends GridDistributedCacheAdap deserializePortable, forcePrimary, null, - skipVals); + skipVals, + canRemap); } /** @@ -558,7 +560,8 @@ public abstract class GridDhtCacheAdapter extends GridDistributedCacheAdap @Nullable UUID subjId, String taskName, @Nullable IgniteCacheExpiryPolicy expiry, - boolean skipVals + boolean skipVals, + boolean canRemap ) { return getAllAsync0(keys, readThrough, @@ -568,7 +571,8 @@ public abstract class GridDhtCacheAdapter extends GridDistributedCacheAdap false, expiry, skipVals, - /*keep cache objects*/true); + /*keep cache objects*/true, + canRemap); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 742fbfe..9005541 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -349,12 +349,14 @@ public final class GridDhtGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture cctx; + private final GridCacheContext cctx; /** Keys. */ private Collection keys; @@ -105,6 +105,9 @@ public class GridPartitionedGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuturemapsReducer(keys.size())); @@ -147,6 +151,7 @@ public class GridPartitionedGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture 0 ? this.topVer : cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer : + canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion(); map(keys, Collections.>emptyMap(), topVer); @@ -334,7 +340,7 @@ public class GridPartitionedGetFuture extends GridCompoundIdentityFuture 0 : "Got invalid partitions for local node but topology version did " + "not change [topVer=" + topVer + ", updTopVer=" + updTopVer + @@ -461,7 +467,7 @@ public class GridPartitionedGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture nodes = cctx.affinity().nodes(key, topVer); + + for (ClusterNode node : nodes) { + if (cctx.discovery().alive(node)) + return node; + } + + return null; + } + else + return cctx.affinity().primary(key, topVer); + } + + /** * @param infos Entry infos. * @return Result map. */ @@ -557,14 +585,14 @@ public class GridPartitionedGetFuture extends GridCompoundIdentityFuture keys; + private final LinkedHashMap keys; /** Topology version on which this future was mapped. */ - private AffinityTopologyVersion topVer; + private final AffinityTopologyVersion topVer; /** {@code True} if remapped after node left. */ private boolean remapped; @@ -625,30 +653,38 @@ public class GridPartitionedGetFuture extends GridCompoundIdentityFutureemptyMap()); + } + else { + final AffinityTopologyVersion updTopVer = + new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); - final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, - cctx.kernalContext().config().getNetworkTimeout(), - updTopVer, - e); + final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, + cctx.kernalContext().config().getNetworkTimeout(), + updTopVer, + e); - cctx.affinity().affinityReadyFuture(updTopVer).listen( - new CI1>() { - @Override public void apply(IgniteInternalFuture fut) { - if (timeout.finish()) { - cctx.kernalContext().timeout().removeTimeoutObject(timeout); + cctx.affinity().affinityReadyFuture(updTopVer).listen( + new CI1>() { + @Override public void apply(IgniteInternalFuture fut) { + if (timeout.finish()) { + cctx.kernalContext().timeout().removeTimeoutObject(timeout); - // Remap. - map(keys.keySet(), F.t(node, keys), updTopVer); + // Remap. + map(keys.keySet(), F.t(node, keys), updTopVer); - onDone(Collections.emptyMap()); + onDone(Collections.emptyMap()); + } } } - } - ); + ); - cctx.kernalContext().timeout().addTimeoutObject(timeout); + cctx.kernalContext().timeout().addTimeoutObject(timeout); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 96e6edc..5b82162 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -248,7 +248,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { @Nullable UUID subjId, final String taskName, final boolean deserializePortable, - final boolean skipVals + final boolean skipVals, + final boolean canRemap ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -278,7 +279,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { deserializePortable, expiryPlc, skipVals, - skipStore); + skipStore, + canRemap); } }); } @@ -870,8 +872,11 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { boolean deserializePortable, @Nullable ExpiryPolicy expiryPlc, boolean skipVals, - boolean skipStore) { - AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); + boolean skipStore, + boolean canRemap + ) { + AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() : + ctx.shared().exchange().readyAffinityVersion(); final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc); @@ -971,7 +976,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { taskName, deserializePortable, expiry, - skipVals); + skipVals, + canRemap); fut.init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 221b230..eb7c78f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -155,7 +155,8 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte @Nullable UUID subjId, String taskName, final boolean deserializePortable, - final boolean skipVals + final boolean skipVals, + boolean canRemap ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -183,7 +184,9 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte }); } - AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); + AffinityTopologyVersion topVer = tx == null ? + (canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) : + tx.topologyVersion(); subjId = ctx.subjectIdPerCall(subjId, opCtx); @@ -197,7 +200,8 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte taskName, deserializePortable, skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), - skipVals); + skipVals, + canRemap); } /** {@inheritDoc} */ @@ -226,7 +230,8 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte * @param skipVals Skip values flag. * @return Loaded values. */ - public IgniteInternalFuture> loadAsync(@Nullable Collection keys, + public IgniteInternalFuture> loadAsync( + @Nullable Collection keys, boolean readThrough, boolean reload, boolean forcePrimary, @@ -235,7 +240,8 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte String taskName, boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean skipVals + boolean skipVals, + boolean canRemap ) { if (keys == null || keys.isEmpty()) return new GridFinishedFuture<>(Collections.emptyMap()); @@ -340,7 +346,8 @@ public class GridDhtColocatedCache extends GridDhtTransactionalCacheAdapte taskName, deserializePortable, expiryPlc, - skipVals); + skipVals, + canRemap); fut.init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 041f83a..2bf5365 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -364,7 +364,8 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { @Nullable UUID subjId, String taskName, boolean deserializePortable, - boolean skipVals + boolean skipVals, + boolean canRemap ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -387,7 +388,8 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { deserializePortable, skipVals ? null : opCtx != null ? opCtx.expiry() : null, skipVals, - opCtx != null && opCtx.skipStore()); + opCtx != null && opCtx.skipStore(), + canRemap); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 351d6cd..ba0692c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -195,13 +195,14 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda return (IgniteInternalFuture)loadAsync(tx, keys, reload, - false, + /*force primary*/false, subjId, taskName, - true, - null, + /*deserialize portable*/true, + /*expiry policy*/null, skipVals, - /*skip store*/false); + /*skip store*/false, + /*can remap*/true); } /** @@ -226,7 +227,8 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda boolean deserializePortable, @Nullable ExpiryPolicy expiryPlc, boolean skipVal, - boolean skipStore + boolean skipStore, + boolean canRemap ) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.emptyMap()); @@ -245,7 +247,8 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda taskName, deserializePortable, expiry, - skipVal); + skipVal, + canRemap); // init() will register future for responses if future has remote mappings. fut.init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 194c68a..6f4f15e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -333,7 +333,9 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { true, null, false, - /*skip store*/false).get().get(keyValue(false)); + /*skip store*/false, + /*can remap*/true + ).get().get(keyValue(false)); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index d109d2b..ca460c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -62,7 +62,7 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture cctx; + private final GridCacheContext cctx; /** Keys. */ private Collection keys; @@ -106,6 +106,9 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuturemapsReducer(keys.size())); @@ -148,6 +152,7 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture>emptyMap(), topVer); @@ -327,7 +334,7 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture 0 : "Got invalid partitions for local node but topology version did " + "not change [topVer=" + topVer + ", updTopVer=" + updTopVer + @@ -435,7 +442,7 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture dht = cache().dht(); @@ -472,16 +479,16 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture(Collections.singletonMap(key0, val0))); } else { - if (primary == null) { - primary = cctx.affinity().primary(key, topVer); + if (affNode == null) { + affNode = affinityNode(key, topVer); - if (primary == null) { + if (affNode == null) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + "(all partition nodes left the grid).")); @@ -527,13 +534,13 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture keys = mapped.get(primary); + LinkedHashMap keys = mapped.get(affNode); if (keys != null && keys.containsKey(key)) { if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) { onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " + MAX_REMAP_CNT + " attempts (key got remapped to the same node) " + - "[key=" + key + ", node=" + U.toShortString(primary) + ", mappings=" + mapped + ']')); + "[key=" + key + ", node=" + U.toShortString(affNode) + ", mappings=" + mapped + ']')); return savedVers; } @@ -545,10 +552,10 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture old = mappings.get(primary); + LinkedHashMap old = mappings.get(affNode); if (old == null) - mappings.put(primary, old = new LinkedHashMap<>(3, 1f)); + mappings.put(affNode, old = new LinkedHashMap<>(3, 1f)); old.put(key, addRdr); } @@ -579,6 +586,28 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture affNodes = cctx.affinity().nodes(key, topVer); + + for (ClusterNode node : affNodes) { + if (cctx.discovery().alive(node)) + return node; + } + + return null; + } + else + return cctx.affinity().primary(key, topVer); + } + + /** * @return Near cache. */ private GridNearCacheAdapter cache() { @@ -752,30 +781,38 @@ public final class GridNearGetFuture extends GridCompoundIdentityFutureemptyMap()); + } else { + final AffinityTopologyVersion updTopVer = + new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); - final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, - cctx.kernalContext().config().getNetworkTimeout(), - updTopVer, - e); + final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, + cctx.kernalContext().config().getNetworkTimeout(), + updTopVer, + e); - cctx.affinity().affinityReadyFuture(updTopVer).listen( - new CI1>() { - @Override public void apply(IgniteInternalFuture fut) { - if (timeout.finish()) { - cctx.kernalContext().timeout().removeTimeoutObject(timeout); + cctx.affinity().affinityReadyFuture(updTopVer).listen( + new CI1>() { + @Override public void apply(IgniteInternalFuture fut) { + if (timeout.finish()) { + cctx.kernalContext().timeout().removeTimeoutObject(timeout); - // Remap. - map(keys.keySet(), F.t(node, keys), updTopVer); + // Remap. + map(keys.keySet(), F.t(node, keys), updTopVer); - onDone(Collections.emptyMap()); + onDone(Collections.emptyMap()); + } } } - } - ); + ); - cctx.kernalContext().timeout().addTimeoutObject(timeout); + cctx.kernalContext().timeout().addTimeoutObject(timeout); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 696acfb..a1f1383 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -101,7 +101,8 @@ public class GridNearTransactionalCache extends GridNearCacheAdapter @Nullable UUID subjId, String taskName, final boolean deserializePortable, - final boolean skipVals + final boolean skipVals, + boolean canRemap ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -142,7 +143,8 @@ public class GridNearTransactionalCache extends GridNearCacheAdapter deserializePortable, skipVals ? null : opCtx != null ? opCtx.expiry() : null, skipVals, - skipStore); + skipStore, + canRemap); } /** @@ -172,7 +174,8 @@ public class GridNearTransactionalCache extends GridNearCacheAdapter tx.resolveTaskName(), deserializePortable, expiryPlc, - skipVals); + skipVals, + /*can remap*/true); // init() will register future for responses if it has remote mappings. fut.init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index cb391e4..5ff7345 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -313,7 +313,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { }); } else if (cacheCtx.isColocated()) { - return cacheCtx.colocated().loadAsync(keys, + return cacheCtx.colocated().loadAsync( + keys, readThrough, /*reload*/false, /*force primary*/false, @@ -322,7 +323,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { resolveTaskName(), deserializePortable, accessPolicy(cacheCtx, keys), - skipVals).chain(new C1>, Boolean>() { + skipVals, + /*can remap*/true + ).chain(new C1>, Boolean>() { @Override public Boolean apply(IgniteInternalFuture> f) { try { Map map = f.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index bcbdec4..c648f11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -458,7 +458,8 @@ public class GridLocalAtomicCache extends GridCacheAdapter { @Nullable UUID subjId, final String taskName, final boolean deserializePortable, - final boolean skipVals + final boolean skipVals, + boolean canRemap ) { A.notNull(keys, "keys"); @@ -570,8 +571,18 @@ public class GridLocalAtomicCache extends GridCacheAdapter { if (success || !storeEnabled) return vals; - return getAllAsync(keys, opCtx == null || !opCtx.skipStore(), null, false, subjId, taskName, deserializePortable, - false, expiry, skipVals).get(); + return getAllAsync( + keys, + opCtx == null || !opCtx.skipStore(), + null, + false, + subjId, + taskName, + deserializePortable, + /*force primary*/false, + expiry, + skipVals, + /*can remap*/true).get(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java new file mode 100644 index 0000000..ef031f6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * + */ +public class IgniteCacheTopologySafeGetSelfTest extends GridCommonAbstractTest { + /** Number of initial grids. */ + public static final int GRID_CNT = 4; + + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** TX commit latch. */ + private CountDownLatch releaseLatch; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration( + cacheCfg("tx", TRANSACTIONAL, false), + cacheCfg("atomic", ATOMIC, false), + cacheCfg("tx_near", TRANSACTIONAL, true), + cacheCfg("atomic_near", ATOMIC, true)); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** + * @param name Cache name. + * @param cacheMode Cache mode. + * @param near Near enabled flag. + * @return Cache configuration. + */ + @SuppressWarnings("unchecked") + private CacheConfiguration cacheCfg(String name, CacheAtomicityMode cacheMode, boolean near) { + CacheConfiguration cfg = new CacheConfiguration(name); + + cfg.setAtomicityMode(cacheMode); + cfg.setBackups(1); + + if (near) + cfg.setNearConfiguration(new NearCacheConfiguration()); + else + cfg.setNearConfiguration(null); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testGetTopologySafeNodeJoin() throws Exception { + checkGetTopologySafeNodeJoin(false); + } + + /** + * @throws Exception If failed. + */ + public void testGetTopologySafeNodeJoinPrimaryLeave() throws Exception { + checkGetTopologySafeNodeJoin(true); + } + + /** + * @throws Exception If failed. + */ + public void checkGetTopologySafeNodeJoin(boolean failPrimary) throws Exception { + startGrids(GRID_CNT); + + awaitPartitionMapExchange(); + + try { + ClusterNode targetNode = ignite(1).cluster().localNode(); + + info(">>> Target node: " + targetNode.id()); + + // Populate caches with a key that does not belong to ignite(0). + int key = -1; + for (int i = 0; i < 100; i++) { + Collection nodes = ignite(0).affinity("tx").mapKeyToPrimaryAndBackups(i); + ClusterNode primaryNode = F.first(nodes); + + if (!nodes.contains(ignite(0).cluster().localNode()) && primaryNode.id().equals(targetNode.id())) { + ignite(1).cache("tx").put(i, i); + ignite(1).cache("atomic").put(i, i); + ignite(1).cache("tx_near").put(i, i); + ignite(1).cache("atomic_near").put(i, i); + + key = i; + + + break; + } + } + + assertTrue(key != -1); + + IgniteInternalFuture txFut = startBlockingTxAsync(); + + IgniteInternalFuture nodeFut = startNodeAsync(); + + if (failPrimary) + stopGrid(1); + + assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("tx").getTopologySafe(key)); + assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("atomic").getTopologySafe(key)); + assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("tx_near").getTopologySafe(key)); + assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("atomic_near").getTopologySafe(key)); + + releaseTx(); + + txFut.get(); + nodeFut.get(); + } + finally { + stopAllGrids(); + } + } + + private IgniteInternalFuture startNodeAsync() throws Exception { + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override + public Object call() throws Exception { + startGrid(GRID_CNT); + + return null; + } + }); + + U.sleep(1000); + + return fut; + } + + /** + * @return TX release future. + * @throws Exception If failed. + */ + private IgniteInternalFuture startBlockingTxAsync() throws Exception { + final CountDownLatch lockLatch = new CountDownLatch(1); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + try (Transaction ignore = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (int i = 0; i < 30; i++) + ignite(0).cache("tx").get("value-" + i); + + releaseLatch = new CountDownLatch(1); + + lockLatch.countDown(); + + releaseLatch.await(); + } + + return null; + } + }); + + lockLatch.await(); + + return fut; + } + + /** + * + */ + private void releaseTx() { + assert releaseLatch != null; + + releaseLatch.countDown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java index 9c4446d..c2fc46c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -120,6 +120,8 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr stopGrid(stopIdx); + U.sleep(500); + startGrid(stopIdx); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java index af2b85c..b64471b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java @@ -80,6 +80,8 @@ public class IgniteCacheFailoverTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheSizeFailoverTest.class); + suite.addTestSuite(IgniteCacheTopologySafeGetSelfTest.class); + return suite; } }