ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [10/50] [abbrv] ignite git commit: ignite-4552 Use for rmvQueue ConcurrentLinkedDeque instead of GridCircularBuffer to reduce memory usage. This closes #1465.
Date Wed, 15 Feb 2017 10:44:35 GMT
ignite-4552 Use for rmvQueue ConcurrentLinkedDeque instead of GridCircularBuffer to reduce memory usage.
This closes #1465.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e6ea938d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e6ea938d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e6ea938d

Branch: refs/heads/ignite-3477-merge2.0
Commit: e6ea938d193353b173366fdd7ef132b8c2a12904
Parents: 70cd8e4
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Feb 7 14:19:16 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Feb 7 14:19:16 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   3 +
 .../processors/cache/GridCacheProcessor.java    |  85 ++++++++++++
 .../distributed/dht/GridDhtCacheAdapter.java    |  10 +-
 .../distributed/dht/GridDhtLocalPartition.java  | 120 +++++++++++++----
 .../cache/CacheDeferredDeleteQueueTest.java     | 134 +++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +
 6 files changed, 322 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ea938d/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index c479076..703f52a 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -279,6 +279,9 @@ public final class IgniteSystemProperties {
     /** Maximum size for atomic cache queue delete history (default is 200 000 entries per partition). */
     public static final String IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE = "IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE";
 
+    /** Ttl of removed cache entries (ms). */
+    public static final String IGNITE_CACHE_REMOVED_ENTRIES_TTL = "IGNITE_CACHE_REMOVED_ENTRIES_TTL";
+
     /**
      * Comma separated list of addresses in format "10.100.22.100:45000,10.100.22.101:45000".
      * Makes sense only for {@link org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder}.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ea938d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 84126e4..87f5236 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -79,6 +79,8 @@ import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProce
 import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
@@ -99,6 +101,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManag
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.plugin.CachePluginManager;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -120,6 +123,7 @@ import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.spi.IgniteNodeValidationResult;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -846,6 +850,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         assert caches.containsKey(CU.MARSH_CACHE_NAME) : "Marshaller cache should be started";
         assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started";
+
+        if (!ctx.clientNode() && !ctx.isDaemon())
+            addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000));
+
+    }
+
+    /**
+     * @param timeout Cleanup timeout.
+     */
+    private void addRemovedItemsCleanupTask(long timeout) {
+        ctx.timeout().addTimeoutObject(new RemovedItemsCleanupTask(timeout));
     }
 
     /** {@inheritDoc} */
@@ -3913,4 +3928,74 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             // No-op.
         }
     }
