ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-3192 Leaks at the GridCacheQueryManager on iterator canceling. This closes #754.
Date Fri, 27 May 2016 12:55:04 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 013573228 -> d01a3053f


ignite-3192  Leaks at the GridCacheQueryManager on iterator canceling. This closes #754.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d01a3053
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d01a3053
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d01a3053

Branch: refs/heads/master
Commit: d01a3053ff35f03a254583a7e44220951b9b02a2
Parents: 0135732
Author: sboikov <sboikov@gridgain.com>
Authored: Fri May 27 15:54:49 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri May 27 15:54:49 2016 +0300

----------------------------------------------------------------------
 .../cache/query/GridCacheQueryManager.java      |  63 ++++++++-
 .../cache/GridCacheAbstractFullApiSelfTest.java | 127 ++++++++++++++++---
 2 files changed, 166 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d01a3053/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index be2a85c..430a4a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Queue;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
@@ -150,7 +151,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     private volatile GridCacheQueryMetricsAdapter metrics = new GridCacheQueryMetricsAdapter();
 
     /** */
-    private final ConcurrentMap<UUID, Map<Long, GridFutureAdapter<QueryResult<K,
V>>>> qryIters =
+    private final ConcurrentMap<UUID, RequestFutureMap> qryIters =
         new ConcurrentHashMap8<>();
 
     /** */
@@ -1479,6 +1480,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     recipient(qryInfo.senderId(), qryInfo.requestId())) :
                     queryResult(qryInfo, taskName);
 
+                if (res == null)
+                    return;
+
                 iter = res.iterator(recipient(qryInfo.senderId(), qryInfo.requestId()));
                 type = res.type();
 
@@ -1827,17 +1831,17 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
      * @return Iterator.
      * @throws IgniteCheckedException In case of error.
      */
