ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [30/43] ignite git commit: ignite-3090 Implemented stmCache cleanup. (cherry picked from commit 38d9e91)
Date Thu, 19 May 2016 09:38:03 GMT
ignite-3090 Implemented stmCache cleanup.
(cherry picked from commit 38d9e91)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/40fc2ec6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/40fc2ec6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/40fc2ec6

Branch: refs/heads/ignite-3163
Commit: 40fc2ec66abd32b8ff834103e8afbf8b27495980
Parents: 0442d2b
Author: sboikov <sboikov@gridgain.com>
Authored: Mon May 16 11:00:18 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon May 16 11:06:36 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  19 ++
 .../processors/query/h2/IgniteH2Indexing.java   |  59 +++++
 .../IgniteCacheQueryH2IndexingLeakTest.java     | 214 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 4 files changed, 294 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/40fc2ec6/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 1af8b6e..7d48608 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -392,6 +392,25 @@ public final class IgniteSystemProperties {
         "IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID";
 
     /**
+     * System property to specify period in milliseconds between calls of the SQL statements cache cleanup task.
+     * <p>
+     * Cleanup tasks clears cache for terminated threads and for threads which did not perform SQL queries within
+     * timeout configured via {@link #IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT} property.
+     * <p>
+     * Default value is {@code 10,000ms}.
+     */
+    public static final String IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD = "IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD";
+
+    /**
+     * System property to specify timeout in milliseconds after which thread's SQL statements cache is cleared by
+     * cleanup task if the thread does not perform any query.
+     * <p>
+     * Default value is {@code 600,000ms}.
+     */
+    public static final String IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT =
+        "IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/40fc2ec6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index cae1a9f..7f0e230 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridLuceneIndex;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
 import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
 import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -153,6 +154,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_DEBUG_CONSOLE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.getString;
 import static org.apache.ignite.internal.processors.query.GridQueryIndexType.FULLTEXT;
 import static org.apache.ignite.internal.processors.query.GridQueryIndexType.GEO_SPATIAL;
@@ -202,6 +205,16 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** */
     private static final String ESC_STR = ESC_CH + "" + ESC_CH;
 
+    /** The period of clean up the {@link #stmtCache}. */
+    private final Long CLEANUP_STMT_CACHE_PERIOD = Long.getLong(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD, 10_000);
+
+    /** The timeout to remove entry from the {@link #stmtCache} if the thread doesn't perform any queries. */
+    private final Long STATEMENT_CACHE_THREAD_USAGE_TIMEOUT =
+        Long.getLong(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT, 600 * 1000);
+
+    /** */
+    private GridTimeoutProcessor.CancelableTask stmtCacheCleanupTask;
+
     /**
      * Command in H2 prepared statement.
      */
@@ -333,6 +346,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     cache = cache0;
             }
 
