ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-7095: SQL: fixed per-thread H2 connection leak. This closes #3141.
Date Tue, 12 Dec 2017 13:20:03 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 1e8eaff1d -> 316feb86d


IGNITE-7095: SQL: fixed per-thread H2 connection leak. This closes #3141.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/316feb86
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/316feb86
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/316feb86

Branch: refs/heads/master
Commit: 316feb86d4d8d2f841601b24d07eef7c049f4113
Parents: 1e8eaff
Author: tledkov-gridgain <tledkov@gridgain.com>
Authored: Tue Dec 12 16:19:55 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Tue Dec 12 16:19:55 2017 +0300

----------------------------------------------------------------------
 .../processors/query/h2/IgniteH2Indexing.java   |  67 +++++++-
 .../cache/index/H2ConnectionLeaksSelfTest.java  | 169 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 3 files changed, 231 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/316feb86/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 83eaa33..6fdcd27 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -221,6 +221,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** The period of clean up the {@link #stmtCache}. */
     private final Long CLEANUP_STMT_CACHE_PERIOD = Long.getLong(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD,
10_000);
 
+    /** The period of clean up the {@link #conns}. */
+    private final Long CLEANUP_CONNECTIONS_PERIOD = 2000L;
+
     /** The timeout to remove entry from the {@link #stmtCache} if the thread doesn't perform
any queries. */
     private final Long STATEMENT_CACHE_THREAD_USAGE_TIMEOUT =
         Long.getLong(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT, 600 * 1000);
@@ -228,6 +231,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** */
     private GridTimeoutProcessor.CancelableTask stmtCacheCleanupTask;
 
+    /** */
+    private GridTimeoutProcessor.CancelableTask connCleanupTask;
+
     /** Logger. */
     @LoggerResource
     private IgniteLogger log;
@@ -245,7 +251,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private String dbUrl = "jdbc:h2:mem:";
 
     /** */
-    private final Collection<Connection> conns = Collections.synchronizedCollection(new
ArrayList<Connection>());
+    private final ConcurrentMap<Thread, Connection> conns = new ConcurrentHashMap8<>();
 
     /** */
     private GridMapQueryExecutor mapQryExec;
@@ -304,7 +310,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl,
e);
             }
 
-            conns.add(c);
+            conns.put(Thread.currentThread(), c);
 
             return new H2ConnectionWrapper(c);
         }
@@ -485,6 +491,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private void createSchema(String schema) throws IgniteCheckedException {
         executeStatement("INFORMATION_SCHEMA", "CREATE SCHEMA IF NOT EXISTS " + H2Utils.withQuotes(schema));
 
+        // This method is typically called from internal Ignite threads on bootstrap, no
need to cache this connection.
+        conns.remove(Thread.currentThread());
+
         if (log.isDebugEnabled())
             log.debug("Created H2 schema for index database: " + schema);
     }
