ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [3/5] ignite git commit: Fixed unexpected object deserialization when local store is enabled.
Date Wed, 11 May 2016 10:54:30 GMT
Fixed unexpected object deserialization when local store is enabled.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bd6a67f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bd6a67f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bd6a67f2

Branch: refs/heads/master
Commit: bd6a67f21ef133a7a63d1f0fc3a8538ce33783f3
Parents: bc143c6
Author: dkarachentsev <dkarachentsev@gridgain.com>
Authored: Thu May 5 17:06:22 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Wed May 11 09:57:47 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 195 +++++++++++++++++--
 .../distributed/dht/GridDhtCacheAdapter.java    |   4 +-
 .../distributed/near/GridNearCacheAdapter.java  |   4 +-
 3 files changed, 178 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bd6a67f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 8c1a750..d81cbd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -168,6 +168,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
     /** */
     public static final IgniteProductVersion LOAD_CACHE_JOB_SINCE = IgniteProductVersion.fromString("1.5.7");
 
+    /** */
+    public static final IgniteProductVersion LOAD_CACHE_JOB_V2_SINCE = IgniteProductVersion.fromString("1.5.19");
+
     /** Deserialization stash. */
     private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new
ThreadLocal<IgniteBiTuple<String,
         String>>() {
@@ -3405,6 +3408,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry();
 
+        final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
         if (p != null)
             ctx.kernalContext().resource().injectGeneric(p);
 
@@ -3417,6 +3422,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
                     ldr.receiver(new IgniteDrDataStreamerCacheUpdater());
 
+                    ldr.keepBinary(keepBinary);
+
                     LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, plc);
 
                     ctx.store().loadCache(c, args);
@@ -3527,18 +3534,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null;
 
-        if (replaceExisting) {
-            if (ctx.store().isLocal()) {
-                Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
+        final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
 
-                if (nodes.isEmpty())
-                    return new GridFinishedFuture<>();
-
-                return ctx.closures().callAsyncNoFailover(BROADCAST,
-                    new LoadKeysCallable<>(ctx.name(), keys, true, plc),
-                    nodes,
-                    true);
-            }
+        if (replaceExisting) {
+            if (ctx.store().isLocal())
+                return runLoadKeysCallable(keys, plc, keepBinary, true);
             else {
                 return ctx.closures().callLocalSafe(new Callable<Void>() {
                     @Override public Void call() throws Exception {
@@ -3549,14 +3549,41 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                 });
             }
         }
-        else {
-            Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
+        else
+            return runLoadKeysCallable(keys, plc, keepBinary, false);
+    }
 
-            if (nodes.isEmpty())
-                return new GridFinishedFuture<>();
+    /**
+     * Run load keys callable on appropriate nodes.
+     *
+     * @param keys Keys.
+     * @param plc Expiry policy.
+     * @param keepBinary Keep binary flag. Will be ignored for releases older than {@link
#LOAD_CACHE_JOB_V2_SINCE}.
+     * @return Operation future.
+     */
+    private IgniteInternalFuture<?> runLoadKeysCallable(final Set<? extends K>
keys, final ExpiryPolicy plc,
+        final boolean keepBinary, final boolean update) {
+        Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
+
+        if (nodes.isEmpty())
+            return new GridFinishedFuture<>();
+
+        Collection<ClusterNode> oldNodes = ctx.grid().cluster().forDataNodes(name()).forPredicate(
+            new IgnitePredicate<ClusterNode>() {
+                @Override public boolean apply(ClusterNode node) {
+                    return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE)
< 0;
+                }
+            }).nodes();
 
+        if (oldNodes.isEmpty()) {
             return ctx.closures().callAsyncNoFailover(BROADCAST,
-                new LoadKeysCallable<>(ctx.name(), keys, false, plc),
+                new LoadKeysCallableV2<>(ctx.name(), keys, update, plc, keepBinary),
+                nodes,
+                true);
+        }
+        else {
+            return ctx.closures().callAsyncNoFailover(BROADCAST,
+                new LoadKeysCallable<>(ctx.name(), keys, update, plc),
                 nodes,
                 true);
         }
@@ -3598,8 +3625,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
      * @param plc Optional expiry policy.
      * @throws IgniteCheckedException If failed.
      */
-    public void localLoad(Collection<? extends K> keys,
-        @Nullable ExpiryPolicy plc)
+    public void localLoad(Collection<? extends K> keys, @Nullable ExpiryPolicy plc,
final boolean keepBinary)
         throws IgniteCheckedException {
         final boolean replicate = ctx.isDrEnabled();
         final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
@@ -3614,6 +3640,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
             try {
                 ldr.skipStore(true);
 
+                ldr.keepBinary(keepBinary);
+
                 ldr.receiver(new IgniteDrDataStreamerCacheUpdater());
 
                 LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, plc0);
@@ -3670,7 +3698,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         ClusterGroup newNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name())
             .forPredicate(new IgnitePredicate<ClusterNode>() {
                 @Override public boolean apply(ClusterNode node) {
-                    return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE)
>= 0;
+                    return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE)
>= 0 &&
+                        node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE)
< 0;
+                }
+            });
+
+        ClusterGroup newNodesV2 = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name())
+            .forPredicate(new IgnitePredicate<ClusterNode>() {
+                @Override public boolean apply(ClusterNode node) {
+                    return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE)
>= 0;
                 }
             });
 
@@ -3699,6 +3735,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
             fut.add(newNodesFut);
         }
 
