ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [01/38] incubator-ignite git commit: Ignite-54-55 Implemented basic removeAll()
Date Thu, 05 Feb 2015 17:08:50 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-57 20f93d6c8 -> ac579f808


Ignite-54-55 Implemented basic removeAll()


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bcff8d8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bcff8d8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bcff8d8a

Branch: refs/heads/ignite-57
Commit: bcff8d8a87509f49ed4065ad54f68822e7eba3fa
Parents: 8970463
Author: Anton Vinogradov <avinogradov@gridgain.com>
Authored: Sun Jan 25 17:11:03 2015 +0300
Committer: Anton Vinogradov <avinogradov@gridgain.com>
Committed: Sun Jan 25 17:11:03 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/cache/CacheProjection.java    |  17 +++
 .../processors/cache/GridCacheAdapter.java      | 134 +++++++++++++++++--
 .../cache/GridCacheProjectionImpl.java          |   5 +
 .../processors/cache/GridCacheProxyImpl.java    |  12 ++
 .../processors/cache/IgniteCacheProxy.java      |  19 ---
 .../dht/atomic/GridDhtAtomicCache.java          |   3 +-
 .../dataload/GridDataLoadCacheUpdaters.java     |  12 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |  19 +++
 8 files changed, 183 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
index 13f389d..7dd1f3e 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
@@ -1049,6 +1049,23 @@ public interface CacheProjection<K, V> extends Iterable<CacheEntry<K,
V>> {
     public Set<K> keySet();
 
     /**
+     * Set of keys cached on this node. You can remove elements from this set, but you cannot
add elements
+     * to this set. All removal operation will be reflected on the cache itself.
+     * <p>
+     * Iterator over this set will not fail if set was concurrently updated
+     * by another thread. This means that iterator may or may not return latest
+     * keys depending on whether they were added before or after current
+     * iterator position.
+     * <p>
+     * NOTE: this operation is not distributed and returns only the keys cached on this node.
+     *
+     * @param filter Optional filter to check prior to getting key form cache. Note
+     * that filter is checked atomically together with get operation.
+     * @return Key set for this cache projection.
+     */
+    public Set<K> keySet(@Nullable IgnitePredicate<CacheEntry<K, V>>...
filter);
+
+    /**
      * Set of keys for which this node is primary.
      * This set is dynamic and may change with grid topology changes.
      * Note that this set will contain mappings for all keys, even if their values are

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3c0ed75..3cf786f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -78,6 +78,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K,
V>,
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** removeAll() batch size. */
+    private static final long REMOVE_ALL_BATCH_SIZE = 100L;
+
     /** clearAll() split threshold. */
     public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000;
 
@@ -3151,22 +3154,38 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K,
V>,
 
     /** {@inheritDoc} */
     @Override public void removeAll(IgnitePredicate<CacheEntry<K, V>>... filter)
throws IgniteCheckedException {
-        ctx.denyOnLocalRead();
+        try {
+            if (F.isEmptyOrNulls(filter))
+                filter = ctx.trueArray();
 
-        if (F.isEmptyOrNulls(filter))
-            filter = ctx.trueArray();
+            long topVer;
 
-        final IgnitePredicate<CacheEntry<K, V>>[] p = filter;
+            do {
+                topVer = ctx.affinity().affinityTopologyVersion();
 
-        syncOp(new SyncInOp(false) {
-            @Override public void inOp(IgniteTxLocalAdapter<K, V> tx) throws IgniteCheckedException
{
-                tx.removeAllAsync(ctx, keySet(p), null, false, null).get();
-            }
+                // Send job to all nodes.
+                Collection<ClusterNode> nodes = ctx.grid().forCache(name()).nodes();
 
-            @Override public String toString() {
-                return "removeAll [filter=" + Arrays.toString(p) + ']';
-            }
-        });
+                IgniteFuture<Object> fut = null;
+
+                if (!nodes.isEmpty())
+                    fut = ctx.closures().callAsyncNoFailover(BROADCAST, new GlobalRemoveAllCallable<>(name(),
topVer, REMOVE_ALL_BATCH_SIZE, filter), nodes, true);
+
+                if (fut != null)
+                    fut.get();
+
+            } while (ctx.affinity().affinityTopologyVersion() > topVer);
+        }
+        catch (ClusterGroupEmptyException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("All remote nodes left while cache remove [cacheName=" + name()
+ "]");
+        }
+        catch (ComputeTaskTimeoutException e) {
+            U.warn(log, "Timed out waiting for remote nodes to finish cache remove (consider
increasing " +
+                    "'networkTimeout' configuration property) [cacheName=" + name() + "]");
+
+            throw e;
+        }
     }
 
     /** {@inheritDoc} */
@@ -4952,6 +4971,97 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K,
V>,
     }
 
     /**
+     * Internal callable which performs remove all primary key mappings
+     * operation on a cache with the given name.
+     */
+    @GridInternal
+    private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>,
Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Cache name. */
+        private String cacheName;
+
+        /** Topology version. */
+        private long topVer;
+
+        /** Remove batch size. */
+        private long rmvBatchSz;
+
+        /** Filters. */
+        private IgnitePredicate<CacheEntry<K, V>>[] filter;
+
+        /** Injected grid instance. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /**
+         * Empty constructor for serialization.
+         */
+        public GlobalRemoveAllCallable() {
+            // No-op.
+        }
+
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Topology version.
+         * @param rmvBatchSz Remove batch size.
+         * @param filter Filter.
+         */
+        private GlobalRemoveAllCallable(String cacheName, long topVer, long rmvBatchSz, IgnitePredicate<CacheEntry<K,
V>> ... filter) {
+            this.cacheName = cacheName;
+            this.topVer = topVer;
+            this.rmvBatchSz = rmvBatchSz;
+            this.filter = filter;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public Object call() throws Exception {
+            Set<K> keys = new HashSet<>();
+
+            final GridKernal grid = (GridKernal) ignite;
+
+            final GridCache<K,V> cache = grid.cachex(cacheName);
+
+            final GridCacheContext<K, V> ctx = grid.context().cache().<K, V>internalCache(cacheName).context();
+
+            assert cache != null;
+
+            for (K k : cache.keySet(filter)) {
+                if (ctx.affinity().primary(ctx.localNode(), k, topVer))
+                    keys.add(k);
+                if (keys.size() >= rmvBatchSz) {
+                    cache.removeAll(keys);
+
+                    keys.clear();
+                }
+            }
+            cache.removeAll(keys);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            U.writeString(out, cacheName);
+            out.writeLong(topVer);
+            out.writeLong(rmvBatchSz);
+            out.writeObject(filter);
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
+            cacheName = U.readString(in);
+            topVer = in.readLong();
+            rmvBatchSz = in.readLong();
+            filter = (IgnitePredicate<CacheEntry<K, V>>[]) in.readObject();
+        }
+    }
+
+    /**
      * Internal callable which performs {@link org.apache.ignite.cache.CacheProjection#clearAll()}
      * operation on a cache with the given name.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
index 6e7ce4c..e969da6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
@@ -915,6 +915,11 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K,
V
     }
 
     /** {@inheritDoc} */