@@ -561,7 +570,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         connCache.set(null);
 
         if (conn != null) {
-            conns.remove(conn);
+            conns.remove(Thread.currentThread());
 
             // Reset connection to receive new one at next call.
             U.close(conn, log);
@@ -782,6 +791,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param tbl Table.
      * @param pk Primary key flag.
      * @param cols Columns.
+     * @param inlineSize Index inline size.
      * @return Index.
      */
     public GridH2IndexBase createSortedIndex(String name, GridH2Table tbl, boolean pk, List<IndexColumn>
cols,
@@ -949,6 +959,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      *
      * @param conn Connection,.
      * @param stmt Statement.
+     * @param timeoutMillis Query timeout.
      * @param cancel Query cancel.
      * @return Result.
      * @throws IgniteCheckedException If failed.
@@ -1021,6 +1032,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param sql Sql query.
      * @param params Parameters.
      * @param useStmtCache If {@code true} uses stmt cache.
+     * @param timeoutMillis Query timeout.
      * @param cancel Query cancel.
      * @return Result.
      * @throws IgniteCheckedException If failed.
@@ -1038,6 +1050,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param conn Connection.
      * @param sql Sql query.
      * @param params Parameters.
+     * @param timeoutMillis Query timeout.
      * @param cancel Query cancel.
      * @return Result.
      * @throws IgniteCheckedException If failed.
@@ -1172,7 +1185,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param alias Table alias.
      * @param params Query parameters.
      * @param type Query return type.
-     * @param filter Cache name and key filter.      @return Queried rows.
+     * @param filter Cache name and key filter.
+     * @param cancel Cancel object.
+     * @return Queried rows.
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
@@ -1216,6 +1231,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param qry Query.
      * @param keepCacheObj Flag to keep cache object.
      * @param enforceJoinOrder Enforce join order of tables.
+     * @param timeoutMillis Query timeout.
+     * @param cancel Cancel object.
+     * @param params Query parameters.
      * @param parts Partitions.
      * @param lazy Lazy query execution flag.
      * @return Iterable result.
@@ -1680,6 +1698,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * @param cacheIds Caches identifiers.
      * @throws IllegalStateException if segmented indices used with non-segmented indices.
      */
     private void checkCacheIndexSegmentation(List<Integer> cacheIds) {
@@ -1939,6 +1958,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * Get table descriptor.
      *
      * @param schemaName Schema name.
+     * @param cacheName Cache name.
      * @param type Type name.
      * @return Descriptor.
      */
@@ -2008,6 +2028,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * Called periodically by {@link GridTimeoutProcessor} to clean up the {@link #conns}: every connection
+     * whose owner thread has terminated is closed and its entry is removed from the map.
+     */
+    private void cleanupConnections() {
+        for (Iterator<Map.Entry<Thread, Connection>> it = conns.entrySet().iterator();
it.hasNext(); ) {
+            Map.Entry<Thread, Connection> entry = it.next();
+
+            Thread t = entry.getKey();
+
+            // Only entries of terminated threads are evicted; entries of all other threads are left untouched.
+            if (t.getState() == Thread.State.TERMINATED) {
+                U.close(entry.getValue(), log);
+
+                // Remove through the iterator so the map is not modified behind the iteration's back.
+                it.remove();
+            }
+        }
+    }
+
+    /**
      * Rebuild indexes from hash index.
      *
      * @param cacheName Cache name.
@@ -2143,6 +2180,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             U.warn(log, "Custom H2 serialization is already configured, will override.");
 
         JdbcUtils.serializer = h2Serializer();
+
+        connCleanupTask = ctx.timeout().schedule(new Runnable() {
+            @Override public void run() {
+                cleanupConnections();
+            }
+        }, CLEANUP_CONNECTIONS_PERIOD, CLEANUP_CONNECTIONS_PERIOD);
     }
 
     /**
@@ -2306,7 +2349,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         mapQryExec.cancelLazyWorkers();
 
-        for (Connection c : conns)
+        for (Connection c : conns.values())
             U.close(c, log);
 
         conns.clear();
@@ -2323,6 +2366,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (stmtCacheCleanupTask != null)
             stmtCacheCleanupTask.close();
 
+        if (connCleanupTask != null)
+            connCleanupTask.close();
+
         GridH2QueryContext.clearLocalNodeStop(nodeId);
 
         if (log.isDebugEnabled())
@@ -2570,8 +2616,15 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     @Override public void cancelAllQueries() {
         mapQryExec.cancelLazyWorkers();
 
-        for (Connection conn : conns)
-            U.close(conn, log);
+        for (Connection c : conns.values())
+            U.close(c, log);
+    }
+
+    /**
+     * Exposes the connections registered in {@link #conns}, keyed by the thread that created each of them.
+     * The internal map itself is returned (not a copy), so callers observe later additions and removals.
+     *
+     * @return Per-thread connections.
+     */
+    public Map<Thread, Connection> perThreadConnections() {
+        return conns;
+    }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/316feb86/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
new file mode 100644
index 0000000..417c1f3
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import java.sql.Connection;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests that H2 JDBC connections are not leaked when {@link SqlFieldsQuery} is executed.
+ */
+public class H2ConnectionLeaksSelfTest extends GridCommonAbstractTest {
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** Nodes count. */
+    private static final int NODE_CNT = 2;
+
+    /** Keys count. */
+    private static final int KEY_CNT = 100;
+
+    /** Threads count. */
+    private static final int THREAD_CNT = 100;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        IgniteCache<Long, String> cache = startGrids(NODE_CNT).cache(CACHE_NAME);
+
+        // Fill the cache with KEY_CNT entries, each key mapped to its decimal string form.
+        for (long key = 0; key < KEY_CNT; key++)
+            cache.put(key, String.valueOf(key));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        // Tear down the grids started for this test class.
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws
Exception {
+        CacheConfiguration<Long, String> ccfg = new CacheConfiguration<Long, String>().setName(CACHE_NAME)
+            .setIndexedTypes(Long.class, String.class);
+
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        if (getTestIgniteInstanceIndex(igniteInstanceName) != 0)
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /**
+     * Checks that connections cached for query threads are released on every node once those threads
+     * have terminated.
+     *
+     * @throws Exception On failed.
+     */
+    public void testConnectionLeaks() throws Exception {
+        final IgniteCache cache = grid(1).cache(CACHE_NAME);
+
+        Thread[] threads = new Thread[THREAD_CNT];
+
+        for (int i = 0; i < THREAD_CNT; i++) {
+            threads[i] = new Thread() {
+                @Override public void run() {
+                    SqlFieldsQuery qry = new SqlFieldsQuery("select * from String").setLocal(false);
+
+                    cache.query(qry).getAll();
+                }
+            };
+
+            threads[i].start();
+        }
+
+        // Join rather than count down a latch: after join() the thread is guaranteed to be TERMINATED,
+        // and a query failure cannot leave the test waiting forever for a missing countDown().
+        for (Thread t : threads)
+            t.join();
+
+        boolean res = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                for (int i = 0; i < NODE_CNT; i++) {
+                    // An entry owned by a terminated thread is a leak that has not been cleaned up yet.
+                    // Entries of threads that are still alive are not leaks and are ignored.
+                    for (Thread t : perThreadConnections(i).keySet()) {
+                        if (t.getState() == Thread.State.TERMINATED)
+                            return false;
+                    }
+                }
+
+                return true;
+            }
+        }, 5000);
+
+        assertTrue("Connections of terminated threads were not released", res);
+    }
+
+    /**
+     * Checks that no per-thread connection stays registered on any node after a statement was executed
+     * from many threads with its failure ignored, while those threads are still running.
+     *
+     * @throws Exception On failed.
+     */
+    public void testConnectionLeaksOnSqlException() throws Exception {
+        // Counted down by each worker after its statement attempt.
+        final CountDownLatch latch = new CountDownLatch(THREAD_CNT);
+        // Released by the main thread once the assertions are done; workers block on it until then.
+        final CountDownLatch latch2 = new CountDownLatch(1);
+
+        for (int i = 0; i < THREAD_CNT; i++) {
+            new Thread() {
+                @Override public void run() {
+                    try {
+                        IgniteH2Indexing idx = (IgniteH2Indexing)grid(1).context().query().getIndexing();
+
+                        // NOTE(review): "select *" looks malformed on purpose to provoke an SQL error - confirm.
+                        idx.executeStatement(CACHE_NAME, "select *");
+                    }
+                    catch (Exception e) {
+                        // Ignored: the failure of the statement is not what this test verifies.
+                    }
+
+                    latch.countDown();
+
+                    try {
+                        // Stay alive until the main thread has inspected the connection maps.
+                        latch2.await();
+                    }
+                    catch (InterruptedException e) {
+                        // Ignored: the thread simply finishes.
+                    }
+                }
+            }.start();
+        }
+
+        try {
+            latch.await();
+
+            // Workers are still parked on latch2 here, so they have not terminated yet.
+            for (int i = 0; i < NODE_CNT; i++) {
+                Map<Thread, Connection> conns = perThreadConnections(i);
+
+                assertTrue(conns.isEmpty());
+            }
+        }
+        finally {
+            // Always release the workers, even if an assertion above failed.
+            latch2.countDown();
+        }
+    }
+
+    /**
+     * Fetches the per-thread connection map of the indexing implementation of the given node.
+     *
+     * @param nodeIdx Node index.
+     * @return Per-thread connections.
+     */
+    private Map<Thread, Connection> perThreadConnections(int nodeIdx) {
+        IgniteH2Indexing idx = (IgniteH2Indexing)grid(nodeIdx).context().query().getIndexing();
+
+        return idx.perThreadConnections();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/316feb86/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 7b3b271..4d8016b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -119,6 +119,7 @@ import org.apache.ignite.internal.processors.cache.index.H2DynamicIndexingComple
 import org.apache.ignite.internal.processors.cache.index.H2DynamicIndexingComplexServerTransactionalPartitionedTest;
 import org.apache.ignite.internal.processors.cache.index.H2DynamicIndexingComplexServerTransactionalReplicatedTest;
 import org.apache.ignite.internal.processors.cache.index.H2DynamicTableSelfTest;
+import org.apache.ignite.internal.processors.cache.index.H2ConnectionLeaksSelfTest;
 import org.apache.ignite.internal.processors.cache.index.LongIndexNameTest;
 import org.apache.ignite.internal.processors.cache.index.SchemaExchangeSelfTest;
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQuerySelfTest;
@@ -366,6 +367,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(LongIndexNameTest.class);
         suite.addTestSuite(GridCacheQuerySqlFieldInlineSizeSelfTest.class);
         suite.addTestSuite(IgniteSqlParameterizedQueryTest.class);
+        suite.addTestSuite(H2ConnectionLeaksSelfTest.class);
 
         suite.addTestSuite(IgniteCheckClusterStateBeforeExecuteQueryTest.class);
 


Mime
View raw message