Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 71558200B50 for ; Mon, 20 Jun 2016 14:52:44 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 701BB160A6E; Mon, 20 Jun 2016 12:52:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DCEB1160A70 for ; Mon, 20 Jun 2016 14:52:42 +0200 (CEST) Received: (qmail 7071 invoked by uid 500); 20 Jun 2016 12:52:42 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 6576 invoked by uid 99); 20 Jun 2016 12:52:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Jun 2016 12:52:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 91254ED310; Mon, 20 Jun 2016 12:52:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Mon, 20 Jun 2016 12:52:51 -0000 Message-Id: <987146183ba740fd8d4417bda203490e@git.apache.org> In-Reply-To: <89985bfe12394840b4deddba739acaf2@git.apache.org> References: <89985bfe12394840b4deddba739acaf2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/50] ignite git commit: ignite-3209 Waiting for affinity topology in case of failover for affinity call archived-at: Mon, 20 Jun 2016 12:52:44 -0000 ignite-3209 Waiting for affinity topology in case of failover for affinity call Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d80a398 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d80a398 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d80a398 Branch: refs/heads/ignite-3341 Commit: 1d80a398a40dd8b4b86bb83499cf0efd1f35b82b Parents: 54425bf Author: agura Authored: Tue Jun 14 17:32:25 2016 +0300 Committer: agura Committed: Tue Jun 14 18:42:17 2016 +0300 ---------------------------------------------------------------------- .../processors/task/GridTaskWorker.java | 130 ++++++++++++------- .../cache/CacheAffinityCallSelfTest.java | 4 - 2 files changed, 81 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1d80a398/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index dc86343..651259d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -61,12 +61,15 @@ import org.apache.ignite.internal.GridJobSiblingImpl; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTaskSessionImpl; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException; import org.apache.ignite.internal.managers.deployment.GridDeployment; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.closure.AffinityTask; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -76,6 +79,8 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.resources.TaskContinuousMapperResource; @@ -847,8 +852,25 @@ class GridTaskWorker extends GridWorker implements GridTimeoutObject { } case FAILOVER: { - if (!failover(res, jobRes, getTaskTopology())) - plc = null; + IgniteInternalFuture fut = failover(res, jobRes, getTaskTopology()); + + final GridJobResultImpl jobRes0 = jobRes; + + fut.listen(new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture fut0) { + try { + Boolean res = fut0.get(); + + if (res) + sendFailoverRequest(jobRes0); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to failover task [ses=" + ses + ", err=" + e + ']', e); + + finishTask(null, e); + } + } + }); break; } @@ -856,16 +878,11 @@ class GridTaskWorker extends GridWorker implements GridTimeoutObject { } // Outside of synchronization. - if (plc != null) { - // Handle failover. - if (plc == FAILOVER) - sendFailoverRequest(jobRes); - else { - evtLsnr.onJobFinished(this, jobRes.getSibling()); + if (plc != null && plc != FAILOVER) { + evtLsnr.onJobFinished(this, jobRes.getSibling()); - if (plc == ComputeJobResultPolicy.REDUCE) - reduce(results); - } + if (plc == ComputeJobResultPolicy.REDUCE) + reduce(results); } } catch (IgniteCheckedException e) { @@ -1039,59 +1056,74 @@ class GridTaskWorker extends GridWorker implements GridTimeoutObject { * @param top Topology. * @return {@code True} if fail-over SPI returned a new node. */ - private boolean failover(GridJobExecuteResponse res, GridJobResultImpl jobRes, Collection top) { - assert Thread.holdsLock(mux); + private IgniteInternalFuture failover( + final GridJobExecuteResponse res, + final GridJobResultImpl jobRes, + final Collection top + ) { + IgniteInternalFuture affFut = null; - try { - ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class); + if (affKey != null) { + Long topVer = ctx.discovery().topologyVersion(); - // Map to a new node. - ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache); + affFut = ctx.cache().context().exchange().affinityReadyFuture(new AffinityTopologyVersion(topVer)); + } - if (node == null) { - String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" + - jobRes.getJob() + ", node=" + jobRes.getNode() + ']'; + if (affFut == null) + affFut = new GridFinishedFuture(); - if (log.isDebugEnabled()) - log.debug(msg); + return affFut.chain(new IgniteClosure, Boolean>() { + @Override public Boolean apply(IgniteInternalFuture fut0) { + synchronized (mux) { + try { + ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class); - Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException()); + // Map to a new node. + ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache); - finishTask(null, e); + if (node == null) { + String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" + + jobRes.getJob() + ", node=" + jobRes.getNode() + ']'; - return false; - } + if (log.isDebugEnabled()) + log.debug(msg); - if (log.isDebugEnabled()) - log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() + - ", job=" + jobRes.getJob() + ", resMsg=" + res + ']'); + Throwable e = new ClusterTopologyCheckedException(msg, jobRes.getException()); - jobRes.setNode(node); - jobRes.resetResponse(); + finishTask(null, e); - if (!resCache) { - synchronized (mux) { - // Store result back in map before sending. - this.jobRes.put(res.getJobId(), jobRes); - } - } + return false; + } - return true; - } - // Catch Throwable to protect against bad user code. - catch (Throwable e) { - String errMsg = "Failed to failover job due to undeclared user exception [job=" + - jobRes.getJob() + ", err=" + e + ']'; + if (log.isDebugEnabled()) + log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() + + ", job=" + jobRes.getJob() + ", resMsg=" + res + ']'); - U.error(log, errMsg, e); + jobRes.setNode(node); + jobRes.resetResponse(); - finishTask(null, new ComputeUserUndeclaredException(errMsg, e)); + if (!resCache) // Store result back in map before sending. + GridTaskWorker.this.jobRes.put(res.getJobId(), jobRes); - if (e instanceof Error) - throw (Error)e; + return true; + } + // Catch Throwable to protect against bad user code. + catch (Throwable e) { + String errMsg = "Failed to failover job due to undeclared user exception [job=" + + jobRes.getJob() + ", err=" + e + ']'; - return false; - } + U.error(log, errMsg, e); + + finishTask(null, new ComputeUserUndeclaredException(errMsg, e)); + + if (e instanceof Error) + throw (Error)e; + + return false; + } + } + } + }); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1d80a398/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java index 8530fbb..e4b6ece 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java @@ -50,9 +50,6 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { private static final String CACHE_NAME = "myCache"; /** */ - private static final int MAX_FAILOVER_ATTEMPTS = 105; - - /** */ private static final int SERVERS_COUNT = 4; /** */ @@ -69,7 +66,6 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { cfg.setDiscoverySpi(spi); AlwaysFailoverSpi failSpi = new AlwaysFailoverSpi(); - failSpi.setMaximumFailoverAttempts(MAX_FAILOVER_ATTEMPTS); cfg.setFailoverSpi(failSpi); CacheConfiguration ccfg = defaultCacheConfiguration();