ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [3/5] ignite git commit: IGNITE-2465 - Fixed race in load cache closure - Fixes #431.
Date Thu, 04 Feb 2016 00:48:29 GMT
IGNITE-2465 - Fixed race in load cache closure - Fixes #431.

Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/500bd3ab
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/500bd3ab
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/500bd3ab

Branch: refs/heads/ignite-2450
Commit: 500bd3ab576830f8160eb66274590b7684a39599
Parents: e7de923
Author: ashutak <ashutak@gridgain.com>
Authored: Wed Feb 3 14:56:42 2016 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Wed Feb 3 14:56:42 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 100 ++++++++++++++++++-
 1 file changed, 96 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/500bd3ab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 69abc54..2c3a197 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -70,6 +70,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.FileSystemConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.ComputeTaskInternalFuture;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
@@ -101,6 +102,7 @@ import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridLeanMap;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -128,6 +130,7 @@ import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.mxbean.CacheMetricsMXBean;
@@ -166,6 +169,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
     /** Maximum number of retries when topology changes. */
     public static final int MAX_RETRIES = IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT,
100);
 
+    /** */
+    public static final IgniteProductVersion LOAD_CACHE_JOB_SINCE = IgniteProductVersion.fromString("1.5.7");
+
     /** Deserialization stash. */
     private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new
ThreadLocal<IgniteBiTuple<String,
         String>>() {
@@ -3737,7 +3743,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
      */
     IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K,
V> p, @Nullable Object... args)
         throws IgniteCheckedException {
-        ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name());
+        ClusterGroup oldNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name())
+            .forPredicate(new IgnitePredicate<ClusterNode>() {
+                @Override public boolean apply(ClusterNode node) {
+                    return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE)
< 0;
+                }
+            });
+
+        ClusterGroup newNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name())
+            .forPredicate(new IgnitePredicate<ClusterNode>() {
+                @Override public boolean apply(ClusterNode node) {
+                    return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE)
>= 0;
+                }
+            });
 
         ctx.kernalContext().task().setThreadContext(TC_NO_FAILOVER, true);
 
@@ -3745,9 +3763,27 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null;
 
-        return ctx.kernalContext().closure().callAsync(BROADCAST,
-            Arrays.asList(new LoadCacheClosure<>(ctx.name(), p, args, plc)),
-            nodes.nodes());
+        GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<>();
+
+        if (!F.isEmpty(oldNodes.nodes())) {
+            ComputeTaskInternalFuture oldNodesFut = ctx.kernalContext().closure().callAsync(BROADCAST,
+                Arrays.asList(new LoadCacheClosure<>(ctx.name(), p, args, plc)),
+                oldNodes.nodes());
+
+            fut.add(oldNodesFut);
+        }
+
+        if (!F.isEmpty(newNodes.nodes())) {
+            ComputeTaskInternalFuture newNodesFut = ctx.kernalContext().closure().callAsync(BROADCAST,
+                Arrays.asList(new LoadCacheJob<>(ctx.name(), ctx.affinity().affinityTopologyVersion(),
p, args, plc)),
+                newNodes.nodes());
+
+            fut.add(newNodesFut);
+        }
+
+        fut.markInitialized();
+
+        return fut;
     }
 
     /**
@@ -5498,6 +5534,62 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
     }
 
     /**
+     * Internal callable for global size calculation.
+     */
+    @GridInternal
+    private static class LoadCacheJob<K, V> extends TopologyVersionAwareJob {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final IgniteBiPredicate<K, V> p;
+
+        /** */
+        private final Object[] args;
+
+        /** */
+        private final ExpiryPolicy plc;
+
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
+         * @param p Predicate.
+         * @param args Arguments.
+         * @param plc Policy.
+         */
+        private LoadCacheJob(String cacheName, AffinityTopologyVersion topVer, IgniteBiPredicate<K,
V> p, Object[] args,
+            ExpiryPolicy plc) {
+            super(cacheName, topVer);
+
+            this.p = p;
+            this.args = args;
+            this.plc = plc;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache)
{
+            try {
+                assert cache != null : "Failed to get a cache [cacheName=" + cacheName +
", topVer=" + topVer + "]";
+
+                if (plc != null)
+                    cache = cache.withExpiryPolicy(plc);
+
+                cache.localLoadCache(p, args);
+
+                return null;
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        public String toString() {
+            return S.toString(LoadCacheJob.class, this);
+        }
+    }
+
+    /**
      * Holder for last async operation future.
      */
     protected static class FutureHolder {


Mime
View raw message