Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BD6B42009C6 for ; Tue, 17 May 2016 06:02:30 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BC49C160A16; Tue, 17 May 2016 04:02:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 41DB2160A1F for ; Tue, 17 May 2016 06:02:29 +0200 (CEST) Received: (qmail 43687 invoked by uid 500); 17 May 2016 04:02:28 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 43600 invoked by uid 99); 17 May 2016 04:02:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 May 2016 04:02:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 35703E38A4; Tue, 17 May 2016 04:02:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akuznetsov@apache.org To: commits@ignite.apache.org Date: Tue, 17 May 2016 04:02:29 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [02/24] ignite git commit: ignite-3090 Implemented stmCache cleanup. archived-at: Tue, 17 May 2016 04:02:30 -0000 ignite-3090 Implemented stmCache cleanup. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/38d9e91b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/38d9e91b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/38d9e91b Branch: refs/heads/master Commit: 38d9e91b0585591f714331aaf9e2da3561e37a9d Parents: db8a9b2 Author: sboikov Authored: Mon May 16 11:00:18 2016 +0300 Committer: sboikov Committed: Mon May 16 11:00:18 2016 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 20 ++ .../processors/query/h2/IgniteH2Indexing.java | 59 +++++ .../IgniteCacheQueryH2IndexingLeakTest.java | 214 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 2 + 4 files changed, 295 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/38d9e91b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 804cd43..35d190f 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -404,6 +404,26 @@ public final class IgniteSystemProperties { public static final String IGNITE_NO_SELECTOR_OPTS = "IGNITE_NO_SELECTOR_OPTS"; /** + * System property to specify period in milliseconds between calls of the SQL statements cache cleanup task. + *

+ * Cleanup tasks clears cache for terminated threads and for threads which did not perform SQL queries within + * timeout configured via {@link #IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT} property. + *

+ * Default value is {@code 10,000ms}. + */ + public static final String IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD = "IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD"; + + /** + * System property to specify timeout in milliseconds after which thread's SQL statements cache is cleared by + * cleanup task if the thread does not perform any query. + *

+ * Default value is {@code 600,000ms}. + */ + public static final String IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT = + "IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT"; + + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/ignite/blob/38d9e91b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index caf49e8..74d3e74 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -95,6 +95,7 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridLuceneIndex; import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter; import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor; import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap; import org.apache.ignite.internal.util.GridEmptyCloseableIterator; import org.apache.ignite.internal.util.GridSpinBusyLock; @@ -153,6 +154,8 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_DEBUG_CONSOLE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.getString; import static org.apache.ignite.internal.processors.query.GridQueryIndexType.FULLTEXT; import static org.apache.ignite.internal.processors.query.GridQueryIndexType.GEO_SPATIAL; @@ -202,6 +205,16 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** */ private static final String ESC_STR = ESC_CH + "" + ESC_CH; + /** The period of clean up the {@link #stmtCache}. */ + private final Long CLEANUP_STMT_CACHE_PERIOD = Long.getLong(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD, 10_000); + + /** The timeout to remove entry from the {@link #stmtCache} if the thread doesn't perform any queries. */ + private final Long STATEMENT_CACHE_THREAD_USAGE_TIMEOUT = + Long.getLong(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT, 600 * 1000); + + /** */ + private GridTimeoutProcessor.CancelableTask stmtCacheCleanupTask; + /** * Command in H2 prepared statement. */ @@ -333,6 +346,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { cache = cache0; } + cache.updateLastUsage(); + PreparedStatement stmt = cache.get(sql); if (stmt != null && !stmt.isClosed()) { @@ -1382,6 +1397,23 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * Called periodically by {@link GridTimeoutProcessor} to clean up the {@link #stmtCache}. + */ + private void cleanupStatementCache() { + long cur = U.currentTimeMillis(); + + for(Iterator> it = stmtCache.entrySet().iterator(); it.hasNext(); ) { + Map.Entry entry = it.next(); + + Thread t = entry.getKey(); + + if (t.getState() == Thread.State.TERMINATED + || cur - entry.getValue().lastUsage() > STATEMENT_CACHE_THREAD_USAGE_TIMEOUT) + it.remove(); + } + } + + /** * Gets space name from database schema. * * @param schemaName Schema name. Could not be null. Could be empty. @@ -1503,6 +1535,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { mapQryExec.start(ctx, this); rdcQryExec.start(ctx, this); + + stmtCacheCleanupTask = ctx.timeout().schedule(new Runnable() { + @Override public void run() { + cleanupStatementCache(); + } + }, CLEANUP_STMT_CACHE_PERIOD, CLEANUP_STMT_CACHE_PERIOD); } // TODO https://issues.apache.org/jira/browse/IGNITE-2139 @@ -1591,6 +1629,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { U.error(log, "Failed to shutdown database.", e); } + if (stmtCacheCleanupTask != null) + stmtCacheCleanupTask.close(); + if (log.isDebugEnabled()) log.debug("Cache query index stopped."); } @@ -2533,6 +2574,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** */ private int size; + /** Last usage. */ + private volatile long lastUsage; + /** * @param size Size. */ @@ -2554,5 +2598,20 @@ public class IgniteH2Indexing implements GridQueryIndexing { return rmv; } + + /** + * The timestamp of the last usage of the cache. Used by {@link #cleanupStatementCache()} to remove unused caches. + * @return last usage timestamp + */ + private long lastUsage() { + return lastUsage; + } + + /** + * Updates the {@link #lastUsage} timestamp by current time. + */ + private void updateLastUsage() { + lastUsage = U.currentTimeMillis(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/38d9e91b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java new file mode 100644 index 0000000..5d6556e --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.CAX; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * Tests leaks at the IgniteH2Indexing + */ +public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest { + /** */ + private static final long TEST_TIMEOUT = 2 * 60 * 1000; + + /** Threads to parallel execute queries */ + private static final int THREAD_COUNT = 10; + + /** Timeout */ + private static final long STMT_CACHE_CLEANUP_TIMEOUT = 1000; + + /** Orig cleanup period. */ + private static String origCacheCleanupPeriod; + + /** Orig usage timeout. */ + private static String origCacheThreadUsageTimeout; + + /** */ + private static final int ITERATIONS = 5; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration(cacheConfiguration()); + + return cfg; + } + + /** + * @return Cache configuration. + */ + protected CacheConfiguration cacheConfiguration() { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + + cacheCfg.setIndexedTypes( + Integer.class, Integer.class + ); + + return cacheCfg; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT + 60_000; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + origCacheCleanupPeriod = System.getProperty(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD); + origCacheThreadUsageTimeout = System.getProperty(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT); + + System.setProperty(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD, Long.toString(STMT_CACHE_CLEANUP_TIMEOUT)); + System.setProperty(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT, Long.toString(STMT_CACHE_CLEANUP_TIMEOUT)); + + startGrid(0); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + System.setProperty(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD, + origCacheCleanupPeriod != null ? origCacheCleanupPeriod : ""); + + System.setProperty(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT, + origCacheThreadUsageTimeout != null ? origCacheThreadUsageTimeout : ""); + } + + /** + * @param qryProcessor Query processor. + * @return size of statement cache. + */ + private static int getStatementCacheSize(GridQueryProcessor qryProcessor) { + IgniteH2Indexing h2Idx = GridTestUtils.getFieldValue(qryProcessor, GridQueryProcessor.class, "idx"); + + ConcurrentMap stmtCache = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "stmtCache"); + + return stmtCache.size(); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testLeaksInIgniteH2IndexingOnTerminatedThread() throws Exception { + final IgniteCache c = grid(0).cache(null); + + for(int i = 0; i < ITERATIONS; ++i) { + info("Iteration #" + i); + + final AtomicBoolean stop = new AtomicBoolean(); + + // Open iterator on the created cursor: add entries to the cache. + IgniteInternalFuture fut = multithreadedAsync( + new CAX() { + @Override public void applyx() throws IgniteCheckedException { + while (!stop.get()) { + c.query(new SqlQuery(Integer.class, "_val >= 0")).getAll(); + + c.query(new SqlQuery(Integer.class, "_val >= 1")).getAll(); + } + } + }, THREAD_COUNT); + + final GridQueryProcessor qryProc = grid(0).context().query(); + + try { + // Wait for stmt cache entry is created for each thread. + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return getStatementCacheSize(qryProc) == THREAD_COUNT; + } + }, STMT_CACHE_CLEANUP_TIMEOUT)); + } + finally { + stop.set(true); + } + + fut.get(); + + // Wait for stmtCache is cleaned up because all user threads are terminated. + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return getStatementCacheSize(qryProc) == 0; + } + }, STMT_CACHE_CLEANUP_TIMEOUT * 2)); + } + } + + /** + * @throws Exception If failed. + */ + public void testLeaksInIgniteH2IndexingOnUnusedThread() throws Exception { + final IgniteCache c = grid(0).cache(null); + + final CountDownLatch latch = new CountDownLatch(1); + + for(int i = 0; i < ITERATIONS; ++i) { + info("Iteration #" + i); + + // Open iterator on the created cursor: add entries to the cache + IgniteInternalFuture fut = multithreadedAsync( + new CAX() { + @Override public void applyx() throws IgniteCheckedException { + c.query(new SqlQuery(Integer.class, "_val >= 0")).getAll(); + + U.await(latch); + } + }, THREAD_COUNT); + + Thread.sleep(STMT_CACHE_CLEANUP_TIMEOUT); + + // Wait for stmtCache is cleaned up because all user threads don't perform queries a lot of time. + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return getStatementCacheSize(grid(0).context().query()) == 0; + } + }, STMT_CACHE_CLEANUP_TIMEOUT * 2)); + + latch.countDown(); + + fut.get(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/38d9e91b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 04885ce..ebad581 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapEvictQueryT import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapTieredMultithreadedSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCachePartitionedQueryMultiThreadedSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheQueryEvictsMultiThreadedSelfTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheQueryH2IndexingLeakTest; import org.apache.ignite.internal.processors.cache.IgniteCacheQueryIndexSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheQueryLoadSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheQueryMultiThreadedOffHeapTieredSelfTest; @@ -109,6 +110,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheQuerySerializationSelfTest.class); suite.addTestSuite(IgniteBinaryObjectFieldsQuerySelfTest.class); suite.addTestSuite(IgniteBinaryWrappedObjectFieldsQuerySelfTest.class); + suite.addTestSuite(IgniteCacheQueryH2IndexingLeakTest.class); return suite; }