ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [44/50] [abbrv] incubator-ignite git commit: # IGNITE-54-55 Refactoring removeAll().
Date Thu, 05 Feb 2015 16:14:39 GMT
# IGNITE-54-55 Refactoring removeAll().


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/58f9a9e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/58f9a9e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/58f9a9e1

Branch: refs/heads/ignite-111
Commit: 58f9a9e1fe51bbefb37513fc3a2b3cb02a2e0c0b
Parents: f9fe11e
Author: sevdokimov <sevdokimov@gridgain.com>
Authored: Thu Feb 5 16:10:19 2015 +0300
Committer: sevdokimov <sevdokimov@gridgain.com>
Committed: Thu Feb 5 16:10:19 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 138 -------------------
 .../GridDistributedCacheAdapter.java            | 133 ++++++++++++++++++
 .../processors/cache/local/GridLocalCache.java  |   6 +
 3 files changed, 139 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58f9a9e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 98bb46c..631da77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3388,35 +3388,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K,
V>,
     }
 
     /** {@inheritDoc} */
-    @Override public void removeAll() throws IgniteCheckedException {
-        try {
-            long topVer;
-
-            do {
-                topVer = ctx.affinity().affinityTopologyVersion();
-
-                // Send job to all nodes.
-                Collection<ClusterNode> nodes = ctx.grid().forCacheNodes(name()).nodes();
-
-                if (!nodes.isEmpty()) {
-                    ctx.closures().callAsyncNoFailover(BROADCAST,
-                        new GlobalRemoveAllCallable<>(name(), topVer), nodes, true).get();
-                }
-            } while (ctx.affinity().affinityTopologyVersion() > topVer);
-        }
-        catch (ClusterGroupEmptyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("All remote nodes left while cache remove [cacheName=" + name()
+ "]");
-        }
-        catch (ComputeTaskTimeoutCheckedException e) {
-            U.warn(log, "Timed out waiting for remote nodes to finish cache remove (consider
increasing " +
-                    "'networkTimeout' configuration property) [cacheName=" + name() + "]");
-
-            throw e;
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> removeAllAsync(final IgnitePredicate<CacheEntry<K,
V>>... filter) {
         ctx.denyOnLocalRead();
 
@@ -5198,115 +5169,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K,
V>,
     }
 
     /**
-     * Internal callable which performs remove all primary key mappings
-     * operation on a cache with the given name.
-     */
-    @GridInternal
-    private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>,
Externalizable {
-        /** */
-        private static final long REMOVE_ALL_BATCH_SIZE = 100L;
-
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Cache name. */
-        private String cacheName;
-
-        /** Topology version. */
-        private long topVer;
-
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
-        /**
-         * Empty constructor for serialization.
-         */
-        public GlobalRemoveAllCallable() {
-            // No-op.
-        }
-
-        /**
-         * @param cacheName Cache name.
-         * @param topVer Topology version.
-         */
-        private GlobalRemoveAllCallable(String cacheName, long topVer) {
-            this.cacheName = cacheName;
-            this.topVer = topVer;
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public Object call() throws Exception {
-            Collection<K> keys = new ArrayList<>();
-
-            final IgniteKernal grid = (IgniteKernal) ignite;
-
-            final GridCache<K,V> cache = grid.cachex(cacheName);
-
-            GridCacheAdapter<K, V> cacheAdapter = grid.context().cache().internalCache(cacheName);
-
-            final GridCacheContext<K, V> ctx = cacheAdapter.context();
-
-            if (ctx.affinity().affinityTopologyVersion() != topVer)
-                return null; // Ignore this remove request because remove request will be
sent again.
-
-            if (cacheAdapter instanceof GridNearCacheAdapter)
-                cacheAdapter = ((GridNearCacheAdapter)cacheAdapter).dht();
-
-            if (cacheAdapter instanceof GridDhtCacheAdapter) {
-                GridDhtCacheAdapter<K, V> dht = (GridDhtCacheAdapter)cacheAdapter;
-
-                for (GridDhtLocalPartition<K, V> locPart : dht.topology().currentLocalPartitions())
{
-                    if (locPart.primary(topVer)) {
-                        for (GridDhtCacheEntry<K, V> o : locPart.entries()) {
-                            keys.add(o.key());
-
-                            if (keys.size() >= REMOVE_ALL_BATCH_SIZE) {
-                                cache.removeAll(keys);
-
-                                keys.clear();
-                            }
-                        }
-                    }
-                }
-            }
-            else {
-                assert cache != null;
-
-                for (K k : cache.keySet()) {
-                    if (ctx.affinity().primary(ctx.localNode(), k, topVer))
-                        keys.add(k);
-
-                    if (keys.size() >= REMOVE_ALL_BATCH_SIZE) {
-                        cache.removeAll(keys);
-
-                        keys.clear();
-                    }
-                }
-            }
-
-            if (!keys.isEmpty())
-                cache.removeAll(keys);
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeString(out, cacheName);
-            out.writeLong(topVer);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
-            cacheName = U.readString(in);
-            topVer = in.readLong();
-        }
-    }
-
-    /**
      * Internal callable which performs {@link org.apache.ignite.cache.CacheProjection#clearLocally()}
      * operation on a cache with the given name.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58f9a9e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index d88d2e8..d6e846d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -17,18 +17,29 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
+import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.compute.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
 import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.GridClosureCallMode.*;
 
 /**
  * Distributed cache implementation.
@@ -123,7 +134,129 @@ public abstract class GridDistributedCacheAdapter<K, V> extends
GridCacheAdapter
     }
 
     /** {@inheritDoc} */
+    @Override public void removeAll() throws IgniteCheckedException {
+        try {
+            long topVer;
+
+            do {
+                topVer = ctx.affinity().affinityTopologyVersion();
+
+                // Send job to all nodes.
+                Collection<ClusterNode> nodes = ctx.grid().forCacheNodes(name()).nodes();
+
+                if (!nodes.isEmpty()) {
+                    ctx.closures().callAsyncNoFailover(BROADCAST,
+                        new GlobalRemoveAllCallable<>(name(), topVer), nodes, true).get();
+                }
+            } while (ctx.affinity().affinityTopologyVersion() > topVer);
+        }
+        catch (ClusterGroupEmptyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("All remote nodes left while cache remove [cacheName=" + name()
+ "]");
+        }
+        catch (ComputeTaskTimeoutCheckedException e) {
+            U.warn(log, "Timed out waiting for remote nodes to finish cache remove (consider
increasing " +
+                "'networkTimeout' configuration property) [cacheName=" + name() + "]");
+
+            throw e;
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDistributedCacheAdapter.class, this, "super", super.toString());
     }
+
+    /**
+     * Internal callable which performs remove all primary key mappings
+     * operation on a cache with the given name.
+     */
+    @GridInternal
+    private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>,
Externalizable {
+        /** */
+        private static final long REMOVE_ALL_BATCH_SIZE = 100L;
+
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache name. */
+        private String cacheName;
+
+        /** Topology version. */
+        private long topVer;
+
+        /** Injected grid instance. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /**
+         * Empty constructor for serialization.
+         */
+        public GlobalRemoveAllCallable() {
+            // No-op.
+        }
+
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Topology version.
+         */
+        private GlobalRemoveAllCallable(String cacheName, long topVer) {
+            this.cacheName = cacheName;
+            this.topVer = topVer;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public Object call() throws Exception {
+            final IgniteKernal grid = (IgniteKernal) ignite;
+
+            final GridCache<K,V> cache = grid.cachex(cacheName);
+
+            GridCacheAdapter<K, V> cacheAdapter = grid.context().cache().internalCache(cacheName);
+
+            final GridCacheContext<K, V> ctx = cacheAdapter.context();
+
+            if (ctx.affinity().affinityTopologyVersion() != topVer)
+                return null; // Ignore this remove request because remove request will be
sent again.
+
+            if (cacheAdapter instanceof GridNearCacheAdapter)
+                cacheAdapter = ((GridNearCacheAdapter)cacheAdapter).dht();
+
+            GridDhtCacheAdapter<K, V> dht = (GridDhtCacheAdapter)cacheAdapter;
+
+            Collection<K> keys = new ArrayList<>();
+
+            for (GridDhtLocalPartition<K, V> locPart : dht.topology().currentLocalPartitions())
{
+                if (!locPart.isEmpty() && locPart.primary(topVer)) {
+                    for (GridDhtCacheEntry<K, V> o : locPart.entries()) {
+                        keys.add(o.key());
+
+                        if (keys.size() >= REMOVE_ALL_BATCH_SIZE) {
+                            cache.removeAll(keys);
+
+                            keys.clear();
+                        }
+                    }
+                }
+            }
+
+            if (!keys.isEmpty())
+                cache.removeAll(keys);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            U.writeString(out, cacheName);
+            out.writeLong(topVer);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
+            cacheName = U.readString(in);
+            topVer = in.readLong();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58f9a9e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index c09b1cc..b4bc3a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -191,6 +191,12 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K,
V> {
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void removeAll() throws IgniteCheckedException {
+        removeAll(keySet());
+    }
+
+    /** {@inheritDoc} */
     @Override public void onDeferredDelete(GridCacheEntryEx<K, V> entry, GridCacheVersion
ver) {
         assert false : "Should not be called";
     }


Mime
View raw message