+
+    /**
+     *
+     */
+    private class RemovedItemsCleanupTask implements GridTimeoutObject {
+        /** */
+        private final IgniteUuid id = IgniteUuid.randomUuid();
+
+        /** */
+        private final long endTime;
+
+        /** */
+        private final long timeout;
+
+        /**
+         * @param timeout Timeout.
+         */
+        RemovedItemsCleanupTask(long timeout) {
+            this.timeout = timeout;
+            this.endTime = U.currentTimeMillis() + timeout;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid timeoutId() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return endTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            ctx.closure().runLocalSafe(new Runnable() {
+                @Override public void run() {
+                    try {
+                        for (GridCacheContext cacheCtx : sharedCtx.cacheContexts()) {
+                            if (!cacheCtx.isLocal() && cacheCtx.affinityNode()) {
+                                GridDhtPartitionTopology top = null;
+
+                                try {
+                                    top = cacheCtx.topology();
+                                }
+                                catch (IllegalStateException ignore) {
+                                    // Cache stopped.
+                                }
+
+                                if (top != null) {
+                                    for (GridDhtLocalPartition part : top.currentLocalPartitions())
+                                        part.cleanupRemoveQueue();
+                                }
+
+                                if (ctx.isStopping())
+                                    return;
+                            }
+                        }
+                    }
+                    catch (Exception e) {
+                        U.error(log, "Failed to cleanup removed cache items: " + e, e);
+                    }
+
+                    if (ctx.isStopping())
+                        return;
+
+                    addRemovedItemsCleanupTask(timeout);
+                }
+            }, true);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ea938d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 9e8788f..fb7a932 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1183,14 +1183,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         GridDhtLocalPartition part = topology().localPartition(entry.partition(), AffinityTopologyVersion.NONE,
             false);
 
-        if (part != null) {
-            try {
-                part.onDeferredDelete(entry.key(), ver);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to enqueue deleted entry [key=" + entry.key() + ", ver=" + ver + ']', e);
-            }
-        }
+        if (part != null)
+            part.onDeferredDelete(entry.key(), ver);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ea938d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 67b29ca..9f8498a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -29,7 +29,6 @@ import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -46,21 +45,20 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
-import org.apache.ignite.internal.util.GridCircularBuffer;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedDeque8;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
@@ -72,8 +70,13 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
  */
 public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, GridReservable, GridCacheConcurrentMap {
     /** Maximum size for delete queue. */
-    public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE,
-        200_000);
+    public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 200_000);
+
+    /** Maximum size for {@link #rmvQueue}. */
+    private final int rmvQueueMaxSize;
+
+    /** Removed items TTL. */
+    private final long rmvdEntryTtl;
 
     /** Static logger to avoid re-creation. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
@@ -109,7 +112,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     private final ReentrantLock lock = new ReentrantLock();
 
     /** Remove queue. */
-    private final GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue;
+    private final ConcurrentLinkedDeque8<RemovedEntryHolder> rmvQueue = new ConcurrentLinkedDeque8<>();
 
     /** Group reservations. */
     private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>();
@@ -146,7 +149,9 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
         int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 :
             Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20);
 
-        rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize));
+        rmvQueueMaxSize = U.ceilPow2(delQueueSize);
+
+        rmvdEntryTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000);
     }
 
     /**
@@ -299,25 +304,43 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
     }
 
     /**
-     * @param key Removed key.
-     * @param ver Removed version.
-     * @throws IgniteCheckedException If failed.
+     *
      */
-    public void onDeferredDelete(KeyCacheObject key, GridCacheVersion ver) throws IgniteCheckedException {
-        try {
-            T2<KeyCacheObject, GridCacheVersion> evicted = rmvQueue.add(new T2<>(key, ver));
+    public void cleanupRemoveQueue() {
+        while (rmvQueue.sizex() >= rmvQueueMaxSize) {
+            RemovedEntryHolder item = rmvQueue.pollFirst();
 
-            if (evicted != null)
-                cctx.dht().removeVersionedEntry(evicted.get1(), evicted.get2());
+            if (item != null)
+                cctx.dht().removeVersionedEntry(item.key(), item.version());
         }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
 
-            throw new IgniteInterruptedCheckedException(e);
+        if (!cctx.isDrEnabled()) {
+            RemovedEntryHolder item = rmvQueue.peekFirst();
+
+            while (item != null && item.expireTime() < U.currentTimeMillis()) {
+                item = rmvQueue.pollFirst();
+
+                if (item == null)
+                    break;
+
+                cctx.dht().removeVersionedEntry(item.key(), item.version());
+
+                item = rmvQueue.peekFirst();
+            }
         }
     }
 
     /**
+     * @param key Removed key.
+     * @param ver Removed version.
+     */
+    public void onDeferredDelete(KeyCacheObject key, GridCacheVersion ver) {
+        cleanupRemoveQueue();
+
+        rmvQueue.add(new RemovedEntryHolder(key, ver, rmvdEntryTtl));
+    }
+
+    /**
      * Locks partition.
      */
     @SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
@@ -807,11 +830,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      *
      */
     private void clearDeferredDeletes() {
-        rmvQueue.forEach(new CI1<T2<KeyCacheObject, GridCacheVersion>>() {
-            @Override public void apply(T2<KeyCacheObject, GridCacheVersion> t) {
-                cctx.dht().removeVersionedEntry(t.get1(), t.get2());
-            }
-        });
+        for (RemovedEntryHolder e : rmvQueue)
+            cctx.dht().removeVersionedEntry(e.key(), e.version());
     }
 
     /** {@inheritDoc} */
@@ -841,4 +861,56 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
             "empty", isEmpty(),
             "createTime", U.format(createTime));
     }
