ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [18/50] [abbrv] incubator-ignite git commit: #ignite-834: IgniteCache.clearAll() throws NPE. #ignite-732: IgniteCache.size() should not fail in case of topology changes.
Date Fri, 08 May 2015 13:32:43 GMT
#ignite-834: IgniteCache.clearAll() throws NPE.
#ignite-732: IgniteCache.size() should not fail in case of topology changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/99c7e228
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/99c7e228
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/99c7e228

Branch: refs/heads/ignite-478
Commit: 99c7e228d12e25826f74d6d8706d158ec36004ed
Parents: 9ff8029
Author: ivasilinets <ivasilinets@gridgain.com>
Authored: Wed May 6 12:30:57 2015 +0300
Committer: ivasilinets <ivasilinets@gridgain.com>
Committed: Wed May 6 12:30:57 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 511 +++++++++----------
 .../resources/META-INF/classnames.properties    |   6 +-
 2 files changed, 248 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/99c7e228/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3f4e97b..6674993 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -21,10 +21,10 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.compute.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
@@ -1083,7 +1083,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         // Clear local cache synchronously.
         clearLocally();
 
-        clearRemotes(0, new GlobalClearAllCallable(name()));
+        clearRemotes(0, null);
     }
 
     /** {@inheritDoc} */
@@ -1091,7 +1091,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         // Clear local cache synchronously.
         clearLocally(key);
 
-        clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key)));
+        clearRemotes(0, Collections.singleton(key));
     }
 
     /** {@inheritDoc} */
@@ -1099,83 +1099,55 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         // Clear local cache synchronously.
         clearLocallyAll(keys);
 
-        clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), keys));
+        clearRemotes(0, keys);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync(K key) {
-        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key)));
+        return clearKeysAsync(Collections.singleton(key));
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys)
{
-        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys));
+        return clearKeysAsync(keys);
     }
 
     /**
      * @param timeout Timeout for clearLocally all task in milliseconds (0 for never).
      *      Set it to larger value for large caches.
-     * @param clearCall Global clear callable object.
+     * @param keys Keys to clear or {@code null} if all cache should be cleared.
      * @throws IgniteCheckedException In case of cache could not be cleared on any of the
nodes.
      */
-    private void clearRemotes(long timeout, GlobalClearCallable clearCall) throws IgniteCheckedException
{
-        try {
-            // Send job to remote nodes only.
-            Collection<ClusterNode> nodes =
-                ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes();
-
-            IgniteInternalFuture<Object> fut = null;
-
-            if (!nodes.isEmpty()) {
-                ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
+    private void clearRemotes(long timeout, @Nullable final Set<? extends K> keys)
throws IgniteCheckedException {
+        // Send job to remote nodes only.
+        Collection<ClusterNode> nodes =
+            ctx.grid().cluster().forCacheNodes(name(), true, true, false).forRemotes().nodes();
 
-                fut = ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true);
-            }
+        if (!nodes.isEmpty()) {
+            ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
 
-            if (fut != null)
-                fut.get();
-        }
-        catch (ClusterGroupEmptyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("All remote nodes left while cache clearLocally [cacheName=" +
name() + "]");
-        }
-        catch (ComputeTaskTimeoutCheckedException e) {
-            U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider
increasing " +
-                "'networkTimeout' configuration property) [cacheName=" + name() + "]");
+            ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-            throw e;
+            ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null).get();
         }
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync() {
-        return clearAsync(new GlobalClearAllCallable(name()));
+        return clearKeysAsync(null);
     }
 
     /**
-     * @param clearCall Global clear callable object.
+     * @param keys Keys to clear or {@code null} if all cache should be cleared.
      * @return Future.
      */
-    private IgniteInternalFuture<?> clearAsync(GlobalClearCallable clearCall) {
+    private IgniteInternalFuture<?> clearKeysAsync(final Set<? extends K> keys)
{
         Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name(),
true, true, false).nodes();
 
         if (!nodes.isEmpty()) {
-            IgniteInternalFuture<Object> fut =
-                ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true);
+            ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-            return fut.chain(new CX1<IgniteInternalFuture<Object>, Object>()
{
-                @Override public Object applyx(IgniteInternalFuture<Object> fut) throws
IgniteCheckedException {
-                    try {
-                        return fut.get();
-                    }
-                    catch (ClusterGroupEmptyCheckedException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("All remote nodes left while cache clearLocally [cacheName="
+ name() + "]");
-
-                        return null;
-                    }
-                }
-            });
+            return ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null);
         }
         else
             return new GridFinishedFuture<>();