+        if (!F.isEmpty(newNodesV2.nodes())) {
+            final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+            ComputeTaskInternalFuture newNodesV2Fut = ctx.kernalContext().closure().callAsync(BROADCAST,
+                Collections.singletonList(
+                    new LoadCacheJobV2<>(ctx.name(), ctx.affinity().affinityTopologyVersion(),
p, args, plc, keepBinary)),
+                newNodesV2.nodes());
+
+            fut.add(newNodesV2Fut);
+        }
+
         fut.markInitialized();
 
         return fut;
@@ -5541,6 +5588,49 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
     }
 
     /**
+     * Load cache job that with keepBinary flag.
+     */
+    private static class LoadCacheJobV2<K, V> extends LoadCacheJob<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final boolean keepBinary;
+
+        /**
+         * Constructor.
+         *
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
+         * @param p Predicate.
+         * @param loadArgs Arguments.
+         * @param keepBinary Keep binary flag.
+         */
+        public LoadCacheJobV2(final String cacheName, final AffinityTopologyVersion topVer,
+            final IgniteBiPredicate<K, V> p, final Object[] loadArgs, final ExpiryPolicy
plc,
+            final boolean keepBinary) {
+            super(cacheName, topVer, p, loadArgs, plc);
+
+            this.keepBinary = keepBinary;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache)
{
+            assert cache != null : "Failed to get a cache [cacheName=" + cacheName + ", topVer="
+ topVer + "]";
+
+            if (keepBinary)
+                cache = cache.keepBinary();
+
+            return super.localExecute(cache);
+        }
+
+        /** {@inheritDoc} */
+        public String toString() {
+            return S.toString(LoadCacheJobV2.class, this);
+        }
+    }
+
+    /**
      * Holder for last async operation future.
      */
     protected static class FutureHolder {
@@ -5752,7 +5842,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
          * @param cacheName Cache name.
          * @param keys Keys.
          * @param update If {@code true} calls {@link #localLoadAndUpdate(Collection)}
-         *        otherwise {@link #localLoad(Collection, ExpiryPolicy)}.
+         *        otherwise {@link #localLoad(Collection, ExpiryPolicy, boolean)}.
          * @param plc Expiry policy.
          */
         LoadKeysCallable(String cacheName,
@@ -5767,6 +5857,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         /** {@inheritDoc} */
         @Override public Void call() throws Exception {
+            return call0(false);
+        }
+
+        /**
+         * Internal call routine.
+         *
+         * @param keepBinary Keep binary flag.
+         * @return Result (always {@code null}).
+         * @throws Exception If failed.
+         */
+        protected Void call0(boolean keepBinary) throws Exception {
             GridCacheAdapter<K, V> cache = ((IgniteKernal)ignite).context().cache().internalCache(cacheName);
 
             assert cache != null : cacheName;
@@ -5777,7 +5878,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                 if (update)
                     cache.localLoadAndUpdate(keys);
                 else
-                    cache.localLoad(keys, plc);
+                    cache.localLoad(keys, plc, keepBinary);
             }
             finally {
                 cache.context().gate().leave();
@@ -5812,6 +5913,58 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
     /**
      *
      */
+    static class LoadKeysCallableV2<K, V> extends LoadKeysCallable<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private boolean keepBinary;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public LoadKeysCallableV2() {
+            // No-op.
+        }
+
+        /**
+         * @param cacheName Cache name.
+         * @param keys Keys.
+         * @param update If {@code true} calls {@link #localLoadAndUpdate(Collection)}
+         *        otherwise {@link #localLoad(Collection, ExpiryPolicy, boolean)}.
+         * @param plc Expiry policy.
+         * @param keepBinary Keep binary flag.
+         */
+        LoadKeysCallableV2(final String cacheName, final Collection<? extends K> keys,
final boolean update,
+            final ExpiryPolicy plc, final boolean keepBinary) {
+            super(cacheName, keys, update, plc);
+
+            this.keepBinary = keepBinary;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() throws Exception {
+            return call0(keepBinary);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(final ObjectOutput out) throws IOException {
+            super.writeExternal(out);
+
+            out.writeBoolean(keepBinary);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException
{
+            super.readExternal(in);
+
+            keepBinary = in.readBoolean();
+        }
+    }
+
+    /**
+     *
+     */
     private class LocalStoreLoadClosure extends CIX3<KeyCacheObject, Object, GridCacheVersion>
{
         /** */
         final IgniteBiPredicate<K, V> p;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd6a67f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 5ff674f..9f15bf4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -442,10 +442,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /** {@inheritDoc} */
-    @Override public void localLoad(Collection<? extends K> keys, final ExpiryPolicy
plc)
+    @Override public void localLoad(Collection<? extends K> keys, final ExpiryPolicy
plc, final boolean keepBinary)
         throws IgniteCheckedException {
         if (ctx.store().isLocal()) {
-            super.localLoad(keys, plc);
+            super.localLoad(keys, plc, keepBinary);
 
             return;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd6a67f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 7971173..dd66a33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -273,8 +273,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
     }
 
     /** {@inheritDoc} */
-    @Override public void localLoad(Collection<? extends K> keys, ExpiryPolicy plc)
throws IgniteCheckedException {
-        dht().localLoad(keys, plc);
+    @Override public void localLoad(Collection<? extends K> keys, ExpiryPolicy plc,
boolean keepBinary) throws IgniteCheckedException {
+        dht().localLoad(keys, plc, keepBinary);
     }
 
     /** {@inheritDoc} */


Mime
View raw message