+    @Override public Set<K> keySet(@Nullable IgnitePredicate<CacheEntry<K, V>>...
filter) {
+        return cache.keySet(filter);
+    }
+
+    /** {@inheritDoc} */
     @Override public Set<K> primaryKeySet() {
         return cache.primaryKeySet(entryFilter(true));
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 44bdc3f..1248f98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -993,6 +993,18 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K,
V>, Externali
     }
 
     /** {@inheritDoc} */
+    @Override public Set<K> keySet(@Nullable IgnitePredicate<CacheEntry<K, V>>...
filter) {
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            return delegate.keySet(filter);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public Set<K> primaryKeySet() {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 94ee239..7d4499c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -620,25 +620,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
         }
     }
 
-    /**
-     * @param keys Keys to remove.
-     */
-    public void removeAll(Collection<? extends K> keys) {
-        try {
-            GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
-
-            try {
-                delegate.removeAll(keys);
-            }
-            finally {
-                gate.leave(prev);
-            }
-        }
-        catch (IgniteCheckedException e) {
-            throw cacheException(e);
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public void removeAll() {
         // TODO IGNITE-1.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 683b7b9..324b1f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -513,7 +513,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
 
     /** {@inheritDoc} */
     @Override public void removeAll(IgnitePredicate<CacheEntry<K, V>>[] filter)
throws IgniteCheckedException {
-        removeAllAsync(filter).get();
+        super.removeAll(filter); // TODO: IGNITE-?? Fix asynс cleanup
+        //removeAllAsync(filter).get();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
index 34f0b88..6562a88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
@@ -100,7 +100,7 @@ public class GridDataLoadCacheUpdaters {
      * @param putMap Entries to put.
      * @throws IgniteCheckedException If failed.
      */
-    protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable
Collection<K> rmvCol,
+    protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable
Set<K> rmvCol,
         Map<K, V> putMap) throws IgniteCheckedException {
         assert rmvCol != null || putMap != null;
 
@@ -154,7 +154,7 @@ public class GridDataLoadCacheUpdaters {
             assert !F.isEmpty(entries);
 
             Map<K, V> putAll = null;
-            Collection<K> rmvAll = null;
+            Set<K> rmvAll = null;
 
             for (Map.Entry<K, V> entry : entries) {
                 K key = entry.getKey();
@@ -165,7 +165,7 @@ public class GridDataLoadCacheUpdaters {
 
                 if (val == null) {
                     if (rmvAll == null)
-                        rmvAll = new ArrayList<>();
+                        rmvAll = new HashSet<>();
 
                     rmvAll.add(key);
                 }
@@ -195,7 +195,7 @@ public class GridDataLoadCacheUpdaters {
             assert !F.isEmpty(entries);
 
             Map<K, V> putAll = null;
-            Collection<K> rmvAll = null;
+            Set<K> rmvAll = null;
 
             for (Map.Entry<K, V> entry : entries) {
                 K key = entry.getKey();
@@ -240,7 +240,7 @@ public class GridDataLoadCacheUpdaters {
             Map<Integer, Integer> partsCounts = new HashMap<>();
 
             // Group by partition ID.
-            Map<Integer, Collection<K>> rmvPartMap = null;
+            Map<Integer, Set<K>> rmvPartMap = null;
             Map<Integer, Map<K, V>> putPartMap = null;
 
             Ignite ignite = cache.unwrap(Ignite.class);
@@ -264,7 +264,7 @@ public class GridDataLoadCacheUpdaters {
                     if (rmvPartMap == null)
                         rmvPartMap = new HashMap<>();
 
-                    F.addIfAbsent(rmvPartMap, part, F.<K>newList()).add(key);
+                    F.addIfAbsent(rmvPartMap, part, F.<K>newSet()).add(key);
                 }
                 else {
                     if (putPartMap == null)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bcff8d8a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index b0b0a5a..1fefec4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -2486,6 +2486,25 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         cache().removeAll();
 
         assert cache().isEmpty();
+        long entryCount = hugeRemoveAllEntryCount();
+
+        for (int i = 0; i < entryCount; i++)
+            cache().put(String.valueOf(i), i);
+
+        for (int i = 0; i < entryCount; i++)
+            assertEquals(Integer.valueOf(i), cache().get(String.valueOf(i)));
+
+        cache().removeAll();
+
+        for (int i = 0; i < entryCount; i++)
+            assertNull(cache().get(String.valueOf(i)));
+    }
+
+    /**
+     * Provides count on entities to be removed in removeAll() test
+     */
+    protected long hugeRemoveAllEntryCount(){
+        return 1000L;
     }
 
     /**


Mime
View raw message