-    private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo, String
taskName) throws IgniteCheckedException {
+    @Nullable private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo,
String taskName) throws IgniteCheckedException {
         assert qryInfo != null;
 
         final UUID sndId = qryInfo.senderId();
 
         assert sndId != null;
 
-        Map<Long, GridFutureAdapter<QueryResult<K, V>>> futs = qryIters.get(sndId);
+        RequestFutureMap futs = qryIters.get(sndId);
 
         if (futs == null) {
-            futs = new LinkedHashMap<Long, GridFutureAdapter<QueryResult<K, V>>>(16,
0.75f, true) {
+            futs = new RequestFutureMap() {
                 @Override protected boolean removeEldestEntry(Map.Entry<Long, GridFutureAdapter<QueryResult<K,
V>>> e) {
                     boolean rmv = size() > maxIterCnt;
 
@@ -1854,7 +1858,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 }
             };
 
-            Map<Long, GridFutureAdapter<QueryResult<K, V>>> old = qryIters.putIfAbsent(sndId,
futs);
+            RequestFutureMap old = qryIters.putIfAbsent(sndId, futs);
 
             if (old != null)
                 futs = old;
@@ -1867,6 +1871,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         boolean exec = false;
 
         synchronized (futs) {
+            if (futs.isCanceled(qryInfo.requestId()))
+                return null;
+
             fut = futs.get(qryInfo.requestId());
 
             if (fut == null) {
@@ -1901,7 +1908,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         if (sndId == null)
             return;
 
-        Map<Long, GridFutureAdapter<QueryResult<K, V>>> futs = qryIters.get(sndId);
+        RequestFutureMap futs = qryIters.get(sndId);
 
         if (futs != null) {
             IgniteInternalFuture<QueryResult<K, V>> fut;
@@ -3503,4 +3510,48 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             }
         }
     }
+
+    /**
+     * The map prevents put to the map in case the specified request has been removed previously.
+     */
+    private class RequestFutureMap extends LinkedHashMap<Long, GridFutureAdapter<QueryResult<K,
V>>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Count of canceled keys */
+        private static final int CANCELED_COUNT = 128;
+
+        /**
+         * The ID of the canceled request is stored to the set in case
+         * remove(reqId) is called before put(reqId, future).
+         */
+        private Set<Long> canceled;
+
+        /** {@inheritDoc} */
+        @Override public GridFutureAdapter<QueryResult<K, V>> remove(Object key)
{
+            if (containsKey(key))
+                return super.remove(key);
+            else {
+                if (canceled == null) {
+                    canceled = Collections.newSetFromMap(
+                        new LinkedHashMap<Long, Boolean>() {
+                            @Override protected boolean removeEldestEntry(Map.Entry<Long,
Boolean> eldest) {
+                                return size() > CANCELED_COUNT;
+                            }
+                        });
+                }
+
+                canceled.add((Long)key);
+
+                return null;
+            }
+        }
+
+        /**
+         * @return true if the key is canceled
+         */
+        boolean isCanceled(Long key) {
+            return canceled != null && canceled.contains(key);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d01a3053/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 401afbf..59d3170 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -58,6 +59,8 @@ import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -67,6 +70,7 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
 import org.apache.ignite.internal.util.typedef.CIX1;
@@ -4406,6 +4410,41 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testIteratorLeakOnCancelCursor() throws Exception {
+        IgniteCache<String, Integer> cache = jcache(0);
+
+        final int SIZE = 10_000;
+
+        Map<String, Integer> putMap = new HashMap<>();
+
+        for (int i = 0; i < SIZE; ++i) {
+            String key = Integer.toString(i);
+
+            putMap.put(key, i);
+
+            if (putMap.size() == 500) {
+                cache.putAll(putMap);
+
+                info("Puts finished: " + (i + 1));
+
+                putMap.clear();
+            }
+        }
+
+        cache.putAll(putMap);
+
+        QueryCursor<Cache.Entry<String, Integer>> cur = cache.query(new ScanQuery<String,
Integer>());
+
+        cur.iterator().next();
+
+        cur.close();
+
+        waitForIteratorsCleared(cache, 10);
+    }
+
+    /**
      * If hasNext() is called repeatedly, it should return the same result.
      */
     private void checkIteratorHasNext() {
@@ -4514,6 +4553,31 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     }
 
     /**
+     * Checks iterators are cleared.
+     */
+    private void waitForIteratorsCleared(IgniteCache<String, Integer> cache, int secs)
throws InterruptedException {
+        for (int i = 0; i < secs; i++) {
+            try {
+                cache.size(); // Trigger weak queue poll.
+
+                checkIteratorsCleared();
+            }
+            catch (AssertionFailedError e) {
+                if (i == 9) {
+                    for (int j = 0; j < gridCount(); j++)
+                        executeOnLocalOrRemoteJvm(j, new PrintIteratorStateTask());
+
+                    throw e;
+                }
+
+                log.info("Iterators not cleared, will wait");
+
+                Thread.sleep(1000);
+            }
+        }
+    }
+
+    /**
      * Checks iterators are cleared after using.
      *
      * @param cache Cache.
@@ -4532,21 +4596,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
         System.gc();
 
-        for (int i = 0; i < 10; i++) {
-            try {
-                cache.size(); // Trigger weak queue poll.
-
-                checkIteratorsCleared();
-            }
-            catch (AssertionFailedError e) {
-                if (i == 9)
-                    throw e;
-
-                log.info("Set iterators not cleared, will wait");
-
-                Thread.sleep(1000);
-            }
-        }
+        waitForIteratorsCleared(cache, 10);
     }
 
     /**
@@ -5641,10 +5691,42 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
             GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String,
Integer>internalCache().context();
             GridCacheQueryManager queries = ctx.queries();
 
-            Map map = GridTestUtils.getFieldValue(queries, GridCacheQueryManager.class, "qryIters");
+            ConcurrentMap<UUID, Map<Long, GridFutureAdapter<?>>> map =
GridTestUtils.getFieldValue(queries,
+                GridCacheQueryManager.class, "qryIters");
+
+            for (Map<Long, GridFutureAdapter<?>> map1 : map.values())
+                assertTrue("Iterators not removed for grid " + idx, map1.isEmpty());
+
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class PrintIteratorStateTask extends TestIgniteIdxCallable<Void>
{
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /**
+         * @param idx Index.
+         */
+        @Override public Void call(int idx) throws Exception {
+            GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String,
Integer>internalCache().context();
+            GridCacheQueryManager queries = ctx.queries();
+
+            ConcurrentMap<UUID, Map<Long, GridFutureAdapter<?>>> map =
GridTestUtils.getFieldValue(queries,
+                GridCacheQueryManager.class, "qryIters");
 
-            for (Object obj : map.values())
-                assertEquals("Iterators not removed for grid " + idx, 0, ((Map)obj).size());
+            for (Map<Long, GridFutureAdapter<?>> map1 : map.values()) {
+                if (!map1.isEmpty()) {
+                    log.warning("Iterators leak detected at grid: " + idx);
+
+                    for (Map.Entry<Long, GridFutureAdapter<?>> entry : map1.entrySet())
+                        log.warning(entry.getKey() + "; " + entry.getValue());
+                }
+            }
 
             return null;
         }
@@ -5668,6 +5750,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      *
      */
     private static class SwapEvtsLocalListener implements IgnitePredicate<Event> {
+        /** */
         @LoggerResource
         private IgniteLogger log;
 
@@ -5705,13 +5788,21 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         }
     }
 
+    /**
+     *
+     */
     private static class CheckEntriesDeletedTask extends TestIgniteIdxRunnable {
+        /** */
         private final int cnt;
 
+        /**
+         * @param cnt Keys count.
+         */
         public CheckEntriesDeletedTask(int cnt) {
             this.cnt = cnt;
         }
 
+        /** {@inheritDoc} */
         @Override public void run(int idx) throws Exception {
             for (int i = 0; i < cnt; i++) {
                 String key = String.valueOf(i);


Mime
View raw message