+            cache.updateLastUsage();
+
             PreparedStatement stmt = cache.get(sql);
 
             if (stmt != null && !stmt.isClosed()) {
@@ -1364,6 +1379,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * Called periodically by {@link GridTimeoutProcessor} to clean up the {@link #stmtCache}.
+     */
+    private void cleanupStatementCache() {
+        long cur = U.currentTimeMillis();
+
+        for(Iterator<Map.Entry<Thread, StatementCache>> it = stmtCache.entrySet().iterator(); it.hasNext(); ) {
+            Map.Entry<Thread, StatementCache> entry = it.next();
+
+            Thread t = entry.getKey();
+
+            if (t.getState() == Thread.State.TERMINATED
+                || cur - entry.getValue().lastUsage() > STATEMENT_CACHE_THREAD_USAGE_TIMEOUT)
+                it.remove();
+        }
+    }
+
+    /**
      * Gets space name from database schema.
      *
      * @param schemaName Schema name. Could not be null. Could be empty.
@@ -1485,6 +1517,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             mapQryExec.start(ctx, this);
             rdcQryExec.start(ctx, this);
+
+            stmtCacheCleanupTask = ctx.timeout().schedule(new Runnable() {
+                @Override public void run() {
+                    cleanupStatementCache();
+                }
+            }, CLEANUP_STMT_CACHE_PERIOD, CLEANUP_STMT_CACHE_PERIOD);
         }
 
         // TODO https://issues.apache.org/jira/browse/IGNITE-2139
@@ -1573,6 +1611,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             U.error(log, "Failed to shutdown database.", e);
         }
 
+        if (stmtCacheCleanupTask != null)
+            stmtCacheCleanupTask.close();
+
         if (log.isDebugEnabled())
             log.debug("Cache query index stopped.");
     }
@@ -2507,6 +2548,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         /** */
         private int size;
 
+        /** Last usage. */
+        private volatile long lastUsage;
+
         /**
          * @param size Size.
          */
@@ -2528,5 +2572,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             return rmv;
         }
+
+        /**
+         * The timestamp of the last usage of the cache. Used by {@link #cleanupStatementCache()} to remove unused caches.
+         * @return last usage timestamp
+         */
+        private long lastUsage() {
+            return lastUsage;
+        }
+
+        /**
+         * Updates the {@link #lastUsage} timestamp by current time.
+         */
+        private void updateLastUsage() {
+            lastUsage = U.currentTimeMillis();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/40fc2ec6/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
new file mode 100644
index 0000000..5d6556e
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.CAX;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests leaks at the IgniteH2Indexing
+ */
+public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest {
+    /** */
+    private static final long TEST_TIMEOUT = 2 * 60 * 1000;
+
+    /** Threads to parallel execute queries */
+    private static final int THREAD_COUNT = 10;
+
+    /** Timeout */
+    private static final long STMT_CACHE_CLEANUP_TIMEOUT = 1000;
+
+    /** Orig cleanup period. */
+    private static String origCacheCleanupPeriod;
+
+    /** Orig usage timeout. */
+    private static String origCacheThreadUsageTimeout;
+
+    /** */
+    private static final int ITERATIONS = 5;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheConfiguration(cacheConfiguration());
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    protected CacheConfiguration cacheConfiguration() {
+        CacheConfiguration<?,?> cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cacheCfg.setIndexedTypes(
+            Integer.class, Integer.class
+        );
+
+        return cacheCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TEST_TIMEOUT + 60_000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        origCacheCleanupPeriod = System.getProperty(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD);
+        origCacheThreadUsageTimeout = System.getProperty(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT);
+
+        System.setProperty(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD, Long.toString(STMT_CACHE_CLEANUP_TIMEOUT));
+        System.setProperty(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT, Long.toString(STMT_CACHE_CLEANUP_TIMEOUT));
+
+        startGrid(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        System.setProperty(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD,
+            origCacheCleanupPeriod != null ? origCacheCleanupPeriod : "");
+
+        System.setProperty(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT,
+            origCacheThreadUsageTimeout != null ? origCacheThreadUsageTimeout : "");
+    }
+
+    /**
+     * @param qryProcessor Query processor.
+     * @return size of statement cache.
+     */
+    private static int getStatementCacheSize(GridQueryProcessor qryProcessor) {
+        IgniteH2Indexing h2Idx = GridTestUtils.getFieldValue(qryProcessor, GridQueryProcessor.class, "idx");
+
+        ConcurrentMap stmtCache = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "stmtCache");
+
+        return stmtCache.size();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings({"TooBroadScope"})
+    public void testLeaksInIgniteH2IndexingOnTerminatedThread() throws Exception {
+        final IgniteCache<Integer, Integer> c = grid(0).cache(null);
+
+        for(int i = 0; i < ITERATIONS; ++i) {
+            info("Iteration #" + i);
+
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            // Open iterator on the created cursor: add entries to the cache.
+            IgniteInternalFuture<?> fut = multithreadedAsync(
+                new CAX() {
+                    @Override public void applyx() throws IgniteCheckedException {
+                        while (!stop.get()) {
+                            c.query(new SqlQuery(Integer.class, "_val >= 0")).getAll();
+
+                            c.query(new SqlQuery(Integer.class, "_val >= 1")).getAll();
+                        }
+                    }
+                }, THREAD_COUNT);
+
+            final GridQueryProcessor qryProc = grid(0).context().query();
+
+            try {
+                // Wait for stmt cache entry is created for each thread.
+                assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                    @Override public boolean apply() {
+                        return getStatementCacheSize(qryProc) == THREAD_COUNT;
+                    }
+                }, STMT_CACHE_CLEANUP_TIMEOUT));
+            }
+            finally {
+                stop.set(true);
+            }
+
+            fut.get();
+
+            // Wait for stmtCache is cleaned up because all user threads are terminated.
+            assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return getStatementCacheSize(qryProc) == 0;
+                }
+            }, STMT_CACHE_CLEANUP_TIMEOUT * 2));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLeaksInIgniteH2IndexingOnUnusedThread() throws Exception {
+        final IgniteCache<Integer, Integer> c = grid(0).cache(null);
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        for(int i = 0; i < ITERATIONS; ++i) {
+            info("Iteration #" + i);
+
+            // Open iterator on the created cursor: add entries to the cache
+            IgniteInternalFuture<?> fut = multithreadedAsync(
+                new CAX() {
+                    @Override public void applyx() throws IgniteCheckedException {
+                        c.query(new SqlQuery(Integer.class, "_val >= 0")).getAll();
+
+                        U.await(latch);
+                    }
+                }, THREAD_COUNT);
+
+            Thread.sleep(STMT_CACHE_CLEANUP_TIMEOUT);
+
+            // Wait for stmtCache is cleaned up because all user threads don't perform queries a lot of time.
+            assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return getStatementCacheSize(grid(0).context().query()) == 0;
+                }
+            }, STMT_CACHE_CLEANUP_TIMEOUT * 2));
+
+            latch.countDown();
+
+            fut.get();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/40fc2ec6/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 79e95cb..3c0e424 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapEvictQueryT
 import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapTieredMultithreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCachePartitionedQueryMultiThreadedSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheQueryEvictsMultiThreadedSelfTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheQueryH2IndexingLeakTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheQueryIndexSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheQueryLoadSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheQueryMultiThreadedOffHeapTieredSelfTest;
@@ -107,6 +108,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheQuerySerializationSelfTest.class);
         suite.addTestSuite(IgniteBinaryObjectFieldsQuerySelfTest.class);
         suite.addTestSuite(IgniteBinaryWrappedObjectFieldsQuerySelfTest.class);
+        suite.addTestSuite(IgniteCacheQueryH2IndexingLeakTest.class);
 
         return suite;
     }


Mime
View raw message