Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5E62E18406 for ; Fri, 14 Aug 2015 13:23:15 +0000 (UTC) Received: (qmail 25728 invoked by uid 500); 14 Aug 2015 13:23:15 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 25698 invoked by uid 500); 14 Aug 2015 13:23:15 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 25689 invoked by uid 99); 14 Aug 2015 13:23:15 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Aug 2015 13:23:15 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id CCD54DDD5E for ; Fri, 14 Aug 2015 13:23:14 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.781 X-Spam-Level: X-Spam-Status: No, score=0.781 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 0rcHelOdmfHH for ; Fri, 14 Aug 2015 13:23:13 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 6A8EE20DD0 for ; Fri, 14 Aug 2015 13:22:55 +0000 (UTC) Received: (qmail 24148 invoked by uid 99); 14 Aug 2015 13:22:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Aug 2015 13:22:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8EE37E04C9; Fri, 14 Aug 2015 13:22:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-ignite git commit: # Properly handle ClusterTopologyServerNotFoundException for retries Date: Fri, 14 Aug 2015 13:22:54 +0000 (UTC) Repository: incubator-ignite Updated Branches: refs/heads/ignite-1.3.3-p2 122a9dbf3 -> 2903a29e7 # Properly handle ClusterTopologyServerNotFoundException for retries Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2903a29e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2903a29e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2903a29e Branch: refs/heads/ignite-1.3.3-p2 Commit: 2903a29e7a50802617872bfd0fcc3497c4c7785e Parents: 122a9db Author: sboikov Authored: Fri Aug 14 16:22:25 2015 +0300 Committer: sboikov Committed: Fri Aug 14 16:22:25 2015 +0300 ---------------------------------------------------------------------- .../CachePartialUpdateCheckedException.java | 29 +++++++++++- .../processors/cache/GridCacheAdapter.java | 50 ++++++++++++-------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 48 +++++++++++-------- .../near/GridNearOptimisticTxPrepareFuture.java | 2 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 1 - 5 files changed, 86 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java index 0272b7c..f430d12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.internal.processors.affinity.*; import java.util.*; @@ -32,6 +33,9 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException { /** Failed keys. */ private final Collection failedKeys = new ArrayList<>(); + /** */ + private AffinityTopologyVersion topVer; + /** * @param msg Error message. */ @@ -50,13 +54,36 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException { /** * @param failedKeys Failed keys. * @param err Error. + * @param topVer Topology version for failed update. */ - public void add(Collection failedKeys, Throwable err) { + public void add(Collection failedKeys, Throwable err, AffinityTopologyVersion topVer) { + if (topVer != null) { + AffinityTopologyVersion topVer0 = this.topVer; + + if (topVer0 == null || topVer.compareTo(topVer0) > 0) + this.topVer = topVer; + } + this.failedKeys.addAll(failedKeys); addSuppressed(err); } + /** + * @return Topology version. + */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @param failedKeys Failed keys. + * @param err Error. + */ + public void add(Collection failedKeys, Throwable err) { + add(failedKeys, err, null); + } + /** {@inheritDoc} */ @Override public String getMessage() { return super.getMessage() + ": " + failedKeys; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 91af352..992edd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -3975,13 +3975,17 @@ public abstract class GridCacheAdapter implements IgniteInternalCache 0 : tx; + if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { + AffinityTopologyVersion topVer = tx.topologyVersion(); + + assert topVer != null && topVer.topologyVersion() > 0 : tx; - ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1).get(); + ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1).get(); - continue; + continue; + } } throw e; @@ -4702,31 +4706,35 @@ public abstract class GridCacheAdapter implements IgniteInternalCache 0) { - IgniteTxLocalAdapter tx = AsyncOpRetryFuture.this.tx; + ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); - assert tx != null; + if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { + IgniteTxLocalAdapter tx = AsyncOpRetryFuture.this.tx; - AffinityTopologyVersion topVer = tx.topologyVersion(); + assert tx != null; - assert topVer != null && topVer.topologyVersion() > 0 : tx; + AffinityTopologyVersion topVer = tx.topologyVersion(); - IgniteInternalFuture topFut = - ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1); + assert topVer != null && topVer.topologyVersion() > 0 : tx; - topFut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture topFut) { - try { - topFut.get(); + IgniteInternalFuture topFut = + ctx.affinity().affinityReadyFuture(topVer.topologyVersion() + 1); - execute(); - } - catch (IgniteCheckedException e) { - onDone(e); + topFut.listen(new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture topFut) { + try { + topFut.get(); + + execute(); + } + catch (IgniteCheckedException e) { + onDone(e); + } } - } - }); + }); - return; + return; + } } onDone(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 0498839..5dc5494 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -275,6 +275,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter if (singleNodeId.equals(nodeId)) { onDone(addFailedKeys( singleReq.keys(), + singleReq.topologyVersion(), new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId))); return true; @@ -286,8 +287,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter GridNearAtomicUpdateRequest req = mappings.get(nodeId); if (req != null) { - addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before response is " + - "received: " + nodeId)); + addFailedKeys(req.keys(), + req.topologyVersion(), + new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId)); mappings.remove(nodeId); @@ -356,8 +358,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter /** * @param failed Keys to remap. + * @param errTopVer Topology version for failed update. */ - private void remap(Collection failed) { + private void remap(Collection failed, AffinityTopologyVersion errTopVer) { + assert errTopVer != null; + GridCacheVersion futVer0 = futVer; if (futVer0 == null || cctx.mvcc().removeAtomicFuture(futVer0) == null) @@ -409,15 +414,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter GridFutureAdapter fut0; - long nextTopVer; - synchronized (this) { mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f); assert topVer != null && topVer.topologyVersion() > 0 : this; - nextTopVer = topVer.topologyVersion() + 1; - topVer = AffinityTopologyVersion.ZERO; fut0 = topCompleteFut; @@ -434,7 +435,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter updVer = null; topLocked = false; - IgniteInternalFuture fut = cctx.affinity().affinityReadyFuture(nextTopVer); + IgniteInternalFuture fut = cctx.affinity().affinityReadyFuture(errTopVer.topologyVersion() + 1); fut.listen(new CI1>() { @Override public void apply(final IgniteInternalFuture fut) { @@ -471,15 +472,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter X.hasCause(err, ClusterTopologyCheckedException.class) && storeFuture() && remapCnt.decrementAndGet() > 0) { + ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class); - CachePartialUpdateCheckedException cause = X.cause(err, CachePartialUpdateCheckedException.class); + if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { + CachePartialUpdateCheckedException cause = X.cause(err, CachePartialUpdateCheckedException.class); - if (F.isEmpty(cause.failedKeys())) - cause.printStackTrace(); + assert cause != null && cause.topologyVersion() != null : err; - remap(cause.failedKeys()); + remap(cause.failedKeys(), cause.topologyVersion()); - return false; + return false; + } } if (super.onDone(retval, err)) { @@ -528,8 +531,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter updateNear(singleReq, res); - if (res.error() != null) - onDone(res.failedKeys() != null ? addFailedKeys(res.failedKeys(), res.error()) : res.error()); + if (res.error() != null) { + onDone(res.failedKeys() != null ? + addFailedKeys(res.failedKeys(), singleReq.topologyVersion(), res.error()) : res.error()); + } else { if (op == TRANSFORM) { if (ret != null) @@ -551,7 +556,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter updateNear(req, res); if (res.error() != null) - addFailedKeys(req.keys(), res.error()); + addFailedKeys(req.keys(), req.topologyVersion(), res.error()); else { if (op == TRANSFORM) { assert !req.fastMap(); @@ -1048,7 +1053,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter onDone(new GridCacheReturn(cctx, true, null, true)); } catch (IgniteCheckedException e) { - onDone(addFailedKeys(req.keys(), e)); + onDone(addFailedKeys(req.keys(), req.topologyVersion(), e)); } } } @@ -1079,7 +1084,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); } catch (IgniteCheckedException e) { - addFailedKeys(req.keys(), e); + addFailedKeys(req.keys(), req.topologyVersion(), e); removeMapping(req.nodeId()); } @@ -1135,10 +1140,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter /** * @param failedKeys Failed keys. + * @param topVer Topology version for failed update. * @param err Error cause. * @return Root {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException}. */ - private synchronized IgniteCheckedException addFailedKeys(Collection failedKeys, Throwable err) { + private synchronized IgniteCheckedException addFailedKeys(Collection failedKeys, + AffinityTopologyVersion topVer, + Throwable err) { CachePartialUpdateCheckedException err0 = this.err; if (err0 == null) @@ -1149,7 +1157,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter for (KeyCacheObject key : failedKeys) keys.add(key.value(cctx.cacheObjectContext(), false)); - err0.add(keys, err); + err0.add(keys, err, topVer); return err0; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 44b7997..2b86672 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -416,7 +416,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd GridCacheContext cacheCtx = cctx.cacheContext(cacheId); if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) { - onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all " + + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " + "partition nodes left the grid): " + cacheCtx.name())); return; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2903a29e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 0a8f87c..ff948a1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -5010,7 +5010,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertThrows(log, new Callable() { @Override public Object call() throws Exception { - IgniteFuture fut = cache.future().chain(new IgniteClosure() { @Override public Object apply(IgniteFuture o) { return o.get();