+
+    /**
+     * Removed entry holder.
+     */
+    private static class RemovedEntryHolder {
+        /** Cache key */
+        private final KeyCacheObject key;
+
+        /** Entry version */
+        private final GridCacheVersion ver;
+
+        /** Entry expire time. */
+        private final long expireTime;
+
+        /**
+         * @param key Key.
+         * @param ver Entry version.
+         * @param ttl TTL.
+         */
+        private RemovedEntryHolder(KeyCacheObject key, GridCacheVersion ver, long ttl) {
+            this.key = key;
+            this.ver = ver;
+
+            expireTime = U.currentTimeMillis() + ttl;
+        }
+
+        /**
+         * @return Key.
+         */
+        KeyCacheObject key() {
+            return key;
+        }
+
+        /**
+         * @return Version.
+         */
+        GridCacheVersion version() {
+            return ver;
+        }
+
+        /**
+         * @return item expired time
+         */
+        long expireTime() {
+            return expireTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemovedEntryHolder.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ea938d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteQueueTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteQueueTest.java
new file mode 100644
index 0000000..b764d5b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeferredDeleteQueueTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class CacheDeferredDeleteQueueTest extends GridCommonAbstractTest {
+    /** */
+    private static String ttlProp;
+
+    /** */
+    private static int NODES = 2;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        ttlProp = System.getProperty(IGNITE_CACHE_REMOVED_ENTRIES_TTL);
+
+        System.setProperty(IGNITE_CACHE_REMOVED_ENTRIES_TTL, "1000");
+
+        startGridsMultiThreaded(NODES);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        if (ttlProp != null)
+            System.setProperty(IGNITE_CACHE_REMOVED_ENTRIES_TTL, ttlProp);
+        else
+            System.clearProperty(IGNITE_CACHE_REMOVED_ENTRIES_TTL);
+
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeferredDeleteQueue() throws Exception {
+        testQueue(ATOMIC, false);
+
+        testQueue(TRANSACTIONAL, false);
+
+        testQueue(ATOMIC, true);
+
+        testQueue(TRANSACTIONAL, true);
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @param nearCache {@code True} if need create near cache.
+     *
+     * @throws Exception If failed.
+     */
+    private void testQueue(CacheAtomicityMode atomicityMode, boolean nearCache) throws Exception {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(1);
+
+        if (nearCache)
+            ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>());
+
+        IgniteCache<Integer, Integer> cache = ignite(0).createCache(ccfg);
+
+        try {
+            final int KEYS = cache.getConfiguration(CacheConfiguration.class).getAffinity().partitions() * 3;
+
+            for (int i = 0; i < KEYS; i++)
+                cache.put(i, i);
+
+            for (int i = 0; i < KEYS; i++)
+                cache.remove(i);
+
+            boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    for (int i = 0; i < NODES; i++) {
+                        final GridDhtPartitionTopology top =
+                            ((IgniteKernal)ignite(i)).context().cache().cache(null).context().topology();
+
+                        for (GridDhtLocalPartition p : top.currentLocalPartitions()) {
+                            Collection<Object> rmvQueue = GridTestUtils.getFieldValue(p, "rmvQueue");
+
+                            if (!rmvQueue.isEmpty() || p.size() != 0)
+                                return false;
+                        }
+                    }
+
+                    return true;
+                }
+            }, 5000);
+
+            assertTrue("Failed to wait for rmvQueue cleanup.", wait);
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e6ea938d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 092d95e..3f0073d 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.managers.communication.IgniteIoTestMessagesTes
 import org.apache.ignite.internal.managers.communication.IgniteVariousConnectionNumberTest;
 import org.apache.ignite.cache.store.jdbc.JdbcTypesDefaultTransformerTest;
 import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest;
+import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteQueueTest;
 import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteSanitySelfTest;
 import org.apache.ignite.internal.processors.cache.CacheEntryProcessorCopySelfTest;
 import org.apache.ignite.internal.processors.cache.CacheFutureExceptionSelfTest;
@@ -320,6 +321,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheTxPartitionedLocalStoreSelfTest.class);
         suite.addTestSuite(IgniteCacheSystemTransactionsSelfTest.class);
         suite.addTestSuite(CacheDeferredDeleteSanitySelfTest.class);
+        suite.addTestSuite(CacheDeferredDeleteQueueTest.class);
 
         suite.addTest(IgniteCacheTcpClientDiscoveryTestSuite.suite());
 


Mime
View raw message