@@ -3562,7 +3534,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Integer> sizeAsync(CachePeekMode[] peekModes)
{
+    @Override public IgniteInternalFuture<Integer> sizeAsync(final CachePeekMode[]
peekModes) {
         assert peekModes != null;
 
         PeekModes modes = parsePeekModes(peekModes, true);
@@ -3576,22 +3548,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         if (nodes.isEmpty())
             return new GridFinishedFuture<>(0);
 
-        IgniteInternalFuture<Collection<Integer>> fut =
-            ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null,
nodes);
-
-        return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>,
Integer>() {
-            @Override public Integer applyx(IgniteInternalFuture<Collection<Integer>>
fut)
-            throws IgniteCheckedException {
-                Collection<Integer> res = fut.get();
+        ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-                int totalSize = 0;
-
-                for (Integer size : res)
-                    totalSize += size;
-
-                return totalSize;
-            }
-        });
+        return ctx.kernalContext().task().execute(new SizeTask(ctx, peekModes), null);
     }
 
     /** {@inheritDoc} */
@@ -3909,50 +3868,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
     }
 
     /**
-     * Gets cache global size (with or without backups).
-     *
-     * @param primaryOnly {@code True} if only primary sizes should be included.
-     * @return Global size.
-     * @throws IgniteCheckedException If internal task execution failed.
-     */
-    private int globalSize(boolean primaryOnly) throws IgniteCheckedException {
-        try {
-            // Send job to remote nodes only.
-            Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes();
-
-            IgniteInternalFuture<Collection<Integer>> fut = null;
-
-            if (!nodes.isEmpty()) {
-                ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, gridCfg.getNetworkTimeout());
-
-                fut = ctx.closures().broadcastNoFailover(new GlobalSizeCallable(name(), primaryOnly),
null, nodes);
-            }
-
-            // Get local value.
-            int globalSize = primaryOnly ? primarySize() : size();
-
-            if (fut != null) {
-                for (Integer i : fut.get())
-                    globalSize += i;
-            }
-
-            return globalSize;
-        }
-        catch (ClusterGroupEmptyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("All remote nodes left while cache clearLocally [cacheName=" +
name() + "]");
-
-            return primaryOnly ? primarySize() : size();
-        }
-        catch (ComputeTaskTimeoutCheckedException e) {
-            U.warn(log, "Timed out waiting for remote nodes to finish cache clear (consider
increasing " +
-                "'networkTimeout' configuration property) [cacheName=" + name() + "]");
-
-            throw e;
-        }
-    }
-
-    /**
      * @param op Cache operation.
      * @param <T> Return type.
      * @return Operation result.
@@ -4893,67 +4808,32 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
     }
 
     /**
-     * Internal callable which performs clear operation on a cache with the given name.
-     */
-    @GridInternal
-    private static abstract class GlobalClearCallable implements Callable<Object>,
Externalizable {
-        /** Cache name. */
-        protected String cacheName;
-
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        protected Ignite ignite;
-
-        /**
-         * Empty constructor for serialization.
-         */
-        public GlobalClearCallable() {
-            // No-op.
-        }
-
-        /**
-         * @param cacheName Cache name.
-         */
-        protected GlobalClearCallable(String cacheName) {
-            this.cacheName = cacheName;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeString(out, cacheName);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
-            cacheName = U.readString(in);
-        }
-    }
-
-    /**
      * Global clear all.
      */
     @GridInternal
