Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EFCD018693 for ; Thu, 4 Feb 2016 06:19:51 +0000 (UTC) Received: (qmail 17552 invoked by uid 500); 4 Feb 2016 06:19:51 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 17411 invoked by uid 500); 4 Feb 2016 06:19:51 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 17356 invoked by uid 99); 4 Feb 2016 06:19:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 04 Feb 2016 06:19:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 73CFCE0A3A; Thu, 4 Feb 2016 06:19:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Thu, 04 Feb 2016 06:19:53 -0000 Message-Id: <4e9591954a484da7b7787405d2712c07@git.apache.org> In-Reply-To: <21ead093291040de94f2007a5a019186@git.apache.org> References: <21ead093291040de94f2007a5a019186@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/5] ignite git commit: IGNITE-2465 - Fixed race in load cache closure - Fixes #431. IGNITE-2465 - Fixed race in load cache closure - Fixes #431. Signed-off-by: Alexey Goncharuk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/500bd3ab Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/500bd3ab Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/500bd3ab Branch: refs/heads/ignite-2541 Commit: 500bd3ab576830f8160eb66274590b7684a39599 Parents: e7de923 Author: ashutak Authored: Wed Feb 3 14:56:42 2016 +0300 Committer: Alexey Goncharuk Committed: Wed Feb 3 14:56:42 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 100 ++++++++++++++++++- 1 file changed, 96 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/500bd3ab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 69abc54..2c3a197 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -70,6 +70,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.internal.ComputeTaskInternalFuture; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; @@ -101,6 +102,7 @@ import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanMap; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -128,6 +130,7 @@ import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.mxbean.CacheMetricsMXBean; @@ -166,6 +169,9 @@ public abstract class GridCacheAdapter implements IgniteInternalCache> stash = new ThreadLocal>() { @@ -3737,7 +3743,19 @@ public abstract class GridCacheAdapter implements IgniteInternalCache globalLoadCacheAsync(@Nullable IgniteBiPredicate p, @Nullable Object... args) throws IgniteCheckedException { - ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()); + ClusterGroup oldNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()) + .forPredicate(new IgnitePredicate() { + @Override public boolean apply(ClusterNode node) { + return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) < 0; + } + }); + + ClusterGroup newNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()) + .forPredicate(new IgnitePredicate() { + @Override public boolean apply(ClusterNode node) { + return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) >= 0; + } + }); ctx.kernalContext().task().setThreadContext(TC_NO_FAILOVER, true); @@ -3745,9 +3763,27 @@ public abstract class GridCacheAdapter implements IgniteInternalCache(ctx.name(), p, args, plc)), - nodes.nodes()); + GridCompoundFuture fut = new GridCompoundFuture<>(); + + if (!F.isEmpty(oldNodes.nodes())) { + ComputeTaskInternalFuture oldNodesFut = ctx.kernalContext().closure().callAsync(BROADCAST, + Arrays.asList(new LoadCacheClosure<>(ctx.name(), p, args, plc)), + oldNodes.nodes()); + + fut.add(oldNodesFut); + } + + if (!F.isEmpty(newNodes.nodes())) { + ComputeTaskInternalFuture newNodesFut = ctx.kernalContext().closure().callAsync(BROADCAST, + Arrays.asList(new LoadCacheJob<>(ctx.name(), ctx.affinity().affinityTopologyVersion(), p, args, plc)), + newNodes.nodes()); + + fut.add(newNodesFut); + } + + fut.markInitialized(); + + return fut; } /** @@ -5498,6 +5534,62 @@ public abstract class GridCacheAdapter implements IgniteInternalCache extends TopologyVersionAwareJob { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteBiPredicate p; + + /** */ + private final Object[] args; + + /** */ + private final ExpiryPolicy plc; + + /** + * @param cacheName Cache name. + * @param topVer Affinity topology version. + * @param p Predicate. + * @param args Arguments. + * @param plc Policy. + */ + private LoadCacheJob(String cacheName, AffinityTopologyVersion topVer, IgniteBiPredicate p, Object[] args, + ExpiryPolicy plc) { + super(cacheName, topVer); + + this.p = p; + this.args = args; + this.plc = plc; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { + try { + assert cache != null : "Failed to get a cache [cacheName=" + cacheName + ", topVer=" + topVer + "]"; + + if (plc != null) + cache = cache.withExpiryPolicy(plc); + + cache.localLoadCache(p, args); + + return null; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(LoadCacheJob.class, this); + } + } + + /** * Holder for last async operation future. */ protected static class FutureHolder {