Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1108F18269 for ; Wed, 3 Feb 2016 12:34:01 +0000 (UTC) Received: (qmail 60193 invoked by uid 500); 3 Feb 2016 12:33:27 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 60116 invoked by uid 500); 3 Feb 2016 12:33:27 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 59984 invoked by uid 99); 3 Feb 2016 12:33:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Feb 2016 12:33:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B2D06E3831; Wed, 3 Feb 2016 12:33:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Wed, 03 Feb 2016 12:33:31 -0000 Message-Id: <9118c67c46fe4faa99bc92183ed7b404@git.apache.org> In-Reply-To: <982b9dd1b3bf477ca219fd0f9d5447ef@git.apache.org> References: <982b9dd1b3bf477ca219fd0f9d5447ef@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [6/7] ignite git commit: IGNITE-2465 - Fixed race in load cache closure - Fixes #431. IGNITE-2465 - Fixed race in load cache closure - Fixes #431. Signed-off-by: Alexey Goncharuk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f8667cbe Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f8667cbe Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f8667cbe Branch: refs/heads/ignite-1.5.7 Commit: f8667cbe294f8489d543386356a7b62c019ad07c Parents: 8a52b36 Author: ashutak Authored: Wed Feb 3 14:56:42 2016 +0300 Committer: Alexey Goncharuk Committed: Wed Feb 3 15:15:38 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 100 ++++++++++++++++++- 1 file changed, 96 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f8667cbe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 6949d90..ea4a240 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -70,6 +70,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.internal.ComputeTaskInternalFuture; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; @@ -101,6 +102,7 @@ import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanMap; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -128,6 +130,7 @@ import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.mxbean.CacheMetricsMXBean; @@ -166,6 +169,9 @@ public abstract class GridCacheAdapter implements IgniteInternalCache> stash = new ThreadLocal>() { @@ -3780,7 +3786,19 @@ public abstract class GridCacheAdapter implements IgniteInternalCache globalLoadCacheAsync(@Nullable IgniteBiPredicate p, @Nullable Object... args) throws IgniteCheckedException { - ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()); + ClusterGroup oldNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()) + .forPredicate(new IgnitePredicate() { + @Override public boolean apply(ClusterNode node) { + return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) < 0; + } + }); + + ClusterGroup newNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()) + .forPredicate(new IgnitePredicate() { + @Override public boolean apply(ClusterNode node) { + return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) >= 0; + } + }); ctx.kernalContext().task().setThreadContext(TC_NO_FAILOVER, true); @@ -3788,9 +3806,27 @@ public abstract class GridCacheAdapter implements IgniteInternalCache(ctx.name(), p, args, plc)), - nodes.nodes()); + GridCompoundFuture fut = new GridCompoundFuture<>(); + + if (!F.isEmpty(oldNodes.nodes())) { + ComputeTaskInternalFuture oldNodesFut = ctx.kernalContext().closure().callAsync(BROADCAST, + Arrays.asList(new LoadCacheClosure<>(ctx.name(), p, args, plc)), + oldNodes.nodes()); + + fut.add(oldNodesFut); + } + + if (!F.isEmpty(newNodes.nodes())) { + ComputeTaskInternalFuture newNodesFut = ctx.kernalContext().closure().callAsync(BROADCAST, + Arrays.asList(new LoadCacheJob<>(ctx.name(), ctx.affinity().affinityTopologyVersion(), p, args, plc)), + newNodes.nodes()); + + fut.add(newNodesFut); + } + + fut.markInitialized(); + + return fut; } /** @@ -5572,6 +5608,62 @@ public abstract class GridCacheAdapter implements IgniteInternalCache extends TopologyVersionAwareJob { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteBiPredicate p; + + /** */ + private final Object[] args; + + /** */ + private final ExpiryPolicy plc; + + /** + * @param cacheName Cache name. + * @param topVer Affinity topology version. + * @param p Predicate. + * @param args Arguments. + * @param plc Policy. + */ + private LoadCacheJob(String cacheName, AffinityTopologyVersion topVer, IgniteBiPredicate p, Object[] args, + ExpiryPolicy plc) { + super(cacheName, topVer); + + this.p = p; + this.args = args; + this.plc = plc; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { + try { + assert cache != null : "Failed to get a cache [cacheName=" + cacheName + ", topVer=" + topVer + "]"; + + if (plc != null) + cache = cache.withExpiryPolicy(plc); + + cache.localLoadCache(p, args); + + return null; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(LoadCacheJob.class, this); + } + } + + /** * Holder for last async operation future. */ protected static class FutureHolder {