-    private static class GlobalClearAllCallable extends GlobalClearCallable {
+    private static class GlobalClearAllJob extends TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
 
         /**
          * Empty constructor for serialization.
          */
-        public GlobalClearAllCallable() {
+        public GlobalClearAllJob() {
             // No-op.
         }
 
         /**
          * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          */
-        private GlobalClearAllCallable(String cacheName) {
-            super(cacheName);
+        private GlobalClearAllJob(String cacheName, AffinityTopologyVersion topVer) {
+            super(cacheName, topVer);
         }
 
         /** {@inheritDoc} */
-        @Override public Object call() throws Exception {
-            ((IgniteEx)ignite).cachex(cacheName).clearLocally();
+        @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache)
{
+            if (cache != null)
+                cache.clearLocally();
 
             return null;
         }
@@ -4963,7 +4843,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
      * Global clear keys.
      */
     @GridInternal
-    private static class GlobalClearKeySetCallable<K, V> extends GlobalClearCallable
{
+    private static class GlobalClearKeySetJob<K> extends TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -4973,166 +4853,75 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         /**
          * Empty constructor for serialization.
          */
-        public GlobalClearKeySetCallable() {
+        public GlobalClearKeySetJob() {
             // No-op.
         }
 
         /**
          * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          * @param keys Keys to clear.
          */
-        private GlobalClearKeySetCallable(String cacheName, Set<? extends K> keys)
{
-            super(cacheName);
+        private GlobalClearKeySetJob(String cacheName, AffinityTopologyVersion topVer, Set<?
extends K> keys) {
+            super(cacheName, topVer);
 
             this.keys = keys;
         }
 
         /** {@inheritDoc} */
-        @Override public Object call() throws Exception {
-            ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys);
+        @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache)
{
+            if (cache != null)
+                cache.clearLocallyAll(keys);
 
             return null;
         }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            super.writeExternal(out);
-
-            out.writeObject(keys);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
-            super.readExternal(in);
-
-            keys = (Set<K>) in.readObject();
-        }
     }
 
     /**
      * Internal callable for global size calculation.
      */
     @GridInternal
-    private static class SizeCallable extends IgniteClosureX<Object, Integer> implements
Externalizable {
+    private static class SizeJob extends TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** Cache name. */
-        private String cacheName;
-
         /** Peek modes. */
         private CachePeekMode[] peekModes;
 
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
         /**
          * Required by {@link Externalizable}.
          */
-        public SizeCallable() {
+        public SizeJob() {
             // No-op.
         }
 
         /**
          * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          * @param peekModes Cache peek modes.
          */
-        private SizeCallable(String cacheName, CachePeekMode[] peekModes) {
-            this.cacheName = cacheName;
-            this.peekModes = peekModes;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Integer applyx(Object o) throws IgniteCheckedException {
-            IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
-
-            assert cache != null : cacheName;
-
-            return cache.localSize(peekModes);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("ForLoopReplaceableByForEach")
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeString(out, cacheName);
-
-            out.writeInt(peekModes.length);
+        private SizeJob(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[]
peekModes) {
+            super(cacheName, topVer);
 
-            for (int i = 0; i < peekModes.length; i++)
-                U.writeEnum(out, peekModes[i]);
+            this.peekModes = peekModes;
         }
 
         /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
-            cacheName = U.readString(in);
-
-            int len = in.readInt();
+        @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache)
{
+            if (cache == null)
+                return 0;
 
-            peekModes = new CachePeekMode[len];
-
-            for (int i = 0; i < len; i++)
-                peekModes[i] = CachePeekMode.fromOrdinal(in.readByte());
+            try {
+                return cache.localSize(peekModes);
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
         }
 
         /** {@inheritDoc} */
         public String toString() {
-            return S.toString(SizeCallable.class, this);
-        }
-    }
-
-    /**
-     * Internal callable which performs {@link IgniteInternalCache#size()} or {@link IgniteInternalCache#primarySize()}
-     * operation on a cache with the given name.
-     */
-    @GridInternal
-    private static class GlobalSizeCallable implements IgniteClosure<Object, Integer>,
Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Cache name. */
-        private String cacheName;
-
-        /** Primary only flag. */
-        private boolean primaryOnly;
-
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
-        /**
-         * Empty constructor for serialization.
-         */
-        public GlobalSizeCallable() {
-            // No-op.
-        }
-
-        /**
-         * @param cacheName Cache name.
-         * @param primaryOnly Primary only flag.
-         */
-        private GlobalSizeCallable(String cacheName, boolean primaryOnly) {
-            this.cacheName = cacheName;
-            this.primaryOnly = primaryOnly;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Integer apply(Object o) {
-            IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName);
-
-            return primaryOnly ? cache.primarySize() : cache.size();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeString(out, cacheName);
-            out.writeBoolean(primaryOnly);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
-            cacheName = U.readString(in);
-            primaryOnly = in.readBoolean();
+            return S.toString(SizeJob.class, this);
         }
     }
 
@@ -5697,4 +5486,194 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
             metrics.addPutAndGetTimeNanos(System.nanoTime() - start);
         }
     }
+
+    /**
+     * Delayed callable class.
+     */
+    protected static abstract class TopologyVersionAwareJob extends ComputeJobAdapter {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Injected job context. */
+        @JobContextResource
+        protected ComputeJobContext jobCtx;
+
+        /** Injected grid instance. */
+        @IgniteInstanceResource
+        protected Ignite ignite;
+
+        /** Affinity topology version. */
+        protected AffinityTopologyVersion topVer;
+
+        /** Cache name. */
+        protected String cacheName;
+
+        /**
+         * Empty constructor for serialization.
+         */
+        public TopologyVersionAwareJob() {
+            // No-op.
+        }
+
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
+         */
+        public TopologyVersionAwareJob(String cacheName, AffinityTopologyVersion topVer)
{
+            assert topVer != null;
+
+            this.cacheName = cacheName;
+            this.topVer = topVer;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public final Object execute() {
+            waitAffinityReadyFuture();
+
+            IgniteInternalCache cache = ((IgniteKernal)ignite).context().cache().cache(cacheName);
+
+            return localExecute(cache);
+        }
+
+        /**
+         * @param cache Cache.
+         * @return Local execution result.
+         */
+        @Nullable protected abstract Object localExecute(@Nullable IgniteInternalCache cache);
+
+        /**
+         * Holds (suspends) job execution until our cache version becomes equal to remote
cache's version.
+         */
+        private void waitAffinityReadyFuture() {
+            GridCacheProcessor cacheProc = ((IgniteKernal)ignite).context().cache();
+
+            AffinityTopologyVersion locTopVer = cacheProc.context().exchange().readyAffinityVersion();
+
+            if (locTopVer.compareTo(topVer) < 0) {
+                IgniteInternalFuture<?> fut = cacheProc.context().exchange().affinityReadyFuture(topVer);
+
+                if (fut != null && !fut.isDone()) {
+                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> t) {
+                            jobCtx.callcc();
+                        }
+                    });
+
+                    jobCtx.holdcc();
+                }
+            }
+        }
+    }
+
+    /**
+     * Size task.
+     */
+    private static class SizeTask extends ComputeTaskAdapter<Object, Integer> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache context. */
+        private GridCacheContext ctx;
+
+        /** Peek modes. */
+        private CachePeekMode[] peekModes;
+
+        /**
+         * Empty constructor for serialization.
+         */
+        public SizeTask() {
+            // No-op.
+        }
+
+        /**
+         * @param ctx Cache context.
+         */
+        public SizeTask(GridCacheContext ctx, CachePeekMode[] peekModes) {
+            this.ctx = ctx;
+            this.peekModes = peekModes;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode>
subgrid,
+            @Nullable Object arg) throws IgniteException {
+            Map<ComputeJob, ClusterNode> jobs = new HashMap();
+
+            for (ClusterNode node : subgrid)
+                jobs.put(new SizeJob(ctx.name(), ctx.affinity().affinityTopologyVersion(),
peekModes), node);
+
+            return jobs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult>
rcvd) {
+            return ComputeJobResultPolicy.WAIT;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Integer reduce(List<ComputeJobResult> results) throws
IgniteException {
+            int size = 0;
+
+            for (ComputeJobResult res : results) {
+                if (res.getException() == null && res != null)
+                    size += res.<Integer>getData();
+            }
+
+            return size;
+        }
+    }
+
+    /**
+     * Clear task.
+     */
+    private static class ClearTask<K> extends ComputeTaskAdapter<Object, Object>
{
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache context. */
+        private GridCacheContext ctx;
+
+        /** Keys to clear. */
+        private Set<? extends K> keys;
+
+        /**
+         * Empty constructor for serialization.
+         */
+        public ClearTask() {
+            // No-op.
+        }
+
+        /**
+         * @param ctx Cache context.
+         * @param keys Keys to clear.
+         */
+        public ClearTask(GridCacheContext ctx, Set<? extends K> keys) {
+            this.ctx = ctx;
+            this.keys = keys;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode>
subgrid,
+            @Nullable Object arg) throws IgniteException {
+            Map<ComputeJob, ClusterNode> jobs = new HashMap();
+
+            for (ClusterNode node : subgrid) {
+                jobs.put(keys == null ?
+                        new GlobalClearAllJob(ctx.name(), ctx.affinity().affinityTopologyVersion())
:
+                        new GlobalClearKeySetJob<K>(ctx.name(), ctx.affinity().affinityTopologyVersion(),
keys),
+                    node);
+            }
+
+            return jobs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult>
rcvd) {
+            return ComputeJobResultPolicy.WAIT;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws
IgniteException {
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/99c7e228/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 35495ed..ff263cd 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -323,13 +323,13 @@ org.apache.ignite.internal.processors.cache.GridCacheAdapter$72
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$73
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$74
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$9
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllJob
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearCallable
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetJob
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalSizeCallable
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadCacheClosure
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadKeysCallable
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeJob
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetTimeStatClosure
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAndGetTimeStatClosure
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutTimeStatClosure


Mime
View raw message