Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C05791808F for ; Mon, 20 Jul 2015 20:48:03 +0000 (UTC) Received: (qmail 499 invoked by uid 500); 20 Jul 2015 20:48:03 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 469 invoked by uid 500); 20 Jul 2015 20:48:03 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 460 invoked by uid 99); 20 Jul 2015 20:48:03 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Jul 2015 20:48:03 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 32911D63CF for ; Mon, 20 Jul 2015 20:48:03 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.554 X-Spam-Level: X-Spam-Status: No, score=0.554 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.227, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id jlU4LLpA8jj0 for ; Mon, 20 Jul 2015 20:47:57 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 131C42C6C3 for ; Mon, 20 Jul 2015 20:47:50 +0000 (UTC) Received: (qmail 99478 invoked by uid 99); 20 Jul 2015 20:47:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Jul 2015 20:47:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C87E8E1178; Mon, 20 Jul 2015 20:47:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vkulichenko@apache.org To: commits@ignite.incubator.apache.org Date: Mon, 20 Jul 2015 20:47:55 -0000 Message-Id: In-Reply-To: <80c0d227ff214ce4a5955c10b392a4df@git.apache.org> References: <80c0d227ff214ce4a5955c10b392a4df@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/19] incubator-ignite git commit: #ignite-1087: AffinityRun runs job on not primary nodes. #ignite-1087: AffinityRun runs job on not primary nodes. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f9d2a2ef Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f9d2a2ef Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f9d2a2ef Branch: refs/heads/ignite-884 Commit: f9d2a2ef4424bf166912594791b6414b9a9e8457 Parents: ede9612 Author: ivasilinets Authored: Mon Jul 20 11:25:00 2015 +0300 Committer: ivasilinets Committed: Mon Jul 20 11:25:00 2015 +0300 ---------------------------------------------------------------------- .../ignite/compute/ComputeJobResultPolicy.java | 3 +- .../failover/GridFailoverContextImpl.java | 28 ++- .../managers/failover/GridFailoverManager.java | 13 +- .../processors/closure/AffinityTask.java | 35 ++++ .../closure/GridClosureProcessor.java | 63 ++++++- .../processors/task/GridTaskWorker.java | 24 ++- .../ignite/spi/failover/FailoverContext.java | 18 ++ .../spi/failover/always/AlwaysFailoverSpi.java | 25 +++ .../cache/CacheAffinityCallSelfTest.java | 172 +++++++++++++++++++ .../cache/GridCacheAffinityRoutingSelfTest.java | 157 ++++++++++++++++- .../spi/failover/GridFailoverTestContext.java | 10 ++ .../ignite/testsuites/IgniteCacheTestSuite.java | 1 + 12 files changed, 533 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9d2a2ef/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java index 37aba91..26eb542 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobResultPolicy.java @@ -50,8 +50,7 @@ public enum ComputeJobResultPolicy { * @param ord Ordinal value. * @return Enumerated value. */ - @Nullable - public static ComputeJobResultPolicy fromOrdinal(byte ord) { + @Nullable public static ComputeJobResultPolicy fromOrdinal(byte ord) { return ord >= 0 && ord < VALS.length ? VALS[ord] : null; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9d2a2ef/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java index a3f8e44..c2b104e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverContextImpl.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.managers.loadbalancer.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.spi.failover.*; +import org.jetbrains.annotations.*; import java.util.*; @@ -41,15 +42,26 @@ public class GridFailoverContextImpl implements FailoverContext { @GridToStringExclude private final GridLoadBalancerManager loadMgr; + /** Affinity key for affinityCall. */ + private final Object affKey; + + /** Affinity cache name for affinityCall. */ + private final String affCacheName; + /** * Initializes failover context. * * @param taskSes Grid task session. * @param jobRes Failed job result. * @param loadMgr Load manager. + * @param affKey Affinity key. + * @param affCacheName Affinity cache name. */ - public GridFailoverContextImpl(GridTaskSessionImpl taskSes, ComputeJobResult jobRes, - GridLoadBalancerManager loadMgr) { + public GridFailoverContextImpl(GridTaskSessionImpl taskSes, + ComputeJobResult jobRes, + GridLoadBalancerManager loadMgr, + @Nullable Object affKey, + @Nullable String affCacheName) { assert taskSes != null; assert jobRes != null; assert loadMgr != null; @@ -57,6 +69,8 @@ public class GridFailoverContextImpl implements FailoverContext { this.taskSes = taskSes; this.jobRes = jobRes; this.loadMgr = loadMgr; + this.affKey = affKey; + this.affCacheName = affCacheName; } /** {@inheritDoc} */ @@ -75,6 +89,16 @@ public class GridFailoverContextImpl implements FailoverContext { } /** {@inheritDoc} */ + @Nullable @Override public Object affinityKey() { + return affKey; + } + + /** {@inheritDoc} */ + @Nullable @Override public String affinityCacheName() { + return affCacheName; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridFailoverContextImpl.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9d2a2ef/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java index 714cccb..dffc965 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/failover/GridFailoverManager.java @@ -23,6 +23,7 @@ import org.apache.ignite.compute.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.*; import org.apache.ignite.spi.failover.*; +import org.jetbrains.annotations.*; import java.util.*; @@ -56,11 +57,17 @@ public class GridFailoverManager extends GridManagerAdapter { /** * @param taskSes Task session. * @param jobRes Job result. - * @param top Collection of all top nodes that does not include the failed node. + * @param top Collection of all topology nodes. + * @param affKey Affinity key. + * @param affCacheName Affinity cache name. * @return New node to route this job to. */ - public ClusterNode failover(GridTaskSessionImpl taskSes, ComputeJobResult jobRes, List top) { + public ClusterNode failover(GridTaskSessionImpl taskSes, + ComputeJobResult jobRes, + List top, + @Nullable Object affKey, + @Nullable String affCacheName) { return getSpi(taskSes.getFailoverSpi()).failover(new GridFailoverContextImpl(taskSes, jobRes, - ctx.loadBalancing()), top); + ctx.loadBalancing(), affKey, affCacheName), top); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9d2a2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java new file mode 100644 index 0000000..1b32444 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/AffinityTask.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.closure; + +import org.jetbrains.annotations.*; + +/** + * Affinity mapped task. + */ +public interface AffinityTask { + /** + * @return Affinity key. + */ + public Object affinityKey(); + + /** + * @return Affinity cache name. + */ + @Nullable public String affinityCacheName(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9d2a2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index 658557e..21bfc11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -413,9 +413,12 @@ public class GridClosureProcessor extends GridProcessorAdapter { final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0); + if (node == null) + return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, U.emptyTopologyException()); + ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T5(node, job), null, false); + return ctx.task().execute(new T5(node, job, affKey0, cacheName), null, false); } catch (IgniteCheckedException e) { return ComputeTaskInternalFuture.finishedFuture(ctx, T5.class, e); @@ -445,9 +448,12 @@ public class GridClosureProcessor extends GridProcessorAdapter { final ClusterNode node = ctx.affinity().mapKeyToNode(cacheName, affKey0); + if (node == null) + return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, U.emptyTopologyException()); + ctx.task().setThreadContext(TC_SUBGRID, nodes); - return ctx.task().execute(new T4(node, job), null, false); + return ctx.task().execute(new T4(node, job, affKey0, cacheName), null, false); } catch (IgniteCheckedException e) { return ComputeTaskInternalFuture.finishedFuture(ctx, T4.class, e); @@ -1223,7 +1229,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** */ - private static class T4 extends TaskNoReduceAdapter implements GridNoImplicitInjection { + private static class T4 extends TaskNoReduceAdapter implements GridNoImplicitInjection, AffinityTask { /** */ private static final long serialVersionUID = 0L; @@ -1233,15 +1239,27 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** */ private Runnable job; + /** */ + private Object affKey; + + /** */ + private String affCacheName; + /** * @param node Cluster node. * @param job Job. + * @param affKey Affinity key. + * @param affCacheName Affinity cache name. */ - private T4(ClusterNode node, Runnable job) { + private T4(ClusterNode node, Runnable job, Object affKey, String affCacheName) { super(U.peerDeployAware0(job)); + assert affKey != null; + this.node = node; this.job = job; + this.affKey = affKey; + this.affCacheName = affCacheName; } /** {@inheritDoc} */ @@ -1250,11 +1268,22 @@ public class GridClosureProcessor extends GridProcessorAdapter { return Collections.singletonMap(job, node); } + + /** {@inheritDoc} */ + @Override public Object affinityKey() { + return affKey; + } + + /** {@inheritDoc} */ + @Nullable @Override public String affinityCacheName() { + return affCacheName; + } } /** */ - private static class T5 extends GridPeerDeployAwareTaskAdapter implements GridNoImplicitInjection { + private static class T5 extends GridPeerDeployAwareTaskAdapter implements + GridNoImplicitInjection, AffinityTask { /** */ private static final long serialVersionUID = 0L; @@ -1264,15 +1293,27 @@ public class GridClosureProcessor extends GridProcessorAdapter { /** */ private Callable job; + /** */ + private Object affKey; + + /** */ + private String affCacheName; + /** * @param node Cluster node. * @param job Job. + * @param affKey Affinity key. + * @param affCacheName Affinity cache name. */ - private T5(ClusterNode node, Callable job) { + private T5(ClusterNode node, Callable job, Object affKey, String affCacheName) { super(U.peerDeployAware0(job)); + assert affKey != null; + this.node = node; this.job = job; + this.affKey = affKey; + this.affCacheName = affCacheName; } /** {@inheritDoc} */ @@ -1291,6 +1332,16 @@ public class GridClosureProcessor extends GridProcessorAdapter { throw new IgniteException("Failed to find successful job result: " + res); } + + /** {@inheritDoc} */ + @Override public Object affinityKey() { + return affKey; + } + + /** {@inheritDoc} */ + @Nullable @Override public String affinityCacheName() { + return affCacheName; + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9d2a2ef/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 133a31f..f241bcc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.compute.*; import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.processors.closure.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -136,6 +137,12 @@ class GridTaskWorker extends GridWorker implements GridTimeoutObject { private final boolean noFailover; /** */ + private final Object affKey; + + /** */ + private final String affCache; + + /** */ private final UUID subjId; /** Continuous mapper. */ @@ -245,6 +252,17 @@ class GridTaskWorker extends GridWorker implements GridTimeoutObject { Boolean noFailover = getThreadContext(TC_NO_FAILOVER); this.noFailover = noFailover != null ? noFailover : false; + + if (task instanceof AffinityTask) { + AffinityTask affTask = (AffinityTask)task; + + affKey = affTask.affinityKey(); + affCache = affTask.affinityCacheName(); + } + else { + affKey = null; + affCache = null; + } } /** @@ -397,7 +415,9 @@ class GridTaskWorker extends GridWorker implements GridTimeoutObject { ses.setClassLoader(dep.classLoader()); - final List shuffledNodes = getTaskTopology(); + // Nodes are ignored by affinity tasks. + final List shuffledNodes = + affKey == null ? getTaskTopology() : Collections.emptyList(); // Load balancer. ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes); @@ -968,7 +988,7 @@ class GridTaskWorker extends GridWorker implements GridTimeoutObject { ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class); // Map to a new node. - ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top)); + ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top), affKey, affCache); if (node == null) { String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9d2a2ef/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java index b0cae92..865f1a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java @@ -20,6 +20,8 @@ package org.apache.ignite.spi.failover; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; import java.util.*; @@ -52,4 +54,20 @@ public interface FailoverContext { * @throws IgniteException If anything failed. */ public ClusterNode getBalancedNode(List top) throws IgniteException; + + /** + * Gets affinity key for {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)} + * and {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}. + * + * @return Affinity key. + */ + @Nullable public Object affinityKey(); + + /** + * Returns affinity cache name {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)} + * and {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)}. + * + * @return Cache name. + */ + @Nullable public String affinityCacheName(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9d2a2ef/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java index e075d3e..e925995 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/always/AlwaysFailoverSpi.java @@ -92,6 +92,11 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, */ public static final String FAILED_NODE_LIST_ATTR = "gg:failover:failednodelist"; + /** + * Name of job context attribute containing number of affinity call attempts. + */ + public static final String AFFINITY_CALL_ATTEMPT = "ignite:failover:affinitycallattempt"; + /** Maximum attempts attribute key should be the same on all nodes. */ public static final String MAX_FAILOVER_ATTEMPT_ATTR = "gg:failover:maxattempts"; @@ -173,6 +178,26 @@ public class AlwaysFailoverSpi extends IgniteSpiAdapter implements FailoverSpi, return null; } + if (ctx.affinityKey() != null) { + Integer affCallAttempt = ctx.getJobResult().getJobContext().getAttribute(AFFINITY_CALL_ATTEMPT); + + if (affCallAttempt == null) + affCallAttempt = 1; + + if (maxFailoverAttempts <= affCallAttempt) { + U.warn(log, "Job failover failed because number of maximum failover attempts for affinity call" + + " is exceeded [failedJob=" + ctx.getJobResult().getJob() + ", maxFailoverAttempts=" + + maxFailoverAttempts + ']'); + + return null; + } + else { + ctx.getJobResult().getJobContext().setAttribute(AFFINITY_CALL_ATTEMPT, affCallAttempt + 1); + + return ignite.affinity(ctx.affinityCacheName()).mapKeyToNode(ctx.affinityKey()); + } + } + Collection failedNodes = ctx.getJobResult().getJobContext().getAttribute(FAILED_NODE_LIST_ATTR); if (failedNodes == null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9d2a2ef/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java new file mode 100644 index 0000000..c4436ca --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.failover.always.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Test for {@link IgniteCompute#affinityCall(String, Object, IgniteCallable)} and + * {@link IgniteCompute#affinityRun(String, Object, IgniteRunnable)}. + */ +public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { + /** */ + private static final String CACHE_NAME = "myCache"; + + /** */ + private static final int MAX_FAILOVER_ATTEMPTS = 5; + + /** */ + private static final int SERVERS_COUNT = 4; + + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(spi); + + AlwaysFailoverSpi failSpi = new AlwaysFailoverSpi(); + failSpi.setMaximumFailoverAttempts(MAX_FAILOVER_ATTEMPTS); + cfg.setFailoverSpi(failSpi); + + CacheConfiguration ccfg = defaultCacheConfiguration(); + ccfg.setName(CACHE_NAME); + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + + cfg.setCacheConfiguration(ccfg); + + if (gridName.equals(getTestGridName(SERVERS_COUNT))) + cfg.setClientMode(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityCallRestartNode() throws Exception { + startGrids(4); + + Integer key = primaryKey(grid(0).cache(CACHE_NAME)); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + U.sleep(500); + stopGrid(0); + + return null; + } + }); + + while (!fut.isDone()) + grid(1).compute().affinityCall(CACHE_NAME, key, new CheckCallable(key)); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityCallNoServerNode() throws Exception { + startGrids(SERVERS_COUNT + 1); + + final Integer key = 1; + + final Ignite client = grid(SERVERS_COUNT); + + final IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + for (int i = 0; i < SERVERS_COUNT; ++i) + stopGrid(i); + + return null; + } + }); + + try { + while (!fut.isDone()) + client.compute().affinityCall(CACHE_NAME, key, new CheckCallable(key)); + } + catch (ComputeTaskCancelledException e) { + assertTrue(e.getMessage().contains("stopping")); + } + catch(ClusterGroupEmptyException e) { + assertTrue(e.getMessage().contains("Topology projection is empty")); + } + catch(IgniteException e) { + assertTrue(e.getMessage().contains("cache (or node) is stopping")); + } + + stopGrid(SERVERS_COUNT); + } + + /** + * Test callable. + */ + public static class CheckCallable implements IgniteCallable { + /** Key. */ + private final Object key; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * @param key Key. + */ + public CheckCallable(Object key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public Object call() throws IgniteCheckedException { + assert ignite.cluster().localNode().id().equals(ignite.cluster().mapKeyToNode(CACHE_NAME, key).id()); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9d2a2ef/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java index 78ecf08..a56ab9f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityRoutingSelfTest.java @@ -19,17 +19,21 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.failover.always.*; +import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; +import java.util.concurrent.*; + import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; @@ -47,6 +51,9 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest { private static final int KEY_CNT = 50; /** */ + private static final int MAX_FAILOVER_ATTEMPTS = 5; + + /** */ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); /** @@ -66,6 +73,10 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest { cfg.setDiscoverySpi(spi); + AlwaysFailoverSpi failSpi = new AlwaysFailoverSpi(); + failSpi.setMaximumFailoverAttempts(MAX_FAILOVER_ATTEMPTS); + cfg.setFailoverSpi(failSpi); + if (!gridName.equals(getTestGridName(GRID_CNT))) { // Default cache configuration. CacheConfiguration dfltCacheCfg = defaultCacheConfiguration(); @@ -129,6 +140,48 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testAffinityCallRestartFails() throws Exception { + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + grid(0).compute().affinityCall(NON_DFLT_CACHE_NAME, "key", + new FailedCallable("key", MAX_FAILOVER_ATTEMPTS + 1)); + return null; + } + }, ClusterTopologyException.class, "Failed to failover a job to another node"); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityCallRestart() throws Exception { + assertEquals(MAX_FAILOVER_ATTEMPTS, + grid(0).compute().affinityCall(NON_DFLT_CACHE_NAME, "key", + new FailedCallable("key", MAX_FAILOVER_ATTEMPTS))); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityRunRestartFails() throws Exception { + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + grid(0).compute().affinityRun(NON_DFLT_CACHE_NAME, "key", + new FailedRunnable("key", MAX_FAILOVER_ATTEMPTS + 1)); + return null; + } + }, ClusterTopologyException.class, "Failed to failover a job to another node"); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityRunRestart() throws Exception { + grid(0).compute().affinityRun(NON_DFLT_CACHE_NAME, "key", new FailedRunnable("key", MAX_FAILOVER_ATTEMPTS)); + } + + /** * JUnit. * * @throws Exception If failed. @@ -224,6 +277,108 @@ public class GridCacheAffinityRoutingSelfTest extends GridCommonAbstractTest { } /** + * Test runnable. + */ + private static class FailedCallable implements IgniteCallable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final String ATTR_ATTEMPT = "Attempt"; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @JobContextResource + private ComputeJobContext jobCtx; + + /** Key. */ + private final Object key; + + /** Call attempts. */ + private final Integer callAttempt; + + /** + * @param key Key. + * @param callAttempt Call attempts. + */ + public FailedCallable(Object key, Integer callAttempt) { + this.key = key; + this.callAttempt = callAttempt; + } + + /** {@inheritDoc} */ + @Override public Object call() throws IgniteCheckedException { + Integer attempt = jobCtx.getAttribute(ATTR_ATTEMPT); + + if (attempt == null) + attempt = 1; + + assertEquals(ignite.affinity(NON_DFLT_CACHE_NAME).mapKeyToNode(key), ignite.cluster().localNode()); + + jobCtx.setAttribute(ATTR_ATTEMPT, attempt + 1); + + if (attempt < callAttempt) + throw new ComputeJobFailoverException("Failover exception."); + else + return attempt; + } + } + + /** + * Test runnable. + */ + private static class FailedRunnable implements IgniteRunnable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final String ATTR_ATTEMPT = "Attempt"; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @JobContextResource + private ComputeJobContext jobCtx; + + /** Key. */ + private final Object key; + + /** Call attempts. */ + private final Integer callAttempt; + + /** + * @param key Key. + * @param callAttempt Call attempts. + */ + public FailedRunnable(Object key, Integer callAttempt) { + this.key = key; + this.callAttempt = callAttempt; + } + + /** {@inheritDoc} */ + @Override public void run() { + Integer attempt = jobCtx.getAttribute(ATTR_ATTEMPT); + + if (attempt == null) + attempt = 1; + + assertEquals(ignite.affinity(NON_DFLT_CACHE_NAME).mapKeyToNode(key), ignite.cluster().localNode()); + + jobCtx.setAttribute(ATTR_ATTEMPT, attempt + 1); + + if (attempt < callAttempt) + throw new ComputeJobFailoverException("Failover exception."); + else + assertEquals(callAttempt, attempt); + } + } + + /** * Test callable. */ private static class CheckCallable implements IgniteCallable { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9d2a2ef/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java index db64475..bfca83d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/failover/GridFailoverTestContext.java @@ -66,4 +66,14 @@ public class GridFailoverTestContext implements FailoverContext { @Override public ClusterNode getBalancedNode(List grid) { return grid.get(RAND.nextInt(grid.size())); } + + /** {@inheritDoc} */ + @Override public Object affinityKey() { + return null; + } + + /** {@inheritDoc} */ + @Override public String affinityCacheName() { + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9d2a2ef/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 39702a3..bafdfef 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -109,6 +109,7 @@ public class IgniteCacheTestSuite extends TestSuite { // Common tests. suite.addTestSuite(GridCacheConcurrentMapSelfTest.class); suite.addTestSuite(GridCacheAffinityMapperSelfTest.class); + suite.addTestSuite(CacheAffinityCallSelfTest.class); GridTestUtils.addTestIfNeeded(suite, GridCacheAffinityRoutingSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCacheMvccSelfTest.class, ignoredTests); suite.addTestSuite(GridCacheMvccPartitionedSelfTest.class);