ignite-commits mailing list archives

From amashen...@apache.org
Subject [ignite] branch master updated: IGNITE-12804 SQL: ConnectionManager refactoring (#7569)
Date Fri, 03 Apr 2020 08:55:56 GMT
This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5175780  IGNITE-12804  SQL: ConnectionManager refactoring (#7569)
5175780 is described below

commit 51757809bbc8719f3ab9ee4a635f7e1054c11aa4
Author: Taras Ledkov <tledkov@gridgain.com>
AuthorDate: Fri Apr 3 11:55:41 2020 +0300

    IGNITE-12804  SQL: ConnectionManager refactoring (#7569)
---
 .../query/h2/opt/GridH2SpatialIndex.java           |   8 +-
 .../processors/query/h2/ConcurrentStripedPool.java | 177 +++++++++
 .../processors/query/h2/ConnectionManager.java     | 426 ++++++--------------
 .../processors/query/h2/H2CachedStatementKey.java  |  27 +-
 .../internal/processors/query/h2/H2Connection.java | 207 ++++++++++
 .../processors/query/h2/H2ConnectionWrapper.java   | 136 -------
 .../processors/query/h2/H2FieldsIterator.java      |  21 +-
 .../processors/query/h2/H2PooledConnection.java    | 129 ++++++
 .../processors/query/h2/H2ResultSetIterator.java   |  39 +-
 .../processors/query/h2/H2StatementCache.java      |  25 +-
 .../internal/processors/query/h2/H2Utils.java      | 229 ++++++++++-
 .../processors/query/h2/IgniteH2Indexing.java      |  84 ++--
 .../internal/processors/query/h2/QueryParser.java  | 434 ++++++++++-----------
 .../processors/query/h2/SchemaManager.java         |  52 ++-
 .../query/h2/database/H2PkHashIndex.java           |   3 +-
 .../processors/query/h2/database/H2TreeIndex.java  |  35 +-
 .../processors/query/h2/dml/UpdatePlan.java        |  48 +--
 .../processors/query/h2/dml/UpdatePlanBuilder.java |  13 +-
 .../processors/query/h2/opt/GridH2IndexBase.java   |   5 +-
 .../processors/query/h2/opt/GridH2Table.java       |   8 +-
 .../processors/query/h2/opt/QueryContext.java      |  46 +++
 .../query/h2/opt/QueryContextRegistry.java         |  34 +-
 .../query/h2/opt/join/DistributedLookupBatch.java  |  29 +-
 .../query/h2/sql/GridSqlQuerySplitter.java         |  25 +-
 .../query/h2/twostep/GridMapQueryExecutor.java     |  64 +--
 .../query/h2/twostep/GridReduceQueryExecutor.java  | 261 ++++++-------
 .../query/h2/twostep/MapQueryResult.java           |  23 +-
 .../query/h2/twostep/MapQueryResults.java          |   9 -
 .../query/h2/twostep/ReduceQueryRun.java           |  15 -
 .../cache/IgniteCacheQueryH2IndexingLeakTest.java  |  14 +-
 .../cache/index/H2ConnectionLeaksSelfTest.java     |  54 ++-
 ...actQueryTableLockAndConnectionPoolSelfTest.java | 123 +++++-
 .../IgniteSqlSkipReducerOnUpdateDmlSelfTest.java   |   2 +-
 .../query/IgniteSqlSplitterSelfTest.java           |   3 +-
 .../processors/query/h2/QueryDataPageScanTest.java |   7 +-
 .../query/h2/sql/GridQueryParsingTest.java         |  60 +--
 36 files changed, 1615 insertions(+), 1260 deletions(-)

diff --git a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
index fcc7f96..e2d8610 100644
--- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
+++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
@@ -320,7 +320,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
         try {
             checkClosed();
 
-            final int seg = threadLocalSegment();
+            final int seg = segment(H2Utils.context(filter.getSession()));
 
             final MVRTreeMap<Long> segment = segments[seg];
 
@@ -350,7 +350,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
 
         IndexingQueryFilter qryFilter = null;
 
-        QueryContext qctx = queryContextRegistry().getThreadLocal();
+        QueryContext qctx = H2Utils.context(filter.getSession());
 
         if (qctx != null)
             qryFilter = qctx.filter();
@@ -387,7 +387,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
             if (!first)
                 throw DbException.throwInternalError("Spatial Index can only be fetch by ascending order");
 
-            final int seg = threadLocalSegment();
+            final int seg = segment(H2Utils.context(ses));
 
             final MVRTreeMap<Long> segment = segments[seg];
 
@@ -429,7 +429,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
             if (intersection == null)
                 return find(filter.getSession(), null, null);
 
-            final int seg = threadLocalSegment();
+            final int seg = segment(H2Utils.context(filter.getSession()));
 
             final MVRTreeMap<Long> segment = segments[seg];
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConcurrentStripedPool.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConcurrentStripedPool.java
new file mode 100644
index 0000000..fb91520
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConcurrentStripedPool.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Concurrent striped pool of objects based on ConcurrentLinkedQueue.
+ */
+public class ConcurrentStripedPool<E> implements Iterable<E> {
+    /** Stripe pools. */
+    private final ConcurrentLinkedQueue<E>[] stripePools;
+
+    /** Stripes count. */
+    private final int stripes;
+
+    /** Per-stripe pool sizes (maintained as fast, optimistic, approximate counters). */
+    private AtomicInteger[] stripeSize;
+
+    /** Max pool size. */
+    private int maxPoolSize;
+
+    /**
+     * Constructor.
+     *
+     * @param stripes Count of stripes.
+     * @param maxPoolSize Max pool size.
+     */
+    @SuppressWarnings("unchecked")
+    public ConcurrentStripedPool(int stripes, int maxPoolSize) {
+        this.stripes = stripes;
+        this.maxPoolSize = maxPoolSize;
+
+        stripePools = new ConcurrentLinkedQueue[stripes];
+        stripeSize = new AtomicInteger[stripes];
+
+        for (int i = 0; i < stripes; ++i) {
+            stripePools[i] = new ConcurrentLinkedQueue<>();
+            stripeSize[i] = new AtomicInteger();
+        }
+    }
+
+    /**
+     * Pushes an element onto the pool.
+     *
+     * @param e the element to push
+     * @throws NullPointerException if the specified element is null (the pool does not permit null elements)
+     * @return {@code true} if the element is returned to the pool, {@code false} if there is no space in the pool.
+     */
+    public boolean recycle(E e) {
+        int idx = (int)(Thread.currentThread().getId() % stripes);
+
+        if (stripeSize[idx].get() > maxPoolSize)
+            return false;
+
+        stripePools[idx].add(e);
+
+        stripeSize[idx].incrementAndGet();
+
+        return true;
+    }
+
+    /**
+     * Retrieves an element from the pool, or returns {@code null} if the pool is empty.
+     *
+     * @return An element from the pool, or {@code null} if the pool is empty.
+     */
+    public E borrow() {
+        int idx = (int)(Thread.currentThread().getId() % stripes);
+
+        E r = stripePools[idx].poll();
+
+        if (r != null)
+            stripeSize[idx].decrementAndGet();
+
+        return r;
+    }
+
+    /**
+     * Performs the given action for each element of the pool until all elements have been processed or the action
+     * throws an exception. Exceptions thrown by the action are relayed to the caller.
+     *
+     * @param action The action to be performed for each element
+     * @throws NullPointerException if the specified action is null
+     */
+    @Override public void forEach(Consumer<? super E> action) {
+        Objects.requireNonNull(action);
+
+        for (int i = 0; i < stripes; ++i)
+            stripePools[i].forEach(action);
+    }
+
+    /**
+     * Removes all of the elements from the pool.
+     */
+    public void clear() {
+        for (int i = 0; i < stripes; ++i)
+            stripePools[i].clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Iterator<E> iterator() {
+        return new Iterator<E>() {
+            int idx = 0;
+            Iterator<E> it = stripePools[idx].iterator();
+
+            @Override public boolean hasNext() {
+                if (it.hasNext())
+                    return true;
+
+                idx++;
+
+                if (idx < stripes) {
+                    it = stripePools[idx].iterator();
+
+                    return it.hasNext();
+                }
+                else
+                    return false;
+            }
+
+            @Override public E next() {
+                if (it.hasNext())
+                    return it.next();
+
+                idx++;
+
+                if (idx < stripes) {
+                    it = stripePools[idx].iterator();
+
+                    return it.next();
+                }
+                else
+                    throw new NoSuchElementException();
+            }
+        };
+    }
+
+    /**
+     * Returns a sequential {@code Stream} over the elements of the pool.
+     *
+     * @return A sequential {@code Stream} over the elements of the pool.
+     */
+    public Stream<E> stream() {
+        return StreamSupport.stream(spliterator(), false);
+    }
+
+    /**
+     * @param size New max pool size.
+     */
+    public void resize(int size) {
+        maxPoolSize = size;
+    }
+}
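
The new ConcurrentStripedPool maps every call to a stripe via Thread.currentThread().getId() % stripes, so borrow() and recycle() only ever touch the calling thread's stripe, and recycle() refuses the object once that stripe already holds maxPoolSize entries. A minimal usage sketch (the pooled type and the sizes below are illustrative, not part of the commit):

    ConcurrentStripedPool<StringBuilder> pool = new ConcurrentStripedPool<>(4, 8);

    StringBuilder buf = pool.borrow();      // null on a miss for the current stripe

    if (buf == null)
        buf = new StringBuilder();          // caller creates a fresh object on a miss

    buf.append("work");                     // ... use the pooled object ...

    buf.setLength(0);                       // reset state before returning it

    if (!pool.recycle(buf)) {
        // Stripe is already full: drop the buffer and let it be garbage collected.
    }
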
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
index 05e37a0..cf4a5f4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ConnectionManager.java
@@ -19,50 +19,46 @@ package org.apache.ignite.internal.processors.query.h2;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Map;
+import java.util.Collections;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRowFactory;
-import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.h2.jdbc.JdbcStatement;
-import org.jetbrains.annotations.Nullable;
+import org.h2.api.JavaObjectSerializer;
+import org.h2.engine.Database;
+import org.h2.jdbc.JdbcConnection;
+import org.h2.store.DataHandler;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT;
+import static org.apache.ignite.internal.processors.query.h2.H2Utils.setter;
 
 /**
  * H2 connection manager.
  */
 public class ConnectionManager {
     /** Default DB options. */
-    private static final String DB_OPTIONS = ";LOCK_MODE=3;MULTI_THREADED=1;DB_CLOSE_ON_EXIT=FALSE" +
+    private static final String DEFAULT_DB_OPTIONS = ";LOCK_MODE=3;MULTI_THREADED=1;DB_CLOSE_ON_EXIT=FALSE" +
         ";DEFAULT_LOCK_TIMEOUT=10000;FUNCTIONS_IN_SCHEMA=true;OPTIMIZE_REUSE_RESULTS=0;QUERY_CACHE_SIZE=0" +
         ";MAX_OPERATION_MEMORY=0;BATCH_JOINS=1" +
         ";ROW_FACTORY=\"" + H2PlainRowFactory.class.getName() + "\"" +
         ";DEFAULT_TABLE_ENGINE=" + GridH2DefaultTableEngine.class.getName();
 
-    /** The period of clean up the {@link #threadConns}. */
-    private static final Long CONN_CLEANUP_PERIOD = 2000L;
+    /** Default maximum size of connection pool. */
+    private static final int DFLT_CONNECTION_POOL_SIZE = 32;
 
-    /** The period of clean up the statement cache. */
-    @SuppressWarnings("FieldCanBeLocal")
-    private final Long stmtCleanupPeriod = Long.getLong(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD, 10_000);
-
-    /** The timeout to remove entry from the statement cache if the thread doesn't perform any queries. */
-    private final Long stmtTimeout = Long.getLong(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT, 600 * 1000);
+    /** */
+    private static final H2Utils.Setter<Database, JavaObjectSerializer> DB_JOBJ_SERIALIZER
+        = setter(Database.class, "javaObjectSerializer");
 
     /*
      * Initialize system properties for H2.
@@ -75,67 +71,33 @@ public class ConnectionManager {
         System.setProperty("h2.dropRestrict", "false"); // Drop schema with cascade semantics.
     }
 
-    /** Shared connection pool. */
-    private final ThreadLocalObjectPool<H2ConnectionWrapper> connPool =
-        new ThreadLocalObjectPool<>(
-            5,
-            this::newConnectionWrapper,
-            this::closeDetachedConnection,
-            this::addConnectionToThreaded);
-
-    /** Per-thread connections. */
-    private final ConcurrentMap<Thread, ConcurrentMap<H2ConnectionWrapper, Boolean>> threadConns = new ConcurrentHashMap<>();
-
-    /** Track detached connections to close on node stop. */
-    private final ConcurrentMap<H2ConnectionWrapper, Boolean> detachedConns = new ConcurrentHashMap<>();
-
-    /** Connection cache. */
-    private final ThreadLocal<ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable> threadConn =
-        new ThreadLocal<ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable>() {
-        @Override public ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable get() {
-            ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable reusable = super.get();
-
-            boolean reconnect = true;
-
-            try {
-                reconnect = reusable == null || reusable.object().connection().isClosed();
-            }
-            catch (SQLException e) {
-                U.warn(log, "Failed to check connection status.", e);
-            }
-
-            if (reconnect) {
-                reusable = initialValue();
-
-                set(reusable);
-            }
-
-            return reusable;
-        }
-
-        @Override protected ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable initialValue() {
-            ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable reusableConnection = connPool.borrow();
-
-            addConnectionToThreaded(reusableConnection.object());
+    /** The period of clean up the statement cache. */
+    @SuppressWarnings("FieldCanBeLocal")
+    private final Long stmtCleanupPeriod = Long.getLong(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD, 10_000);
 
-            return reusableConnection;
-        }
-    };
+    /** The timeout to remove entry from the statement cache if the thread doesn't perform any queries. */
+    private final Long stmtTimeout = Long.getLong(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT, 600 * 1000);
 
     /** Database URL. */
     private final String dbUrl;
 
-    /** Connection cleanup task. */
-    private final GridTimeoutProcessor.CancelableTask connCleanupTask;
-
     /** Statement cleanup task. */
     private final GridTimeoutProcessor.CancelableTask stmtCleanupTask;
 
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Used connections set. */
+    private final Set<H2Connection> usedConns = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    /** Connection pool. */
+    private final ConcurrentStripedPool<H2Connection> connPool;
+
     /** H2 connection for INFORMATION_SCHEMA. Holds H2 open until node is stopped. */
     private volatile Connection sysConn;
 
-    /** Logger. */
-    private final IgniteLogger log;
+    /** H2 data handler. Primarily used for serialization. */
+    private final DataHandler dataNhd;
 
     /**
      * Constructor.
@@ -143,86 +105,30 @@ public class ConnectionManager {
      * @param ctx Context.
      */
     public ConnectionManager(GridKernalContext ctx) {
-        dbUrl = "jdbc:h2:mem:" + ctx.localNodeId() + DB_OPTIONS;
+        connPool = new ConcurrentStripedPool<>(ctx.config().getQueryThreadPoolSize(), DFLT_CONNECTION_POOL_SIZE);
+
+        dbUrl = "jdbc:h2:mem:" + ctx.localNodeId() + DEFAULT_DB_OPTIONS;
 
         log = ctx.log(ConnectionManager.class);
 
         org.h2.Driver.load();
 
-        sysConn = connectionNoCache(QueryUtils.SCHEMA_INFORMATION);
-
-        stmtCleanupTask = ctx.timeout().schedule(this::cleanupStatements, stmtCleanupPeriod, stmtCleanupPeriod);
-        connCleanupTask = ctx.timeout().schedule(this::cleanupConnections, CONN_CLEANUP_PERIOD, CONN_CLEANUP_PERIOD);
-    }
-
-    /**
-     * @return H2 connection wrapper.
-     */
-    public H2ConnectionWrapper connectionForThread() {
-        return threadConn.get().object();
-    }
-
-    /**
-     * @return Per-thread connections (for testing purposes only).
-     */
-    public Map<Thread, ConcurrentMap<H2ConnectionWrapper, Boolean>> connectionsForThread() {
-        return threadConns;
-    }
-
-    /**
-     * Removes from cache and returns associated with current thread connection.
-     *
-     * @return Connection associated with current thread.
-     */
-    public ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable detachThreadConnection() {
-        Thread key = Thread.currentThread();
-
-        ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable reusableConn = threadConn.get();
-
-        ConcurrentMap<H2ConnectionWrapper, Boolean> connSet = threadConns.get(key);
-
-        assert connSet != null;
-
-        Boolean rmv = connSet.remove(reusableConn.object());
-
-        assert rmv != null;
-
-        threadConn.remove();
-
-        detachedConns.putIfAbsent(reusableConn.object(), false);
+        try {
+            sysConn = DriverManager.getConnection(dbUrl);
 
-        return reusableConn;
-    }
+            sysConn.setSchema(QueryUtils.SCHEMA_INFORMATION);
 
-    /**
-     * Get connection without cache.
-     *
-     * @param schema Schema name.
-     * @return Connection.
-     * @throws IgniteSQLException On error.
-     */
-    public Connection connectionNoCache(String schema) throws IgniteSQLException {
-        try {
-            Connection conn = DriverManager.getConnection(dbUrl);
+            assert sysConn instanceof JdbcConnection : sysConn;
 
-            conn.setSchema(schema);
+            JdbcConnection conn = (JdbcConnection)sysConn;
 
-            return conn;
+            dataNhd = conn.getSession().getDataHandler();
         }
         catch (SQLException e) {
-            throw new IgniteSQLException("Failed to initialize system DB connection: " + dbUrl, e);
+            throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
         }
-    }
 
-    /**
-     * @return {@link H2StatementCache} associated with current thread.
-     */
-    public H2StatementCache statementCacheForThread() {
-        H2StatementCache statementCache = threadConn.get().object().statementCache();
-
-        statementCache.updateLastUsage();
-
-        return statementCache;
+        stmtCleanupTask = ctx.timeout().schedule(this::cleanupStatements, stmtCleanupPeriod, stmtCleanupPeriod);
     }
 
     /**
@@ -233,25 +139,16 @@ public class ConnectionManager {
      * @throws IgniteCheckedException If failed.
      */
     public void executeStatement(String schema, String sql) throws IgniteCheckedException {
-        Statement stmt = null;
-
-        Connection c = null;
+        try (H2PooledConnection conn = connection(schema)) {
+            Connection c = conn.connection();
 
-        try {
-            c = connectionForThread().connection(schema);
-
-            stmt = c.createStatement();
-
-            stmt.executeUpdate(sql);
+            try (Statement stmt = c.createStatement()) {
+                stmt.executeUpdate(sql);
+            }
         }
         catch (SQLException e) {
-            onSqlException(c);
-
             throw new IgniteCheckedException("Failed to execute statement: " + sql, e);
         }
-        finally {
-            U.close(stmt, log);
-        }
     }
 
     /**
@@ -279,106 +176,21 @@ public class ConnectionManager {
     }
 
     /**
-     * Get cached prepared statement (if any).
-     *
-     * @param c Connection.
-     * @param sql SQL.
-     * @return Prepared statement or {@code null}.
-     * @throws SQLException On error.
-     */
-    @Nullable public PreparedStatement cachedPreparedStatement(Connection c, String sql) throws SQLException {
-        H2StatementCache cache = statementCacheForThread();
-
-        H2CachedStatementKey key = new H2CachedStatementKey(c.getSchema(), sql);
-
-        PreparedStatement stmt = cache.get(key);
-
-        // Nothing found.
-        if (stmt == null)
-            return null;
-
-        // TODO: Remove thread local caching at all. Just keep per-connection statement cache.
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-11211
-        // Statement is not from the given connection.
-        if (stmt.getConnection() != c)
-            return null;
-
-        // Is statement still valid?
-        if (
-            stmt.isClosed() ||                                 // Closed.
-            stmt.unwrap(JdbcStatement.class).isCancelled() ||  // Cancelled.
-            GridSqlQueryParser.prepared(stmt).needRecompile() // Outdated (schema has been changed concurrently).
-        )
-            return null;
-
-        return stmt;
-    }
-
-    /**
-     * Prepare statement caching it if needed.
-     *
-     * @param c Connection.
-     * @param sql SQL.
-     * @return Prepared statement.
-     * @throws SQLException If failed.
-     */
-    public PreparedStatement prepareStatement(Connection c, String sql) throws SQLException {
-        PreparedStatement stmt = cachedPreparedStatement(c, sql);
-
-        if (stmt == null) {
-            H2StatementCache cache = statementCacheForThread();
-
-            H2CachedStatementKey key = new H2CachedStatementKey(c.getSchema(), sql);
-
-            stmt = prepareStatementNoCache(c, sql);
-
-            cache.put(key, stmt);
-        }
-
-        return stmt;
-    }
-
-    /**
-     * Get prepared statement without caching.
-     *
-     * @param c Connection.
-     * @param sql SQL.
-     * @return Prepared statement.
-     * @throws SQLException If failed.
-     */
-    public PreparedStatement prepareStatementNoCache(Connection c, String sql) throws SQLException {
-        boolean insertHack = GridH2Table.insertHackRequired(sql);
-
-        if (insertHack) {
-            GridH2Table.insertHack(true);
-
-            try {
-                return c.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
-            }
-            finally {
-                GridH2Table.insertHack(false);
-            }
-        }
-        else
-            return c.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
-    }
-
-    /**
      * Clear statement cache when cache is unregistered.
      */
     public void onCacheDestroyed() {
-        threadConns.values().forEach(set -> set.keySet().forEach(H2ConnectionWrapper::clearStatementCache));
+        connPool.forEach(H2Connection::clearStatementCache);
     }
 
     /**
      * Close all connections.
      */
     private void closeConnections() {
-        threadConns.values().forEach(set -> set.keySet().forEach(U::closeQuiet));
-        detachedConns.keySet().forEach(U::closeQuiet);
+        connPool.forEach(c -> U.close(c.connection(), log));
+        connPool.clear();
 
-        threadConns.clear();
-        detachedConns.clear();
+        usedConns.forEach(c -> U.close(c.connection(), log));
+        usedConns.clear();
     }
 
     /**
@@ -395,117 +207,127 @@ public class ConnectionManager {
         if (stmtCleanupTask != null)
             stmtCleanupTask.close();
 
-        if (connCleanupTask != null)
-            connCleanupTask.close();
-
         // Needs to be released before SHUTDOWN.
         closeConnections();
 
-        try (Connection c = connectionNoCache(QueryUtils.SCHEMA_INFORMATION); Statement s = c.createStatement()) {
+        try (Statement s = sysConn.createStatement()) {
             s.execute("SHUTDOWN");
         }
         catch (SQLException e) {
             U.error(log, "Failed to shutdown database.", e);
         }
 
-        if (sysConn != null) {
-            U.close(sysConn, log);
+        U.close(sysConn, log);
+    }
 
-            sysConn = null;
-        }
+    /**
+     * Called periodically to clean up the statement cache.
+     */
+    private void cleanupStatements() {
+        long now = U.currentTimeMillis();
+
+        connPool.forEach(c -> {
+            if (now - c.statementCache().lastUsage() > stmtTimeout)
+                c.clearStatementCache();
+        });
     }
 
     /**
-     * Handles SQL exception.
-     * @param c Connection to close.
+     * @param schema Schema name.
+     * @return Connection with the schema set.
      */
-    public void onSqlException(Connection c) {
-        H2ConnectionWrapper conn = threadConn.get().object();
+    public H2PooledConnection connection(String schema) {
+        H2PooledConnection conn = connection();
 
-        // Clear thread local cache if connection not detached.
-        if (conn.connection() == c)
-            threadConn.remove();
+        try {
+            conn.schema(schema);
 
-        if (c != null) {
-            threadConns.remove(Thread.currentThread());
+            return conn;
+        }
+        catch (IgniteSQLException e) {
+            U.closeQuiet(conn);
 
-            // Reset connection to receive new one at next call.
-            U.close(c, log);
+            throw e;
         }
     }
 
     /**
-     * Create new connection wrapper.
+     * Resize the connection pool.
      *
-     * @return Connection wrapper.
+     * @param size New size of the connection pool.
      */
-    private H2ConnectionWrapper newConnectionWrapper() {
-        try {
-            return new H2ConnectionWrapper(DriverManager.getConnection(dbUrl));
-        }
-        catch (SQLException e) {
-            throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
-        }
+    void poolSize(int size) {
+        if (size <= 0)
+            throw new IllegalArgumentException("Invalid connection pool size: " + size);
+
+        connPool.resize(size);
     }
 
     /**
-     * Called by connection bool on connection recycle.
-     *
-     * @param conn recycled connection.
+     * @return Pooled connection wrapper.
      */
-    private void addConnectionToThreaded(H2ConnectionWrapper conn) {
-        Thread cur = Thread.currentThread();
+    public H2PooledConnection connection() {
+        try {
+            H2Connection conn = connPool.borrow();
 
-        ConcurrentMap<H2ConnectionWrapper, Boolean> setConn = threadConns.get(cur);
+            if (conn == null)
+                conn = newConnection();
 
-        if (setConn == null) {
-            setConn = new ConcurrentHashMap<>();
+            H2PooledConnection connWrp = new H2PooledConnection(conn, this);
 
-            threadConns.putIfAbsent(cur, setConn);
-        }
+            usedConns.add(conn);
 
-        setConn.put(conn, false);
+            assert !conn.connection().isClosed() : "Connection is closed [conn=" + conn + ']';
 
-        detachedConns.remove(conn);
+            return connWrp;
+        }
+        catch (SQLException e) {
+            throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
+        }
     }
 
     /**
-     * Called by connection bool on connection close.
+     * Create new connection wrapper.
      *
-     * @param conn closed connection.
+     * @return Connection wrapper.
      */
-    private void closeDetachedConnection(H2ConnectionWrapper conn) {
-        U.close(conn, log);
-
-        detachedConns.remove(conn);
+    private H2Connection newConnection() {
+        try {
+            return new H2Connection(DriverManager.getConnection(dbUrl), log);
+        }
+        catch (SQLException e) {
+            throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e);
+        }
     }
 
     /**
-     * Called periodically to cleanup connections.
+     * Returns the connection to the pool, or closes it if the pool is already at its maximum size.
+     *
+     * @param conn Connection.
      */
-    private void cleanupConnections() {
-        threadConns.entrySet().removeIf(e -> {
-            Thread t = e.getKey();
+    void recycle(H2Connection conn) {
+        boolean rmv = usedConns.remove(conn);
 
-            if (t.getState() == Thread.State.TERMINATED) {
-                e.getValue().keySet().forEach(c -> U.close(c, log));
-
-                return true;
-            }
+        assert rmv : "Connection isn't tracked [conn=" + conn + ']';
 
-            return false;
-        });
+        if (!connPool.recycle(conn))
+            conn.close();
     }
 
     /**
-     * Called periodically to clean up the statement cache.
+     * @return Data handler.
      */
-    private void cleanupStatements() {
-        long now = U.currentTimeMillis();
+    public DataHandler dataHandler() {
+        return dataNhd;
+    }
 
-        threadConns.values().forEach(set -> set.keySet().forEach(c ->{
-            if (now - c.statementCache().lastUsage() > stmtTimeout)
-                c.clearStatementCache();
-        }));
+    /**
+     * Sets internal H2 serializer.
+     *
+     * @param serializer Serializer.
+     */
+    void setH2Serializer(JavaObjectSerializer serializer) {
+        if (dataNhd != null && dataNhd instanceof Database)
+            DB_JOBJ_SERIALIZER.set((Database)dataNhd, serializer);
     }
 }
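
After the refactoring the manager hands out pooled connections instead of thread-local wrappers: connection(schema) borrows an H2Connection from the striped pool (creating a new one on a miss), registers it in usedConns and wraps it in an H2PooledConnection whose close() calls recycle(), returning the delegate to the pool or closing it when the pool is full. A minimal sketch, assuming an already constructed ConnectionManager named connMgr (it mirrors the rewritten executeStatement() above):

    try (H2PooledConnection conn = connMgr.connection("PUBLIC")) {
        try (Statement stmt = conn.connection().createStatement()) {
            stmt.executeUpdate("CREATE TABLE T(ID INT PRIMARY KEY)");
        }
        catch (SQLException e) {
            // Convert to IgniteCheckedException, as executeStatement() does.
        }
    }
    // conn.close() has recycled the underlying H2Connection back into the striped pool.
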
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
index 6653ddd..d6835ca 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2CachedStatementKey.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
-import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
@@ -35,36 +34,16 @@ class H2CachedStatementKey {
     private final byte flags;
 
     /**
-     * Constructor.
-     *
-     * @param schemaName Schema name.
-     * @param sql SQL.
-     */
-    public H2CachedStatementKey(String schemaName, String sql) {
-        this(schemaName, sql, null);
-    }
-
-    /**
      * Full-fledged constructor.
      *
      * @param schemaName Schema name.
      * @param sql SQL.
-     * @param fieldsQry Query with flags.
+     * @param flags Query flags.
      */
-    public H2CachedStatementKey(String schemaName, String sql, SqlFieldsQuery fieldsQry) {
+    public H2CachedStatementKey(String schemaName, String sql, byte flags) {
         this.schemaName = schemaName;
         this.sql = sql;
-
-        if (fieldsQry == null)
-            this.flags = 0; // flags only relevant for server side updates.
-        else {
-            this.flags = (byte)(1 +
-                (fieldsQry.isDistributedJoins() ? 2 : 0) +
-                (fieldsQry.isEnforceJoinOrder() ? 4 : 0) +
-                (fieldsQry.isCollocated() ? 8 : 0) +
-                (fieldsQry.isLocal() ? 8 : 0)
-            );
-        }
+        this.flags = flags;
     }
 
     /** {@inheritDoc} */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Connection.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Connection.java
new file mode 100644
index 0000000..b78b304
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Connection.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.jdbc.JdbcStatement;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wrapper to store connection with currently used schema and statement cache.
+ */
+public class H2Connection implements AutoCloseable {
+    /** */
+    private static final int STATEMENT_CACHE_SIZE = 256;
+
+    /** */
+    private final Connection conn;
+
+    /** */
+    private volatile String schema;
+
+    /** */
+    private volatile H2StatementCache statementCache;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /**
+     * @param conn Connection to use.
+     * @param log Logger.
+     */
+    H2Connection(Connection conn, IgniteLogger log) {
+        this.conn = conn;
+        this.log = log;
+
+        initStatementCache();
+    }
+
+    /**
+     * @return Schema name if schema is set, null otherwise.
+     */
+    String schema() {
+        return schema;
+    }
+
+    /**
+     * @param schema Schema name set on this connection.
+     */
+    void schema(@Nullable String schema) {
+        if (schema != null && !F.eq(this.schema, schema)) {
+            try {
+                if(schema.trim().isEmpty()){
+                    throw new IgniteSQLException("Failed to set schema for DB connection. " +
+                        "Schema name could not be an empty string");
+                }
+
+                this.schema = schema;
+
+                conn.setSchema(schema);
+            }
+            catch (SQLException e) {
+                throw new IgniteSQLException("Failed to set schema for DB connection for thread [schema=" +
+                    schema + "]", e);
+            }
+        }
+    }
+
+    /**
+     * @return Connection.
+     */
+    Connection connection() {
+        return conn;
+    }
+
+    /**
+     * Clears statement cache.
+     */
+    void clearStatementCache() {
+        initStatementCache();
+    }
+
+    /**
+     * @return Statement cache.
+     */
+    H2StatementCache statementCache() {
+        return statementCache;
+    }
+
+    /**
+     * @return Statement cache size.
+     */
+    public int statementCacheSize() {
+        return statementCache == null ? 0 : statementCache.size();
+    }
+
+    /**
+     * Initializes statement cache.
+     */
+    private void initStatementCache() {
+        statementCache = new H2StatementCache(STATEMENT_CACHE_SIZE);
+    }
+
+    /**
+     * Prepare statement caching it if needed.
+     *
+     * @param sql SQL.
+     * @return Prepared statement.
+     */
+    PreparedStatement prepareStatement(String sql, byte qryFlags) {
+        try {
+            PreparedStatement stmt = cachedPreparedStatement(sql, qryFlags);
+
+            if (stmt == null) {
+                H2CachedStatementKey key = new H2CachedStatementKey(schema, sql, qryFlags);
+
+                stmt = prepareStatementNoCache(sql);
+
+                statementCache.put(key, stmt);
+            }
+
+            return stmt;
+        }
+        catch (SQLException e) {
+            throw new IgniteSQLException("Failed to parse query. " + e.getMessage(), IgniteQueryErrorCode.PARSING, e);
+        }
+    }
+
+    /**
+     * Get cached prepared statement (if any).
+     *
+     * @param sql SQL.
+     * @return Prepared statement or {@code null}.
+     * @throws SQLException On error.
+     */
+    private @Nullable PreparedStatement cachedPreparedStatement(String sql, byte qryFlags) throws SQLException {
+        H2CachedStatementKey key = new H2CachedStatementKey(schema, sql, qryFlags);
+
+        PreparedStatement stmt = statementCache.get(key);
+
+        // Nothing found.
+        if (stmt == null)
+            return null;
+
+        // Is statement still valid?
+        if (
+            stmt.isClosed() ||                                 // Closed.
+                stmt.unwrap(JdbcStatement.class).isCancelled() ||  // Cancelled.
+                GridSqlQueryParser.prepared(stmt).needRecompile() // Outdated (schema has been changed concurrently).
+        ) {
+            statementCache.remove(schema, sql, qryFlags);
+
+            return null;
+        }
+
+        return stmt;
+    }
+
+    /**
+     * Get prepared statement without caching.
+     *
+     * @param sql SQL.
+     * @return Prepared statement.
+     */
+    PreparedStatement prepareStatementNoCache(String sql) {
+        try {
+            return conn.prepareStatement(sql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+        }
+        catch (SQLException e) {
+            throw new IgniteSQLException("Failed to parse query. " + e.getMessage(), IgniteQueryErrorCode.PARSING, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(H2Connection.class, this);
+    }
+
+    /** Closes the wrapped connection. */
+    @Override public void close() {
+        U.close(conn, log);
+    }
+}
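
H2Connection now owns the per-connection statement cache keyed by (schema, SQL, flags): prepareStatement() re-prepares only when the cached statement is missing, closed, cancelled or needs recompilation, and evicts the stale entry in that case. A sketch of the cache-hit path, usable only from inside the same package since the API is package-private; dbUrl and log below are assumed to exist, and the snippet is presumed to run in a method declaring throws SQLException:

    H2Connection conn = new H2Connection(DriverManager.getConnection(dbUrl), log);

    conn.schema("PUBLIC");

    byte qryFlags = H2StatementCache.queryFlags(false, false);

    PreparedStatement s1 = conn.prepareStatement("SELECT 1", qryFlags);
    PreparedStatement s2 = conn.prepareStatement("SELECT 1", qryFlags);

    assert s1 == s2;    // second call is served from the per-connection statement cache
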
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
deleted file mode 100644
index abf6cc1..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ConnectionWrapper.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Wrapper to store connection with currently used schema and statement cache.
- */
-public class H2ConnectionWrapper implements AutoCloseable {
-    /** */
-    private static final int STATEMENT_CACHE_SIZE = 256;
-
-    /** */
-    private final Connection conn;
-
-    /** */
-    private volatile String schema;
-
-    /** */
-    private volatile H2StatementCache statementCache;
-
-    /**
-     * @param conn Connection to use.
-     */
-    H2ConnectionWrapper(Connection conn) {
-        this.conn = conn;
-
-        initStatementCache();
-    }
-
-    /**
-     * @return Schema name if schema is set, null otherwise.
-     */
-    public String schema() {
-        return schema;
-    }
-
-    /**
-     * @param schema Schema name set on this connection.
-     */
-    public void schema(@Nullable String schema) {
-        this.schema = schema;
-    }
-
-    /**
-     * Connection for schema.
-     *
-     * @param schema Schema name.
-     * @return Connection.
-     */
-    public Connection connection(@Nullable String schema) {
-        if (schema != null && !F.eq(this.schema, schema)) {
-            try {
-                if(schema.trim().isEmpty()){
-                   throw new IgniteSQLException("Failed to set schema for DB connection. " +
-                           "Schema name could not be an empty string");
-                }
-                conn.setSchema(schema);
-                this.schema = schema;
-            }
-            catch (SQLException e) {
-                throw new IgniteSQLException("Failed to set schema for DB connection for thread [schema=" +
-                    schema + "]", e);
-            }
-        }
-
-        return conn;
-    }
-
-    /**
-     * @return Connection.
-     */
-    public Connection connection() {
-        return conn;
-    }
-
-    /**
-     * @return Statement cache corresponding to connection.
-     */
-    public H2StatementCache statementCache() {
-        return statementCache;
-    }
-
-    /**
-     * Clears statement cache.
-     */
-    public void clearStatementCache() {
-        initStatementCache();
-    }
-
-    /**
-     * @return Statement cache size.
-     */
-    public int statementCacheSize() {
-        return statementCache == null ? 0 : statementCache.size();
-    }
-
-    /**
-     * Initializes statement cache.
-     */
-    private void initStatementCache() {
-        statementCache = new H2StatementCache(STATEMENT_CACHE_SIZE);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(H2ConnectionWrapper.class, this);
-    }
-
-    /** Closes wrapped connection. */
-    @Override public void close() {
-        U.closeQuiet(conn);
-    }
-}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java
index 0e2a779..960418b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java
@@ -24,7 +24,6 @@ import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
-import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
 
 /**
  * Special field set iterator based on database result set.
@@ -36,27 +35,27 @@ public class H2FieldsIterator extends H2ResultSetIterator<List<?>> {
     /** */
     private transient MvccQueryTracker mvccTracker;
 
-    /** Detached connection. */
-    private final ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable detachedConn;
+    /** Connection. */
+    private final H2PooledConnection conn;
 
     /**
      * @param data Data.
      * @param mvccTracker Mvcc tracker.
-     * @param detachedConn Detached connection.
+     * @param pageSize Page size.
+     * @param conn Connection.
      * @throws IgniteCheckedException If failed.
      */
     public H2FieldsIterator(ResultSet data, MvccQueryTracker mvccTracker,
-        ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable detachedConn,
+        H2PooledConnection conn,
         int pageSize,
-        IgniteLogger log, IgniteH2Indexing h2,
-        QueryContext qctx, boolean lazy)
+        IgniteLogger log, IgniteH2Indexing h2)
         throws IgniteCheckedException {
-        super(data, pageSize, log, h2, qctx, lazy);
+        super(data, pageSize, log, h2);
 
-        assert detachedConn != null;
+        assert conn != null;
 
         this.mvccTracker = mvccTracker;
-        this.detachedConn = detachedConn;
+        this.conn = conn;
     }
 
     /** {@inheritDoc} */
@@ -74,7 +73,7 @@ public class H2FieldsIterator extends H2ResultSetIterator<List<?>> {
             super.onClose();
         }
         finally {
-            detachedConn.recycle();
+            conn.close();
 
             if (mvccTracker != null)
                 mvccTracker.onDone();
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2PooledConnection.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2PooledConnection.java
new file mode 100644
index 0000000..e10b54b
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2PooledConnection.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Pooled connection wrapper that uses close() semantics to recycle the connection (i.e. return it to the pool).
+ */
+public class H2PooledConnection implements AutoCloseable {
+    /** */
+    private volatile H2Connection delegate;
+
+    /** Connection manager. */
+    private final ConnectionManager connMgr;
+
+    /** Closed (recycled) flag. */
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    /**
+     * @param conn Connection to use.
+     * @param connMgr Connection manager used to recycle the connection
+     *      (the connection is closed or returned to the connection pool).
+     */
+    H2PooledConnection(H2Connection conn, ConnectionManager connMgr) {
+        this.delegate = conn;
+        this.connMgr = connMgr;
+    }
+
+    /**
+     * @return Schema name if schema is set, null otherwise.
+     */
+    public String schema() {
+        return delegate.schema();
+    }
+
+    /**
+     * @param schema Schema name set on this connection.
+     */
+    public void schema(@Nullable String schema) {
+        delegate.schema(schema);
+    }
+
+    /**
+     * @return Connection.
+     */
+    public Connection connection() {
+        return delegate.connection();
+    }
+
+    /**
+     * @return Statement cache size.
+     */
+    public int statementCacheSize() {
+        return delegate.statementCacheSize();
+    }
+
+    /**
+     * Prepare statement caching it if needed.
+     *
+     * @param sql SQL.
+     * @return Prepared statement.
+     */
+    public PreparedStatement prepareStatement(String sql, byte qryFlags) throws IgniteCheckedException {
+        return delegate.prepareStatement(sql, qryFlags);
+    }
+
+    /**
+     * Get prepared statement without caching.
+     *
+     * @param sql SQL.
+     * @return Prepared statement.
+     */
+    public PreparedStatement prepareStatementNoCache(String sql) throws IgniteCheckedException {
+        boolean insertHack = GridH2Table.insertHackRequired(sql);
+
+        if (insertHack) {
+            GridH2Table.insertHack(true);
+
+            try {
+                return delegate.prepareStatementNoCache(sql);
+            }
+            finally {
+                GridH2Table.insertHack(false);
+            }
+        }
+        else
+            return delegate.prepareStatementNoCache(sql);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(H2PooledConnection.class, this);
+    }
+
+    /** Closes the wrapped connection (returns it to the pool, or closes it if the pool is full). */
+    @Override public void close() {
+        assert delegate != null;
+
+        if (closed.compareAndSet(false, true)) {
+            H2Utils.resetSession(this);
+
+            connMgr.recycle(delegate);
+
+            delegate = null;
+        }
+    }
+}
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java
index 5661b26..a00847e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java
@@ -30,8 +30,6 @@ import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
-import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
-import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
 import org.apache.ignite.internal.util.typedef.F;
@@ -99,15 +97,6 @@ public abstract class H2ResultSetIterator<T> extends GridIteratorAdapter<T> impl
     /** Canceled. */
     private boolean canceled;
 
-    /** Query context. */
-    private final QueryContext qctx;
-
-    /** Query context registry. */
-    private final QueryContextRegistry qryCtxReg;
-
-    /** Query context registry. */
-    private final boolean lazy;
-
     /**
      * @param data Data array.
      * @param log Logger.
@@ -115,13 +104,10 @@ public abstract class H2ResultSetIterator<T> extends GridIteratorAdapter<T> impl
      * @param pageSize Page size.
      * @throws IgniteCheckedException If failed.
      */
-    protected H2ResultSetIterator(ResultSet data, int pageSize, IgniteLogger log, IgniteH2Indexing h2,
-        QueryContext qctx, boolean lazy) throws IgniteCheckedException {
+    protected H2ResultSetIterator(ResultSet data, int pageSize, IgniteLogger log, IgniteH2Indexing h2)
+        throws IgniteCheckedException {
+        this.pageSize = pageSize;
         this.data = data;
-        this.qctx = qctx;
-        qryCtxReg = h2.queryContextRegistry();
-        this.lazy = lazy;
-        this.pageSize = !lazy ? 1 : pageSize;
 
         try {
             res = (ResultInterface)RESULT_FIELD.get(data);
@@ -158,15 +144,10 @@ public abstract class H2ResultSetIterator<T> extends GridIteratorAdapter<T> impl
      * @throws IgniteCheckedException On cancel.
      */
     private boolean fetchPage() throws IgniteCheckedException {
-        if (lazy) {
-            lockTables();
-
-            qryCtxReg.setThreadLocal(qctx);
-        }
+        lockTables();
 
         try {
-            if (lazy)
-                GridH2Table.checkTablesVersions(ses);
+            GridH2Table.checkTablesVersions(ses);
 
             page.clear();
 
@@ -217,11 +198,7 @@ public abstract class H2ResultSetIterator<T> extends GridIteratorAdapter<T> impl
             }
         }
         finally {
-            if (lazy) {
-                qryCtxReg.clearThreadLocal();
-
-                unlockTables();
-            }
+            unlockTables();
         }
     }
 
@@ -252,13 +229,13 @@ public abstract class H2ResultSetIterator<T> extends GridIteratorAdapter<T> impl
 
     /** */
     public void lockTables() {
-        if (ses.isLazyQueryExecution() && !isClosed())
+        if (!isClosed() && ses.isLazyQueryExecution())
             GridH2Table.readLockTables(ses);
     }
 
     /** */
     public void unlockTables() {
-        if (ses != null && ses.isLazyQueryExecution())
+        if (ses.isLazyQueryExecution())
             GridH2Table.unlockTables(ses);
     }
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2StatementCache.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2StatementCache.java
index edde67a..7fd0382 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2StatementCache.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2StatementCache.java
@@ -17,18 +17,17 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
 import java.sql.PreparedStatement;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Statement cache. LRU eviction policy is used. Not thread-safe.
  */
-final class H2StatementCache {
+public final class H2StatementCache {
     /** Last usage. */
     private volatile long lastUsage;
 
@@ -93,8 +92,8 @@ final class H2StatementCache {
      * @param schemaName Schema name.
      * @param sql SQL statement.
      */
-    void remove(String schemaName, String sql) {
-        lruStmtCache.remove(new H2CachedStatementKey(schemaName, sql));
+    void remove(String schemaName, String sql, byte qryFlags) {
+        lruStmtCache.remove(new H2CachedStatementKey(schemaName, sql, qryFlags));
     }
 
     /**
@@ -103,4 +102,16 @@ final class H2StatementCache {
     int size() {
         return lruStmtCache.size();
     }
+
+    /** */
+    public static byte queryFlags(QueryDescriptor qryDesc) {
+        assert qryDesc != null;
+
+        return queryFlags(qryDesc.distributedJoins(), qryDesc.enforceJoinOrder());
+    }
+
+    /** */
+    public static byte queryFlags(boolean distributedJoins, boolean enforceJoinOrder) {
+        return (byte)((distributedJoins ? 1 : 0) + (enforceJoinOrder ? 2 : 0));
+    }
 }
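
A quick illustration of the queryFlags() helpers added above: the two boolean query options are packed into one byte (bit 0 = distributedJoins, bit 1 = enforceJoinOrder), and that byte becomes part of H2CachedStatementKey, so the same SQL text prepared with different join settings is keyed separately in the per-connection statement cache (IgniteH2Indexing below calls conn.prepareStatement(qry, H2StatementCache.queryFlags(qryDesc))). A self-contained sketch of the encoding; the class name is hypothetical, the bit layout mirrors the patch:

    public class QueryFlagsExample {
        /** Same bit layout as H2StatementCache.queryFlags(boolean, boolean). */
        static byte queryFlags(boolean distributedJoins, boolean enforceJoinOrder) {
            return (byte)((distributedJoins ? 1 : 0) + (enforceJoinOrder ? 2 : 0));
        }

        public static void main(String[] args) {
            System.out.println(queryFlags(false, false)); // 0
            System.out.println(queryFlags(true, false));  // 1
            System.out.println(queryFlags(false, true));  // 2
            System.out.println(queryFlags(true, true));   // 3
        }
    }
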
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
index 971dcab..7c00dbd 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.h2;
 
 import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.sql.Connection;
@@ -38,6 +39,7 @@ import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
@@ -63,6 +65,7 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
 import org.apache.ignite.internal.processors.query.h2.opt.H2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory;
@@ -78,6 +81,7 @@ import org.h2.result.SortOrder;
 import org.h2.table.Column;
 import org.h2.table.IndexColumn;
 import org.h2.util.LocalDateTimeUtils;
+import org.h2.value.CompareMode;
 import org.h2.value.DataType;
 import org.h2.value.Value;
 import org.h2.value.ValueArray;
@@ -109,6 +113,9 @@ import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_N
  * H2 utility methods.
  */
 public class H2Utils {
+    /** Query context H2 variable name. */
+    static final String QCTX_VARIABLE_NAME = "_IGNITE_QUERY_CONTEXT";
+
     /**
      * The default precision for a char/varchar value.
      */
@@ -407,27 +414,41 @@ public class H2Utils {
      * @param c Connection.
      * @return Session.
      */
+    public static Session session(H2PooledConnection c) {
+        return session(c.connection());
+    }
+
+    /**
+     * @param c Connection.
+     * @return Session.
+     */
     public static Session session(Connection c) {
         return (Session)((JdbcConnection)c).getSession();
     }
 
     /**
      * @param conn Connection to use.
+     * @param qctx Query context.
      * @param distributedJoins If distributed joins are enabled.
      * @param enforceJoinOrder Enforce join order of tables.
      */
-    public static void setupConnection(Connection conn, boolean distributedJoins, boolean enforceJoinOrder) {
-        setupConnection(conn,distributedJoins, enforceJoinOrder, false);
+    public static void setupConnection(H2PooledConnection conn, QueryContext qctx,
+        boolean distributedJoins, boolean enforceJoinOrder) {
+        assert qctx != null;
+
+        setupConnection(conn, qctx, distributedJoins, enforceJoinOrder, false);
     }
 
     /**
      * @param conn Connection to use.
+     * @param qctx Query context.
      * @param distributedJoins If distributed joins are enabled.
      * @param enforceJoinOrder Enforce join order of tables.
      * @param lazy Lazy query execution mode.
      */
     public static void setupConnection(
-        Connection conn,
+        H2PooledConnection conn,
+        QueryContext qctx,
         boolean distributedJoins,
         boolean enforceJoinOrder,
         boolean lazy
@@ -437,6 +458,45 @@ public class H2Utils {
         s.setForceJoinOrder(enforceJoinOrder);
         s.setJoinBatchEnabled(distributedJoins);
         s.setLazyQueryExecution(lazy);
+
+        QueryContext oldCtx = (QueryContext)s.getVariable(QCTX_VARIABLE_NAME).getObject();
+
+        assert oldCtx == null || oldCtx == qctx : oldCtx;
+
+        s.setVariable(QCTX_VARIABLE_NAME, new ValueRuntimeSimpleObject<>(qctx));
+
+        // The thread-local context hack is used only for H2 methods that are called without a Session object,
+        // e.g. GridH2Table.getRowCountApproximation (used only during the optimization phase, after parsing).
+        QueryContext.threadLocal(qctx);
+    }
+
+    /**
+     * Clean up session for further reuse.
+     *
+     * @param conn Connection to use.
+     */
+    public static void resetSession(H2PooledConnection conn) {
+        Session s = session(conn);
+
+        s.setVariable(QCTX_VARIABLE_NAME, ValueNull.INSTANCE);
+    }
+
+    /**
+     * @param conn Connection to use.
+     * @return Query context.
+     */
+    public static QueryContext context(H2PooledConnection conn) {
+        Session s = session(conn);
+
+        return context(s);
+    }
+
+    /**
+     * @param ses Session.
+     * @return Query context.
+     */
+    public static QueryContext context(Session ses) {
+        return (QueryContext)ses.getVariable(QCTX_VARIABLE_NAME).getObject();
     }
 
     /**
@@ -984,4 +1044,167 @@ public class H2Utils {
         return keyCols.toArray(new IndexColumn[0]);
     }
 
+    /**
+     * @param <T> Type of the wrapped value.
+     */
+    public static class ValueRuntimeSimpleObject<T> extends Value {
+        /** */
+        private final T val;
+
+        /**
+         * @param val Object.
+         */
+        public ValueRuntimeSimpleObject(T val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String getSQL() {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getType() {
+            return Value.JAVA_OBJECT;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getPrecision() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getDisplaySize() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String getString() {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object getObject() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void set(PreparedStatement prep, int parameterIndex) throws SQLException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected int compareSecure(Value v, CompareMode mode) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            ValueRuntimeSimpleObject<?> object = (ValueRuntimeSimpleObject<?>)o;
+            return Objects.equals(val, object.val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return Objects.hash(val);
+        }
+    }
+
+    /**
+     * @param cls Class.
+     * @param fldName Field name.
+     */
+    public static <T, R> Getter<T, R> getter(Class<? extends T> cls, String fldName) {
+        Field field;
+
+        try {
+            field = cls.getDeclaredField(fldName);
+        }
+        catch (NoSuchFieldException e) {
+            throw new RuntimeException(e);
+        }
+
+        field.setAccessible(true);
+
+        return new Getter<>(field);
+    }
+
+    /**
+     * Field getter.
+     */
+    public static class Getter<T, R> {
+        /** */
+        private final Field fld;
+
+        /**
+         * @param fld Field.
+         */
+        private Getter(Field fld) {
+            this.fld = fld;
+        }
+
+        /**
+         * @param obj Object.
+         * @return Result.
+         */
+        public R get(T obj) {
+            try {
+                return (R)fld.get(obj);
+            }
+            catch (IllegalAccessException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+
+    /**
+     * @param cls Class.
+     * @param fldName Field name.
+     */
+    public static <T, R> Setter<T, R> setter(Class<? extends T> cls, String fldName) {
+        Field field;
+
+        try {
+            field = cls.getDeclaredField(fldName);
+        }
+        catch (NoSuchFieldException e) {
+            throw new RuntimeException(e);
+        }
+
+        field.setAccessible(true);
+
+        return new Setter<>(field);
+    }
+
+    /**
+     * Field setter.
+     */
+    public static class Setter<T, R> {
+        /** */
+        private final Field fld;
+
+        /**
+         * @param fld Field.
+         */
+        private Setter(Field fld) {
+            this.fld = fld;
+        }
+
+        /**
+         * @param obj Object.
+         * @param val Value.
+         */
+        public void set(T obj, R val) {
+            try {
+                fld.set(obj, val);
+            }
+            catch (IllegalAccessException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
 }
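
The central change in H2Utils above is that the QueryContext now travels with the H2 Session itself, stored under the _IGNITE_QUERY_CONTEXT session variable, instead of being published through a thread-local registry; engine callbacks read it back via H2Utils.context(Session), as the H2TreeIndex and H2PkHashIndex hunks below show. A hedged sketch of the intended call order, using only methods visible in this patch (connMgr, idx, qry and schemaName are stand-in variables, and the exact point where resetSession() is invoked in the final code is not shown in this excerpt):

    // Borrow a pooled connection, attach the query context to its H2 session,
    // run statements, then clear the variable so the connection can be reused.
    try (H2PooledConnection conn = connMgr.connection(schemaName)) {
        QueryContext qctx = QueryContext.parseContext(idx.backupFilter(null, null), qry.isLocal());

        H2Utils.setupConnection(conn, qctx, /*distributedJoins*/ false, /*enforceJoinOrder*/ true);

        // ... prepare and execute statements; index code resolves the context
        // through H2Utils.context(session) rather than a thread-local lookup ...

        assert H2Utils.context(conn) == qctx;       // setupConnection() stores qctx on the session

        H2Utils.resetSession(conn);                 // clear _IGNITE_QUERY_CONTEXT before the connection is reused
    }

Note that setupConnection() also asserts that the session either has no context yet or already carries the same one, so repeated setup with the same QueryContext is harmless.
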
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index ed13f90..9c09c92 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.h2;
 
 import java.sql.BatchUpdateException;
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -547,44 +546,35 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             return new GridQueryFieldsResultAdapter(select.meta(), null) {
                 @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException {
-                    assert qryCtxRegistry.getThreadLocal() == null;
-
-                    qryCtxRegistry.setThreadLocal(qctx);
-
-                    ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable conn = connMgr.detachThreadConnection();
+                    H2PooledConnection conn = connections().connection(qryDesc.schemaName());
 
                     try {
-                        Connection conn0 = conn.object().connection(qryDesc.schemaName());
-
-                        H2Utils.setupConnection(conn0,
+                        H2Utils.setupConnection(conn, qctx,
                             qryDesc.distributedJoins(), qryDesc.enforceJoinOrder(), qryParams.lazy());
 
                         List<Object> args = F.asList(qryParams.arguments());
 
-                        PreparedStatement stmt = preparedStatementWithParams(
-                            conn0,
-                            qry,
-                            args,
-                            true
-                        );
+                        PreparedStatement stmt = conn.prepareStatement(qry, H2StatementCache.queryFlags(qryDesc));
+
+                        H2Utils.bindParameters(stmt, args);
+
+                        H2QueryInfo qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry);
 
                         ResultSet rs = executeSqlQueryWithTimer(
                             stmt,
-                            conn0,
+                            conn,
                             qry,
-                            args,
                             timeout,
                             cancel,
                             qryParams.dataPageScanEnabled(),
-                            new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry)
+                            qryInfo
                         );
 
                         return new H2FieldsIterator(rs, mvccTracker, conn, qryParams.pageSize(),
-                            log, IgniteH2Indexing.this,
-                            qctx, qryParams.lazy());
+                            log, IgniteH2Indexing.this);
                     }
                     catch (IgniteCheckedException | RuntimeException | Error e) {
-                        conn.recycle();
+                        conn.close();
 
                         try {
                             if (mvccTracker != null)
@@ -596,9 +586,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                         throw e;
                     }
-                    finally {
-                        qryCtxRegistry.clearThreadLocal();
-                    }
                 }
             };
         }
@@ -765,32 +752,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * Prepares sql statement.
-     *
-     * @param conn Connection.
-     * @param sql Sql.
-     * @param params Params.
-     * @param useStmtCache If {@code true} use stmt cache.
-     * @return Prepared statement with set parameters.
-     * @throws IgniteCheckedException If failed.
-     */
-    public PreparedStatement preparedStatementWithParams(Connection conn, String sql, Collection<Object> params,
-        boolean useStmtCache) throws IgniteCheckedException {
-        final PreparedStatement stmt;
-
-        try {
-            stmt = useStmtCache ? connMgr.prepareStatement(conn, sql) : connMgr.prepareStatementNoCache(conn, sql);
-        }
-        catch (SQLException e) {
-            throw new IgniteCheckedException("Failed to parse SQL query: " + sql, e);
-        }
-
-        H2Utils.bindParameters(stmt, params);
-
-        return stmt;
-    }
-
-    /**
      * Executes sql query statement.
      *
      * @param conn Connection.
@@ -800,8 +761,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @return Result.
      * @throws IgniteCheckedException If failed.
      */
-    private ResultSet executeSqlQuery(final Connection conn, final PreparedStatement stmt,
-        int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
+    private ResultSet executeSqlQuery(final H2PooledConnection conn, final PreparedStatement stmt,
+        int timeoutMillis, @Nullable GridQueryCancel cancel) throws IgniteCheckedException {
         if (cancel != null)
             cancel.add(() -> cancelStatement(stmt));
 
@@ -820,7 +781,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (e.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
                 throw new QueryCancelledException();
 
-            throw new IgniteCheckedException("Failed to execute SQL query. " + e.getMessage(), e);
+            if (e.getCause() instanceof IgniteSQLException)
+                throw (IgniteSQLException)e.getCause();
+
+            throw new IgniteSQLException(e);
         }
     }
 
@@ -851,7 +815,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws IgniteCheckedException If failed.
      */
     public ResultSet executeSqlQueryWithTimer(
-        Connection conn,
+        H2PooledConnection conn,
         String sql,
         @Nullable Collection<Object> params,
         int timeoutMillis,
@@ -859,8 +823,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         Boolean dataPageScanEnabled,
         final H2QueryInfo qryInfo
     ) throws IgniteCheckedException {
-        return executeSqlQueryWithTimer(preparedStatementWithParams(conn, sql, params, false),
-            conn, sql, params, timeoutMillis, cancel, dataPageScanEnabled, qryInfo);
+        PreparedStatement stmt = conn.prepareStatementNoCache(sql);
+
+        H2Utils.bindParameters(stmt, params);
+
+        return executeSqlQueryWithTimer(stmt,
+            conn, sql, timeoutMillis, cancel, dataPageScanEnabled, qryInfo);
     }
 
     /**
@@ -878,7 +846,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param stmt Prepared statement for query.
      * @param conn Connection.
      * @param sql Sql query.
-     * @param params Parameters.
      * @param timeoutMillis Query timeout.
      * @param cancel Query cancel.
      * @param dataPageScanEnabled If data page scan is enabled.
@@ -887,9 +854,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      */
     public ResultSet executeSqlQueryWithTimer(
         PreparedStatement stmt,
-        Connection conn,
+        H2PooledConnection conn,
         String sql,
-        @Nullable Collection<Object> params,
         int timeoutMillis,
         @Nullable GridQueryCancel cancel,
         Boolean dataPageScanEnabled,
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
index 70495a2..a4b1b10 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Arrays;
@@ -46,7 +45,6 @@ import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
-import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlInsert;
@@ -333,219 +331,234 @@ public class QueryParser {
     @SuppressWarnings("IfMayBeConditional")
     private QueryParserResult parseH2(String schemaName, SqlFieldsQuery qry, boolean batched,
         boolean remainingAllowed) {
-        Connection c = connMgr.connectionForThread().connection(schemaName);
+        try (H2PooledConnection c = connMgr.connection(schemaName)) {
+            // For queries that are explicitly local, we rely on the flag specified in the query
+            // because this parsing result will be cached and used for queries directly.
+            // For other queries, we enforce join order at this stage to avoid premature optimizations
+            // (and therefore longer parsing), since there will be more parsing at the split stage anyway.
+            boolean enforceJoinOrderOnParsing = (!qry.isLocal() || qry.isEnforceJoinOrder());
 
-        // For queries that are explicitly local, we rely on the flag specified in the query
-        // because this parsing result will be cached and used for queries directly.
-        // For other queries, we enforce join order at this stage to avoid premature optimizations
-        // (and therefore longer parsing) as long as there'll be more parsing at split stage.
-        boolean enforceJoinOrderOnParsing = (!qry.isLocal() || qry.isEnforceJoinOrder());
+            QueryContext qctx = QueryContext.parseContext(idx.backupFilter(null, null), qry.isLocal());
 
-        H2Utils.setupConnection(c, /*distributedJoins*/false, /*enforceJoinOrder*/enforceJoinOrderOnParsing);
+            H2Utils.setupConnection(
+                c,
+                qctx,
+                false,
+                enforceJoinOrderOnParsing,
+                false);
 
-        QueryContext qctx = new QueryContext(
-            0,
-            idx.backupFilter(null, null),
-            null,
-            null,
-            null,
-            qry.isLocal()
-        );
-
-        QueryContextRegistry qryCtxRegistry = idx.queryContextRegistry();
-
-        qryCtxRegistry.setThreadLocal(qctx);
-
-        PreparedStatement stmt = null;
-
-        try {
-            stmt = connMgr.prepareStatementNoCache(c, qry.getSql());
-
-            if (qry.isLocal() && GridSqlQueryParser.checkMultipleStatements(stmt))
-                throw new IgniteSQLException("Multiple statements queries are not supported for local queries.",
-                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-
-            GridSqlQueryParser.PreparedWithRemaining prep = GridSqlQueryParser.preparedWithRemaining(stmt);
-
-            Prepared prepared = prep.prepared();
-
-            if (GridSqlQueryParser.isExplainUpdate(prepared))
-                throw new IgniteSQLException("Explains of update queries are not supported.",
-                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+            PreparedStatement stmt = null;
 
-            // Get remaining query and check if it is allowed.
-            SqlFieldsQuery remainingQry = null;
-
-            if (!F.isEmpty(prep.remainingSql())) {
-                checkRemainingAllowed(remainingAllowed);
+            try {
+                stmt = c.prepareStatementNoCache(qry.getSql());
 
-                remainingQry = cloneFieldsQuery(qry).setSql(prep.remainingSql());
-            }
+                if (qry.isLocal() && GridSqlQueryParser.checkMultipleStatements(stmt))
+                    throw new IgniteSQLException("Multiple statements queries are not supported for local queries.",
+                        IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
-            // Prepare new query.
-            SqlFieldsQuery newQry = cloneFieldsQuery(qry).setSql(prepared.getSQL());
+                GridSqlQueryParser.PreparedWithRemaining prep = GridSqlQueryParser.preparedWithRemaining(stmt);
 
-            final int paramsCnt = prepared.getParameters().size();
+                Prepared prepared = prep.prepared();
 
-            Object[] argsOrig = qry.getArgs();
+                if (GridSqlQueryParser.isExplainUpdate(prepared))
+                    throw new IgniteSQLException("Explains of update queries are not supported.",
+                        IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
-            Object[] args = null;
-            Object[] remainingArgs = null;
+                // Get remaining query and check if it is allowed.
+                SqlFieldsQuery remainingQry = null;
 
-            if (!batched && paramsCnt > 0) {
-                if (argsOrig == null || argsOrig.length < paramsCnt)
-                    // Not enough parameters, but we will handle this later on execution phase.
-                    args = argsOrig;
-                else {
-                    args = Arrays.copyOfRange(argsOrig, 0, paramsCnt);
+                if (!F.isEmpty(prep.remainingSql())) {
+                    checkRemainingAllowed(remainingAllowed);
 
-                    if (paramsCnt != argsOrig.length)
-                        remainingArgs = Arrays.copyOfRange(argsOrig, paramsCnt, argsOrig.length);
+                    remainingQry = cloneFieldsQuery(qry).setSql(prep.remainingSql());
                 }
-            }
-            else
-                remainingArgs = argsOrig;
-
-            newQry.setArgs(args);
-
-            QueryDescriptor newQryDesc = queryDescriptor(schemaName, newQry);
-
-            if (remainingQry != null)
-                remainingQry.setArgs(remainingArgs);
-
-            final List<JdbcParameterMeta> paramsMeta;
-
-            try {
-                paramsMeta = H2Utils.parametersMeta(stmt.getParameterMetaData());
 
-                assert prepared.getParameters().size() == paramsMeta.size();
-            }
-            catch (IgniteCheckedException | SQLException e) {
-                throw new IgniteSQLException("Failed to get parameters metadata", IgniteQueryErrorCode.UNKNOWN, e);
-            }
+                // Prepare new query.
+                SqlFieldsQuery newQry = cloneFieldsQuery(qry).setSql(prepared.getSQL());
 
-            // Do actual parsing.
-            if (CommandProcessor.isCommand(prepared)) {
-                GridSqlStatement cmdH2 = new GridSqlQueryParser(false, log).parse(prepared);
+                final int paramsCnt = prepared.getParameters().size();
 
-                QueryParserResultCommand cmd = new QueryParserResultCommand(null, cmdH2, false);
+                Object[] argsOrig = qry.getArgs();
 
-                return new QueryParserResult(
-                    newQryDesc,
-                    queryParameters(newQry),
-                    remainingQry,
-                    paramsMeta,
-                    null,
-                    null,
-                    cmd
-                );
-            }
-            else if (CommandProcessor.isCommandNoOp(prepared)) {
-                QueryParserResultCommand cmd = new QueryParserResultCommand(null, null, true);
+                Object[] args = null;
+                Object[] remainingArgs = null;
 
-                return new QueryParserResult(
-                    newQryDesc,
-                    queryParameters(newQry),
-                    remainingQry,
-                    paramsMeta,
-                    null,
-                    null,
-                    cmd
-                );
-            }
-            else if (GridSqlQueryParser.isDml(prepared)) {
-                QueryParserResultDml dml = prepareDmlStatement(newQryDesc, prepared);
-
-                return new QueryParserResult(
-                    newQryDesc,
-                    queryParameters(newQry),
-                    remainingQry,
-                    paramsMeta,
-                    null,
-                    dml,
-                    null
-                );
-            }
-            else if (!prepared.isQuery()) {
-                throw new IgniteSQLException("Unsupported statement: " + newQry.getSql(),
-                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-            }
+                if (!batched && paramsCnt > 0) {
+                    if (argsOrig == null || argsOrig.length < paramsCnt)
+                        // Not enough parameters, but we will handle this later, at the execution phase.
+                        args = argsOrig;
+                    else {
+                        args = Arrays.copyOfRange(argsOrig, 0, paramsCnt);
 
-            // Parse SELECT.
-            GridSqlQueryParser parser = new GridSqlQueryParser(false, log);
+                        if (paramsCnt != argsOrig.length)
+                            remainingArgs = Arrays.copyOfRange(argsOrig, paramsCnt, argsOrig.length);
+                    }
+                }
+                else
+                    remainingArgs = argsOrig;
 
-            GridSqlQuery selectStmt = (GridSqlQuery)parser.parse(prepared);
+                newQry.setArgs(args);
 
-            List<Integer> cacheIds = parser.cacheIds();
-            Integer mvccCacheId = mvccCacheIdForSelect(parser.objectsMap());
+                QueryDescriptor newQryDesc = queryDescriptor(schemaName, newQry);
 
-            // Calculate if query is in fact can be executed locally.
-            boolean loc = qry.isLocal();
+                if (remainingQry != null)
+                    remainingQry.setArgs(remainingArgs);
 
-            if (!loc) {
-                if (parser.isLocalQuery())
-                    loc = true;
-            }
+                final List<JdbcParameterMeta> paramsMeta;
 
-            // If this is a local query, check if it must be split.
-            boolean locSplit = false;
+                try {
+                    paramsMeta = H2Utils.parametersMeta(stmt.getParameterMetaData());
 
-            if (loc) {
-                GridCacheContext cctx = parser.getFirstPartitionedCache();
+                    assert prepared.getParameters().size() == paramsMeta.size();
+                }
+                catch (IgniteCheckedException | SQLException e) {
+                    throw new IgniteSQLException("Failed to get parameters metadata", IgniteQueryErrorCode.UNKNOWN, e);
+                }
 
-                if (cctx != null && cctx.config().getQueryParallelism() > 1)
-                    locSplit = true;
-            }
+                // Do actual parsing.
+                if (CommandProcessor.isCommand(prepared)) {
+                    GridSqlStatement cmdH2 = new GridSqlQueryParser(false, log).parse(prepared);
 
-            // Split is required either if query is distributed, or when it is local, but executed
-            // over segmented PARTITIONED case. In this case multiple map queries will be executed against local
-            // node stripes in parallel and then merged through reduce process.
-            boolean splitNeeded = !loc || locSplit;
+                    QueryParserResultCommand cmd = new QueryParserResultCommand(null, cmdH2, false);
 
-            String forUpdateQryOutTx = null;
-            String forUpdateQryTx = null;
-            GridCacheTwoStepQuery forUpdateTwoStepQry = null;
+                    return new QueryParserResult(
+                        newQryDesc,
+                        queryParameters(newQry),
+                        remainingQry,
+                        paramsMeta,
+                        null,
+                        null,
+                        cmd
+                    );
+                }
+                else if (CommandProcessor.isCommandNoOp(prepared)) {
+                    QueryParserResultCommand cmd = new QueryParserResultCommand(null, null, true);
+
+                    return new QueryParserResult(
+                        newQryDesc,
+                        queryParameters(newQry),
+                        remainingQry,
+                        paramsMeta,
+                        null,
+                        null,
+                        cmd
+                    );
+                }
+                else if (GridSqlQueryParser.isDml(prepared)) {
+                    QueryParserResultDml dml = prepareDmlStatement(newQryDesc, prepared);
+
+                    return new QueryParserResult(
+                        newQryDesc,
+                        queryParameters(newQry),
+                        remainingQry,
+                        paramsMeta,
+                        null,
+                        dml,
+                        null
+                    );
+                }
+                else if (!prepared.isQuery()) {
+                    throw new IgniteSQLException("Unsupported statement: " + newQry.getSql(),
+                        IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+                }
 
-            boolean forUpdate = GridSqlQueryParser.isForUpdateQuery(prepared);
+                // Parse SELECT.
+                GridSqlQueryParser parser = new GridSqlQueryParser(false, log);
 
-            // SELECT FOR UPDATE case handling. We need to create extra queries with appended _key
-            // column to be able to lock selected rows further.
-            if (forUpdate) {
-                // We have checked above that it's not an UNION query, so it's got to be SELECT.
-                assert selectStmt instanceof GridSqlSelect;
+                GridSqlQuery selectStmt = (GridSqlQuery)parser.parse(prepared);
 
-                // Check FOR UPDATE invariants: only one table, MVCC is there.
-                if (cacheIds.size() != 1)
-                    throw new IgniteSQLException("SELECT FOR UPDATE is supported only for queries " +
-                        "that involve single transactional cache.");
+                List<Integer> cacheIds = parser.cacheIds();
+                Integer mvccCacheId = mvccCacheIdForSelect(parser.objectsMap());
 
-                if (mvccCacheId == null)
-                    throw new IgniteSQLException("SELECT FOR UPDATE query requires transactional cache " +
-                        "with MVCC enabled.", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+                // Calculate whether the query can in fact be executed locally.
+                boolean loc = qry.isLocal();
 
-                // We need a copy because we are going to modify AST a bit. We do not want to modify original select.
-                GridSqlSelect selForUpdate = ((GridSqlSelect)selectStmt).copySelectForUpdate();
+                if (!loc) {
+                    if (parser.isLocalQuery())
+                        loc = true;
+                }
 
-                // Clear forUpdate flag to run it as a plain query.
-                selForUpdate.forUpdate(false);
-                ((GridSqlSelect)selectStmt).forUpdate(false);
+                // If this is a local query, check if it must be split.
+                boolean locSplit = false;
 
-                // Remember sql string without FOR UPDATE clause.
-                forUpdateQryOutTx = selForUpdate.getSQL();
+                if (loc) {
+                    GridCacheContext cctx = parser.getFirstPartitionedCache();
 
-                GridSqlAlias keyCol = keyColumn(selForUpdate);
+                    if (cctx != null && cctx.config().getQueryParallelism() > 1)
+                        locSplit = true;
+                }
 
-                selForUpdate.addColumn(keyCol, true);
+                // Split is required either when the query is distributed, or when it is local but executed
+                // over a segmented PARTITIONED cache. In that case multiple map queries are executed against local
+                // node stripes in parallel and then merged through the reduce process.
+                boolean splitNeeded = !loc || locSplit;
+
+                String forUpdateQryOutTx = null;
+                String forUpdateQryTx = null;
+                GridCacheTwoStepQuery forUpdateTwoStepQry = null;
+
+                boolean forUpdate = GridSqlQueryParser.isForUpdateQuery(prepared);
+
+                // SELECT FOR UPDATE case handling. We need to create extra queries with appended _key
+                // column to be able to lock selected rows further.
+                if (forUpdate) {
+                    // We have checked above that it's not a UNION query, so it's got to be SELECT.
+                    assert selectStmt instanceof GridSqlSelect;
+
+                    // Check FOR UPDATE invariants: only one table, MVCC is there.
+                    if (cacheIds.size() != 1)
+                        throw new IgniteSQLException("SELECT FOR UPDATE is supported only for queries " +
+                            "that involve single transactional cache.");
+
+                    if (mvccCacheId == null)
+                        throw new IgniteSQLException("SELECT FOR UPDATE query requires transactional cache " +
+                            "with MVCC enabled.", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+                    // We need a copy because we are going to modify AST a bit. We do not want to modify original select.
+                    GridSqlSelect selForUpdate = ((GridSqlSelect)selectStmt).copySelectForUpdate();
+
+                    // Clear forUpdate flag to run it as a plain query.
+                    selForUpdate.forUpdate(false);
+                    ((GridSqlSelect)selectStmt).forUpdate(false);
+
+                    // Remember sql string without FOR UPDATE clause.
+                    forUpdateQryOutTx = selForUpdate.getSQL();
+
+                    GridSqlAlias keyCol = keyColumn(selForUpdate);
+
+                    selForUpdate.addColumn(keyCol, true);
+
+                    // Remember sql string without FOR UPDATE clause and with _key column.
+                    forUpdateQryTx = selForUpdate.getSQL();
+
+                    // Prepare additional two-step query for FOR UPDATE case.
+                    if (splitNeeded) {
+                        c.schema(newQry.getSchema());
+
+                        forUpdateTwoStepQry = GridSqlQuerySplitter.split(
+                            c,
+                            selForUpdate,
+                            forUpdateQryTx,
+                            newQry.isCollocated(),
+                            newQry.isDistributedJoins(),
+                            newQry.isEnforceJoinOrder(),
+                            locSplit,
+                            idx,
+                            paramsCnt,
+                            log
+                        );
+                    }
+                }
 
-                // Remember sql string without FOR UPDATE clause and with _key column.
-                forUpdateQryTx = selForUpdate.getSQL();
+                GridCacheTwoStepQuery twoStepQry = null;
 
-                // Prepare additional two-step query for FOR UPDATE case.
                 if (splitNeeded) {
-                    forUpdateTwoStepQry = GridSqlQuerySplitter.split(
-                        connMgr.connectionForThread().connection(newQry.getSchema()),
-                        selForUpdate,
-                        forUpdateQryTx,
+                    c.schema(newQry.getSchema());
+
+                    twoStepQry = GridSqlQuerySplitter.split(
+                        c,
+                        selectStmt,
+                        newQry.getSql(),
                         newQry.isCollocated(),
                         newQry.isDistributedJoins(),
                         newQry.isEnforceJoinOrder(),
@@ -555,55 +568,36 @@ public class QueryParser {
                         log
                     );
                 }
-            }
 
-            GridCacheTwoStepQuery twoStepQry = null;
+                List<GridQueryFieldMetadata> meta = H2Utils.meta(stmt.getMetaData());
 
-            if (splitNeeded) {
-                twoStepQry = GridSqlQuerySplitter.split(
-                    connMgr.connectionForThread().connection(newQry.getSchema()),
+                QueryParserResultSelect select = new QueryParserResultSelect(
                     selectStmt,
-                    newQry.getSql(),
-                    newQry.isCollocated(),
-                    newQry.isDistributedJoins(),
-                    newQry.isEnforceJoinOrder(),
-                    locSplit,
-                    idx,
-                    paramsCnt,
-                    log
+                    twoStepQry,
+                    forUpdateTwoStepQry,
+                    meta,
+                    cacheIds,
+                    mvccCacheId,
+                    forUpdateQryOutTx,
+                    forUpdateQryTx
                 );
-            }
-
-            List<GridQueryFieldMetadata> meta = H2Utils.meta(stmt.getMetaData());
-
-            QueryParserResultSelect select = new QueryParserResultSelect(
-                selectStmt,
-                twoStepQry,
-                forUpdateTwoStepQry,
-                meta,
-                cacheIds,
-                mvccCacheId,
-                forUpdateQryOutTx,
-                forUpdateQryTx
-            );
-
-            return new QueryParserResult(
-                newQryDesc,
-                queryParameters(newQry),
-                remainingQry,
-                paramsMeta,
-                select,
-                null,
-                null
-            );
-        }
-        catch (IgniteCheckedException | SQLException e) {
-            throw new IgniteSQLException("Failed to parse query. " + e.getMessage(), IgniteQueryErrorCode.PARSING, e);
-        }
-        finally {
-            qryCtxRegistry.clearThreadLocal();
 
-            U.close(stmt, log);
+                return new QueryParserResult(
+                    newQryDesc,
+                    queryParameters(newQry),
+                    remainingQry,
+                    paramsMeta,
+                    select,
+                    null,
+                    null
+                );
+            }
+            catch (IgniteCheckedException | SQLException e) {
+                throw new IgniteSQLException("Failed to parse query. " + e.getMessage(), IgniteQueryErrorCode.PARSING, e);
+            }
+            finally {
+                U.close(stmt, log);
+            }
         }
     }
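
One detail of parseH2() that survives the reshuffle above but is easy to miss in the re-indented block is the argument split for multi-statement queries: when the query is not batched, the first paramsCnt arguments are bound to the statement just parsed and the rest are carried over to the remaining SQL. A tiny self-contained illustration of that split (class name and values are made up):

    import java.util.Arrays;

    public class ArgSplitExample {
        public static void main(String[] args) {
            Object[] argsOrig = {1, "a", 2, "b", 3};
            int paramsCnt = 3; // parameters consumed by the first parsed statement

            // Same Arrays.copyOfRange() split as in parseH2().
            Object[] firstStmtArgs = Arrays.copyOfRange(argsOrig, 0, paramsCnt);
            Object[] remainingArgs = Arrays.copyOfRange(argsOrig, paramsCnt, argsOrig.length);

            System.out.println(Arrays.toString(firstStmtArgs)); // [1, a, 2]
            System.out.println(Arrays.toString(remainingArgs)); // [b, 3]
        }
    }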
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java
index 49c766f..826fb2d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.query.h2;
 
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
-import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -232,10 +231,10 @@ public class SchemaManager {
                 createSchema(schema, true);
             }
 
-            try (Connection c = connMgr.connectionNoCache(schema)) {
-                SqlSystemTableEngine.registerView(c, view);
+            try (H2PooledConnection c = connMgr.connection(schema)) {
+                SqlSystemTableEngine.registerView(c.connection(), view);
 
-                    systemViews.add(view);
+                systemViews.add(view);
             }
         }
         catch (IgniteCheckedException | SQLException e) {
@@ -327,10 +326,7 @@ public class SchemaManager {
 
         H2Schema schema = schema(schemaName);
 
-        Connection conn = null;
-        try {
-            conn = connMgr.connectionForThread().connection(schema.schemaName());
-
+        try (H2PooledConnection conn = connMgr.connection(schema.schemaName())) {
             GridH2Table h2tbl = createTable(schema.schemaName(), schema, tblDesc, conn);
 
             schema.add(tblDesc);
@@ -339,8 +335,6 @@ public class SchemaManager {
                 throw new IllegalStateException("Table already exists: " + h2tbl.identifierString());
         }
         catch (SQLException e) {
-            connMgr.onSqlException(conn);
-
             throw new IgniteCheckedException("Failed to register query type: " + tblDesc, e);
         }
     }
@@ -534,7 +528,7 @@ public class SchemaManager {
      * @throws SQLException If failed to create db table.
      * @throws IgniteCheckedException If failed.
      */
-    private GridH2Table createTable(String schemaName, H2Schema schema, H2TableDescriptor tbl, Connection conn)
+    private GridH2Table createTable(String schemaName, H2Schema schema, H2TableDescriptor tbl, H2PooledConnection conn)
         throws SQLException, IgniteCheckedException {
         assert schema != null;
         assert tbl != null;
@@ -546,7 +540,7 @@ public class SchemaManager {
 
         GridH2RowDescriptor rowDesc = new GridH2RowDescriptor(tbl, tbl.type());
 
-        GridH2Table h2Tbl = H2TableEngine.createTable(conn, sql, rowDesc, tbl);
+        GridH2Table h2Tbl = H2TableEngine.createTable(conn.connection(), sql, rowDesc, tbl);
 
         for (GridH2IndexBase usrIdx : tbl.createUserIndexes())
             createInitialUserIndex(schemaName, tbl, usrIdx);
@@ -565,28 +559,26 @@ public class SchemaManager {
         if (log.isDebugEnabled())
             log.debug("Removing query index table: " + tbl.fullTableName());
 
-        Connection c = connMgr.connectionForThread().connection(tbl.schemaName());
-
-        Statement stmt = null;
-
-        try {
-            stmt = c.createStatement();
+        try (H2PooledConnection c = connMgr.connection(tbl.schemaName())) {
+            Statement stmt = null;
 
-            String sql = "DROP TABLE IF EXISTS " + tbl.fullTableName();
+            try {
+                stmt = c.connection().createStatement();
 
-            if (log.isDebugEnabled())
-                log.debug("Dropping database index table with SQL: " + sql);
+                String sql = "DROP TABLE IF EXISTS " + tbl.fullTableName();
 
-            stmt.executeUpdate(sql);
-        }
-        catch (SQLException e) {
-            connMgr.onSqlException(c);
+                if (log.isDebugEnabled())
+                    log.debug("Dropping database index table with SQL: " + sql);
 
-            throw new IgniteSQLException("Failed to drop database index table [type=" + tbl.type().name() +
-                ", table=" + tbl.fullTableName() + "]", IgniteQueryErrorCode.TABLE_DROP_FAILED, e);
-        }
-        finally {
-            U.close(stmt, log);
+                stmt.executeUpdate(sql);
+            }
+            catch (SQLException e) {
+                throw new IgniteSQLException("Failed to drop database index table [type=" + tbl.type().name() +
+                    ", table=" + tbl.fullTableName() + "]", IgniteQueryErrorCode.TABLE_DROP_FAILED, e);
+            }
+            finally {
+                U.close(stmt, log);
+            }
         }
     }
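
A small stylistic note on the dropTable() rewrite above: now that the pooled connection is released by try-with-resources, the remaining manual U.close(stmt, log) could equally be expressed as a second resource in the same try. A sketch of that equivalent shape, not part of the patch, assuming H2PooledConnection and the JDBC Statement are the only resources involved:

    try (H2PooledConnection c = connMgr.connection(tbl.schemaName());
         Statement stmt = c.connection().createStatement()) {
        String sql = "DROP TABLE IF EXISTS " + tbl.fullTableName();

        if (log.isDebugEnabled())
            log.debug("Dropping database index table with SQL: " + sql);

        stmt.executeUpdate(sql);
    }
    catch (SQLException e) {
        throw new IgniteSQLException("Failed to drop database index table [type=" + tbl.type().name() +
            ", table=" + tbl.fullTableName() + "]", IgniteQueryErrorCode.TABLE_DROP_FAILED, e);
    }

The main behavioral difference is that Statement.close() exceptions would propagate instead of being swallowed by U.close(), so the patch's explicit finally block is arguably the safer choice.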
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
index b49dcea..98e3c47 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -98,7 +99,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
         IndexingQueryCacheFilter filter = null;
         MvccSnapshot mvccSnapshot = null;
 
-        QueryContext qctx = queryContextRegistry().getThreadLocal();
+        QueryContext qctx = H2Utils.context(ses);
 
         int seg = 0;
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index 600394d..6697ea7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -406,13 +406,15 @@ public class H2TreeIndex extends H2TreeIndexBase {
         assert upper == null || upper instanceof H2Row : upper;
 
         try {
-            int seg = threadLocalSegment();
+            QueryContext qctx = ses != null ? H2Utils.context(ses) : null;
+
+            int seg = segment(qctx);
 
             H2Tree tree = treeForRead(seg);
 
             // If it is known that only one row will be returned an optimization is employed
             if (isSingleRowLookup(lower, upper, tree)) {
-                H2Row row = tree.findOne((H2Row)lower, filter(qryCtxRegistry.getThreadLocal()), null);
+                H2Row row = tree.findOne((H2Row)lower, filter(qctx), null);
 
                 if (row == null || isExpired(row))
                     return GridH2Cursor.EMPTY;
@@ -421,7 +423,7 @@ public class H2TreeIndex extends H2TreeIndexBase {
             }
             else {
                 return new H2Cursor(tree.find((H2Row)lower,
-                    (H2Row)upper, filter(qryCtxRegistry.getThreadLocal()), null));
+                    (H2Row)upper, filter(qctx), null));
             }
         }
         catch (IgniteCheckedException e) {
@@ -449,12 +451,12 @@ public class H2TreeIndex extends H2TreeIndexBase {
     /** {@inheritDoc} */
     @Override public H2CacheRow put(H2CacheRow row) {
         try {
-            InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
-
             int seg = segmentForRow(cctx, row);
 
             H2Tree tree = treeForRead(seg);
 
+            InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
+
             assert cctx.shared().database().checkpointLockIsHeldByThread();
 
             return (H2CacheRow)tree.put(row);
@@ -497,12 +499,12 @@ public class H2TreeIndex extends H2TreeIndexBase {
         assert row instanceof H2Row : row;
 
         try {
-            InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
-
             int seg = segmentForRow(cctx, row);
 
             H2Tree tree = treeForRead(seg);
 
+            InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
+
             assert cctx.shared().database().checkpointLockIsHeldByThread();
 
             return tree.removex((H2Row)row);
@@ -520,11 +522,11 @@ public class H2TreeIndex extends H2TreeIndexBase {
     /** {@inheritDoc} */
     @Override public long getRowCount(Session ses) {
         try {
-            int seg = threadLocalSegment();
+            QueryContext qctx = H2Utils.context(ses);
 
-            H2Tree tree = treeForRead(seg);
+            int seg = segment(qctx);
 
-            QueryContext qctx = qryCtxRegistry.getThreadLocal();
+            H2Tree tree = treeForRead(seg);
 
             return tree.size(filter(qctx));
         }
@@ -534,10 +536,11 @@ public class H2TreeIndex extends H2TreeIndexBase {
     }
 
     /** {@inheritDoc} */
-    @Override public Cursor findFirstOrLast(Session session, boolean b) {
+    @Override public Cursor findFirstOrLast(Session ses, boolean b) {
         try {
-            H2Tree tree = treeForRead(threadLocalSegment());
-            QueryContext qctx = qryCtxRegistry.getThreadLocal();
+            QueryContext qctx = H2Utils.context(ses);
+
+            H2Tree tree = treeForRead(segment(qctx));
 
             H2Row found = b ? tree.findFirst(filter(qctx)) : tree.findLast(filter(qctx));
 
@@ -715,7 +718,7 @@ public class H2TreeIndex extends H2TreeIndexBase {
 
     /** {@inheritDoc} */
     @Override public IndexLookupBatch createLookupBatch(TableFilter[] filters, int filter) {
-        QueryContext qctx = qryCtxRegistry.getThreadLocal();
+        QueryContext qctx = H2Utils.context(filters[filter].getSession());
 
         if (qctx == null || qctx.distributedJoinContext() == null || !getTable().isPartitioned())
             return null;
@@ -736,9 +739,7 @@ public class H2TreeIndex extends H2TreeIndexBase {
             }
         }
 
-        GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context();
-
-        return new DistributedLookupBatch(this, cctx, queryContextRegistry(), ucast, affColId);
+        return new DistributedLookupBatch(this, cctx, ucast, affColId);
     }
 
     /**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
index e245c07..66c4bb2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
@@ -35,8 +35,6 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
 import org.apache.ignite.internal.processors.query.h2.ConnectionManager;
-import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
-import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool;
 import org.apache.ignite.internal.processors.query.h2.UpdateResult;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
@@ -491,13 +489,13 @@ public final class UpdatePlan {
     public UpdateSourceIterator<?> iteratorForTransaction(ConnectionManager connMgr, QueryCursor<List<?>> cur) {
         switch (mode) {
             case MERGE:
-                return new InsertIterator(connMgr, cur, this, EnlistOperation.UPSERT);
+                return new InsertIterator(cur, this, EnlistOperation.UPSERT);
             case INSERT:
-                return new InsertIterator(connMgr, cur, this, EnlistOperation.INSERT);
+                return new InsertIterator(cur, this, EnlistOperation.INSERT);
             case UPDATE:
-                return new UpdateIterator(connMgr, cur, this, EnlistOperation.UPDATE);
+                return new UpdateIterator(cur, this, EnlistOperation.UPDATE);
             case DELETE:
-                return new DeleteIterator(connMgr, cur, this, EnlistOperation.DELETE);
+                return new DeleteIterator(cur, this, EnlistOperation.DELETE);
 
             default:
                 throw new IllegalArgumentException(String.valueOf(mode));
@@ -607,9 +605,6 @@ public final class UpdatePlan {
     private abstract static class AbstractIterator extends GridCloseableIteratorAdapterEx<Object>
         implements UpdateSourceIterator<Object> {
         /** */
-        private final ConnectionManager connMgr;
-
-        /** */
         private final QueryCursor<List<?>> cur;
 
         /** */
@@ -621,18 +616,13 @@ public final class UpdatePlan {
         /** */
         private final EnlistOperation op;
 
-        /** */
-        private volatile ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable conn;
-
         /**
-         * @param connMgr Connection manager.
          * @param cur Query cursor.
          * @param plan Update plan.
          * @param op Operation.
          */
-        private AbstractIterator(ConnectionManager connMgr, QueryCursor<List<?>> cur, UpdatePlan plan,
+        private AbstractIterator(QueryCursor<List<?>> cur, UpdatePlan plan,
             EnlistOperation op) {
-            this.connMgr = connMgr;
             this.cur = cur;
             this.plan = plan;
             this.op = op;
@@ -646,21 +636,8 @@ public final class UpdatePlan {
         }
 
         /** {@inheritDoc} */
-        @Override public void beforeDetach() {
-            ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable conn0 = conn = connMgr.detachThreadConnection();
-
-            if (isClosed())
-                conn0.recycle();
-        }
-
-        /** {@inheritDoc} */
         @Override protected void onClose() {
             cur.close();
-
-            ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable conn0 = conn;
-
-            if (conn0 != null)
-                conn0.recycle();
         }
 
         /** {@inheritDoc} */
@@ -683,14 +660,13 @@ public final class UpdatePlan {
         private static final long serialVersionUID = -4949035950470324961L;
 
         /**
-         * @param connMgr Connection manager.
          * @param cur Query cursor.
          * @param plan Update plan.
          * @param op Operation.
          */
-        private UpdateIterator(ConnectionManager connMgr, QueryCursor<List<?>> cur, UpdatePlan plan,
+        private UpdateIterator(QueryCursor<List<?>> cur, UpdatePlan plan,
             EnlistOperation op) {
-            super(connMgr, cur, plan, op);
+            super(cur, plan, op);
         }
 
         /** {@inheritDoc} */
@@ -707,14 +683,13 @@ public final class UpdatePlan {
         private static final long serialVersionUID = -4949035950470324961L;
 
         /**
-         * @param connMgr Connection manager.
          * @param cur Query cursor.
          * @param plan Update plan.
          * @param op Operation.
          */
-        private DeleteIterator(ConnectionManager connMgr, QueryCursor<List<?>> cur, UpdatePlan plan,
+        private DeleteIterator(QueryCursor<List<?>> cur, UpdatePlan plan,
             EnlistOperation op) {
-            super(connMgr, cur, plan, op);
+            super(cur, plan, op);
         }
 
         /** {@inheritDoc} */
@@ -729,14 +704,13 @@ public final class UpdatePlan {
         private static final long serialVersionUID = -4949035950470324961L;
 
         /**
-         * @param connMgr Connection manager.
          * @param cur Query cursor.
          * @param plan Update plan.
          * @param op Operation.
          */
-        private InsertIterator(ConnectionManager connMgr, QueryCursor<List<?>> cur, UpdatePlan plan,
+        private InsertIterator(QueryCursor<List<?>> cur, UpdatePlan plan,
             EnlistOperation op) {
-            super(connMgr, cur, plan, op);
+            super(cur, plan, op);
         }
 
         /** {@inheritDoc} */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index 707a58a..403081e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.h2.dml;
 
 import java.lang.reflect.Constructor;
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -40,11 +39,14 @@ import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessor;
+import org.apache.ignite.internal.processors.query.h2.H2PooledConnection;
+import org.apache.ignite.internal.processors.query.h2.H2StatementCache;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.QueryDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlColumn;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDelete;
@@ -902,9 +904,14 @@ public final class UpdatePlanBuilder {
         if ((!mvccEnabled && !planKey.skipReducerOnUpdate()) || planKey.batched())
             return null;
 
-        try (Connection conn = idx.connections().connectionNoCache(planKey.schemaName())) {
+        try (H2PooledConnection conn = idx.connections().connection(planKey.schemaName())) {
+            H2Utils.setupConnection(conn,
+                QueryContext.parseContext(idx.backupFilter(null, null), planKey.local()),
+                planKey.distributedJoins(),
+                planKey.enforceJoinOrder());
+
             // Get a new prepared statement for derived select query.
-            try (PreparedStatement stmt = conn.prepareStatement(selectQry)) {
+            try (PreparedStatement stmt = conn.prepareStatement(selectQry, H2StatementCache.queryFlags(planKey))) {
                 Prepared prep = GridSqlQueryParser.prepared(stmt);
                 GridSqlQuery selectStmt = (GridSqlQuery)new GridSqlQueryParser(false, log).parse(prep);
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index b10c490..ea13614 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -78,14 +78,13 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
+     * @param qctx Query context.
      * @return Index segment ID for current query context.
      */
-    protected int threadLocalSegment() {
+    protected int segment(QueryContext qctx) {
         if(segmentsCount() == 1)
             return 0;
 
-        QueryContext qctx = queryContextRegistry().getThreadLocal();
-
         if(qctx == null)
             throw new IllegalStateException("GridH2QueryContext is not initialized.");
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 8f07e19..7477544 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -1208,7 +1208,7 @@ public class GridH2Table extends TableBase {
 
     /** {@inheritDoc} */
     @Override public long getRowCountApproximation() {
-        if (!localQuery())
+        if (!localQuery(QueryContext.threadLocal()))
             return 10_000; // Fallback to the previous behaviour.
 
         refreshStatsIfNeeded();
@@ -1217,11 +1217,11 @@ public class GridH2Table extends TableBase {
     }
 
     /**
+     * @param qctx Context.
+     *
      * @return {@code True} if the current query is a local query.
      */
-    private boolean localQuery() {
-        QueryContext qctx = rowDescriptor().indexing().queryContextRegistry().getThreadLocal();
-
+    private boolean localQuery(QueryContext qctx) {
         assert qctx != null;
 
         return qctx.local();
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
index 24191ea..7e2628f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
+import java.util.Objects;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.query.h2.opt.join.DistributedJoinContext;
 import org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservation;
@@ -28,6 +29,12 @@ import org.jetbrains.annotations.Nullable;
  * Thread local SQL query context which is intended to be accessible from everywhere.
  */
 public class QueryContext {
+    /**
+     * Thread-local query context used by APIs that don't have access to the H2 session:
+     * distributed joins and row count estimation.
+     */
+    private static final ThreadLocal<QueryContext> qctxThreaded = new ThreadLocal<>();
+
     /** Segment ID. */
     private final int segment;
 
@@ -73,6 +80,21 @@ public class QueryContext {
     }
 
     /**
+     * @param filter Filter.
+     * @param local Local query flag.
+     * @return Context for parsing.
+     */
+    public static QueryContext parseContext(@Nullable IndexingQueryFilter filter, boolean local) {
+        return new QueryContext(
+            0,
+            filter,
+            null,
+            null,
+            null,
+            local);
+    }
+
+    /**
      * @return Mvcc snapshot.
      */
     @Nullable public MvccSnapshot mvccSnapshot() {
@@ -123,4 +145,28 @@ public class QueryContext {
     @Override public String toString() {
         return S.toString(QueryContext.class, this);
     }
+
+    /**
+     * The thread-local context is a workaround for H2 methods that are called without a Session object,
+     * e.g. GridH2Table.getRowCountApproximation (used only in the optimization phase, after parsing).
+     *
+     * @param qctx Context.
+     */
+    public static void threadLocal(QueryContext qctx) {
+        qctxThreaded.set(qctx);
+    }
+
+    /**
+     * The thread-local context is a workaround for H2 methods that are called without a Session object,
+     * e.g. GridH2Table.getRowCountApproximation (used only in the optimization phase, after parsing).
+     *
+     * @return Thread local context.
+     */
+    public static QueryContext threadLocal() {
+        QueryContext qctx = qctxThreaded.get();
+
+        assert Objects.nonNull(qctx);
+
+        return qctx;
+    }
 }
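
A minimal sketch (not part of the commit) of how the new thread-local context is intended to be used, based on the usages visible elsewhere in this diff (MapQueryResult sets it before fetching a page; GridH2Table and DistributedLookupBatch read it back). The sketch class and method names are hypothetical:

    import org.apache.ignite.internal.processors.query.h2.H2Utils;
    import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
    import org.h2.engine.Session;

    final class QueryContextUsageSketch {
        /** Publish the per-query context before calling into H2, as MapQueryResult does. */
        static void beforeFetch(Session ses) {
            QueryContext.threadLocal(H2Utils.context(ses));
        }

        /** Read it back from code H2 invokes without a Session, e.g. row count estimation. */
        static boolean isLocalQuery() {
            QueryContext qctx = QueryContext.threadLocal(); // asserts the context was set

            return qctx.local();
        }
    }
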
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextRegistry.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextRegistry.java
index 34acbbf..c53b5ea 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextRegistry.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/QueryContextRegistry.java
@@ -17,51 +17,19 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
-import org.jetbrains.annotations.Nullable;
-
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Registry of all currently available query contexts.
  */
 public class QueryContextRegistry {
-    /** Current local context. */
-    private final ThreadLocal<QueryContext> locCtx = new ThreadLocal<>();
-
     /** Shared contexts. */
     private final ConcurrentMap<QueryContextKey, QueryContext> sharedCtxs = new ConcurrentHashMap<>();
 
     /**
-     * Access current thread local query context (if it was set).
-     *
-     * @return Current thread local query context or {@code null} if the query runs outside of Ignite context.
-     */
-    @Nullable public QueryContext getThreadLocal() {
-        return locCtx.get();
-    }
-
-    /**
-     * Sets current thread local context. This method must be called when all the non-volatile properties are
-     * already set to ensure visibility for other threads.
-     *
-     * @param x Query context.
-     */
-    public void setThreadLocal(QueryContext x) {
-        assert locCtx.get() == null;
-
-        locCtx.set(x);
-    }
-
-    /**
-     * Drops current thread local context.
-     */
-    public void clearThreadLocal() {
-        locCtx.remove();
-    }
-
-    /**
      * Access query context from another thread.
      *
      * @param nodeId The node who initiated the query.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
index 7599516..5dbb38c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/join/DistributedLookupBatch.java
@@ -17,13 +17,20 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt.join;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Future;
+import javax.cache.CacheException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
 import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
-import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds;
@@ -36,15 +43,6 @@ import org.h2.util.DoneFuture;
 import org.h2.value.Value;
 import org.h2.value.ValueNull;
 
-import javax.cache.CacheException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Future;
-
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor.COL_NOT_EXISTS;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds;
 
@@ -61,9 +59,6 @@ public class DistributedLookupBatch implements IndexLookupBatch {
     /** */
     private final GridCacheContext<?,?> cctx;
 
-    /** Query context registry. */
-    private final QueryContextRegistry qryCtxRegistry;
-
     /** */
     private final boolean ucast;
 
@@ -96,11 +91,10 @@ public class DistributedLookupBatch implements IndexLookupBatch {
      * @param ucast Unicast or broadcast query.
      * @param affColId Affinity column ID.
      */
-    public DistributedLookupBatch(H2TreeIndex idx, GridCacheContext<?, ?> cctx, QueryContextRegistry qryCtxRegistry,
+    public DistributedLookupBatch(H2TreeIndex idx, GridCacheContext<?, ?> cctx,
         boolean ucast, int affColId) {
         this.idx = idx;
         this.cctx = cctx;
-        this.qryCtxRegistry = qryCtxRegistry;
         this.ucast = ucast;
         this.affColId = affColId;
     }
@@ -155,7 +149,7 @@ public class DistributedLookupBatch implements IndexLookupBatch {
             if (joinCtx == null) {
                 // It is the first call after query begin (may be after reuse),
                 // reinitialize query context and result.
-                QueryContext qctx = qryCtxRegistry.getThreadLocal();
+                QueryContext qctx = QueryContext.threadLocal();
 
                 res = new ArrayList<>();
 
@@ -275,9 +269,8 @@ public class DistributedLookupBatch implements IndexLookupBatch {
         joinCtx.putStreams(batchLookupId, rangeStreams);
 
         // Start streaming.
-        for (RangeStream stream : rangeStreams.values()) {
+        for (RangeStream stream : rangeStreams.values())
             stream.start();
-        }
     }
 
     /** {@inheritDoc} */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 93eed77..b91ede3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.h2.sql;
 
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -40,10 +39,13 @@ import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.h2.H2PooledConnection;
+import org.apache.ignite.internal.processors.query.h2.H2StatementCache;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.affinity.PartitionExtractor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.command.Prepared;
@@ -194,7 +196,7 @@ public class GridSqlQuerySplitter {
      * @throws IgniteCheckedException If failed.
      */
     public static GridCacheTwoStepQuery split(
-        Connection conn,
+        H2PooledConnection conn,
         GridSqlQuery qry,
         String originalSql,
         boolean collocatedGrpBy,
@@ -241,7 +243,7 @@ public class GridSqlQuerySplitter {
      * @throws IgniteCheckedException If failed.
      */
     private static GridCacheTwoStepQuery split0(
-        Connection conn,
+        H2PooledConnection conn,
         GridSqlQuery qry,
         String originalSql,
         boolean collocatedGrpBy,
@@ -272,7 +274,7 @@ public class GridSqlQuerySplitter {
         // The distributedJoins parameter is ignored because it is not relevant for
         // the REDUCE query optimization.
         qry = GridSqlQueryParser.parseQuery(
-            prepare(conn, qry.getSQL(), false, enforceJoinOrder),
+            prepare(conn, H2Utils.context(conn), qry.getSQL(), false, enforceJoinOrder),
             true, log);
 
         // Do the actual query split. We will update the original query AST, need to be careful.
@@ -288,7 +290,12 @@ public class GridSqlQuerySplitter {
             boolean allCollocated = true;
 
             for (GridCacheSqlQuery mapSqlQry : splitter.mapSqlQrys) {
-                Prepared prepared0 = prepare(conn, mapSqlQry.query(), true, enforceJoinOrder);
+                Prepared prepared0 = prepare(
+                    conn,
+                    H2Utils.context(conn),
+                    mapSqlQry.query(),
+                    true,
+                    enforceJoinOrder);
 
                 allCollocated &= isCollocated((Query)prepared0);
 
@@ -1771,11 +1778,11 @@ public class GridSqlQuerySplitter {
      * @return Optimized prepared command.
      * @throws SQLException If failed.
      */
-    public static Prepared prepare(Connection c, String qry, boolean distributedJoins,
-        boolean enforceJoinOrder) throws SQLException {
-        H2Utils.setupConnection(c, distributedJoins, enforceJoinOrder);
+    private static Prepared prepare(H2PooledConnection c, QueryContext qctx, String qry, boolean distributedJoins,
+        boolean enforceJoinOrder) throws SQLException, IgniteCheckedException {
+        H2Utils.setupConnection(c, qctx, distributedJoins, enforceJoinOrder);
 
-        try (PreparedStatement s = c.prepareStatement(qry)) {
+        try (PreparedStatement s = c.prepareStatement(qry, H2StatementCache.queryFlags(distributedJoins, enforceJoinOrder))) {
             return GridSqlQueryParser.prepared(s);
         }
     }
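
A minimal sketch (not part of the commit) of the pooled-connection pattern this refactoring switches to, assembled from the UpdatePlanBuilder and GridSqlQuerySplitter hunks above. The wrapper class, the null filter, the hard-coded local flag and the thrown exception type are simplifications:

    import java.sql.PreparedStatement;
    import org.apache.ignite.internal.processors.query.h2.H2PooledConnection;
    import org.apache.ignite.internal.processors.query.h2.H2StatementCache;
    import org.apache.ignite.internal.processors.query.h2.H2Utils;
    import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
    import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
    import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
    import org.h2.command.Prepared;

    final class ParseOnPooledConnectionSketch {
        /** Parse SQL on a connection borrowed from the pool instead of a thread-bound one. */
        static Prepared parse(IgniteH2Indexing idx, String schema, String sql,
            boolean distributedJoins, boolean enforceJoinOrder) throws Exception {
            // try-with-resources returns the connection to the pool on exit.
            try (H2PooledConnection conn = idx.connections().connection(schema)) {
                // Bind a parse-only query context to the connection's session.
                H2Utils.setupConnection(conn,
                    QueryContext.parseContext(null, true),
                    distributedJoins,
                    enforceJoinOrder);

                // Statements are cached per connection, keyed by SQL text and query flags.
                try (PreparedStatement stmt =
                    conn.prepareStatement(sql, H2StatementCache.queryFlags(distributedJoins, enforceJoinOrder))) {
                    return GridSqlQueryParser.prepared(stmt);
                }
            }
        }
    }
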
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 8b2c157..5996762 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -49,7 +49,8 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
+import org.apache.ignite.internal.processors.query.h2.H2PooledConnection;
+import org.apache.ignite.internal.processors.query.h2.H2StatementCache;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.MapH2QueryInfo;
@@ -72,7 +73,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.h2.api.ErrorCode;
 import org.h2.jdbc.JdbcResultSet;
-import org.h2.jdbc.JdbcSQLException;
 import org.h2.value.Value;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -307,6 +307,8 @@ public class GridMapQueryExecutor {
 
         PartitionReservation reserved = null;
 
+        QueryContext qctx = null;
+
         try {
             if (topVer != null) {
                 // Reserve primary for topology version or explicit partitions.
@@ -339,22 +341,19 @@ public class GridMapQueryExecutor {
                 );
             }
 
-            QueryContext qctx = new QueryContext(
+            qctx = new QueryContext(
                 segmentId,
                 h2.backupFilter(topVer, parts),
                 distributedJoinCtx,
                 mvccSnapshot,
                 reserved,
-                true
-            );
+                true);
 
             qryResults = new MapQueryResults(h2, reqId, qrys.size(), mainCctx, lazy, qctx);
 
             // qctx is set, we have to release reservations inside of it.
             reserved = null;
 
-            qryCtxRegistry.setThreadLocal(qctx);
-
             if (distributedJoinCtx != null)
                 qryCtxRegistry.setShared(node.id(), reqId, qctx);
 
@@ -375,16 +374,17 @@ public class GridMapQueryExecutor {
             boolean evt = mainCctx != null && mainCctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED);
 
             for (GridCacheSqlQuery qry : qrys) {
-                H2ConnectionWrapper connWrp = h2.connections().connectionForThread();
+                H2PooledConnection conn = h2.connections().connection(schemaName);
 
                 H2Utils.setupConnection(
-                    connWrp.connection(schemaName),
+                    conn,
+                    qctx,
                     distributedJoins,
                     enforceJoinOrder,
                     lazy
                 );
 
-                MapQueryResult res = new MapQueryResult(h2, mainCctx, node.id(), qry, params, connWrp, log);
+                MapQueryResult res = new MapQueryResult(h2, mainCctx, node.id(), qry, params, conn, log);
 
                 qryResults.addResult(qryIdx, res);
 
@@ -396,14 +396,9 @@ public class GridMapQueryExecutor {
                         String sql = qry.query();
                         Collection<Object> params0 = F.asList(qry.parameters(params));
 
-                        PreparedStatement stmt;
-
-                        try {
-                            stmt = h2.connections().prepareStatement(connWrp.connection(), sql);
-                        }
-                        catch (SQLException e) {
-                            throw new IgniteCheckedException("Failed to parse SQL query: " + sql, e);
-                        }
+                        PreparedStatement stmt = conn.prepareStatement(sql, H2StatementCache.queryFlags(
+                            distributedJoins,
+                            enforceJoinOrder));
 
                         H2Utils.bindParameters(stmt, params0);
 
@@ -411,9 +406,8 @@ public class GridMapQueryExecutor {
 
                         ResultSet rs = h2.executeSqlQueryWithTimer(
                             stmt,
-                            connWrp.connection(),
+                            conn,
                             sql,
-                            params0,
                             timeout,
                             qryResults.queryCancel(qryIdx),
                             dataPageScanEnabled,
@@ -486,12 +480,12 @@ public class GridMapQueryExecutor {
                 qryResults.close();
             }
             else
-                releaseReservations();
+                releaseReservations(qctx);
 
             if (e instanceof QueryCancelledException)
                 sendError(node, reqId, e);
             else {
-                JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class);
+                SQLException sqlEx = X.cause(e, SQLException.class);
 
                 if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
                     sendQueryCancel(node, reqId);
@@ -526,8 +520,6 @@ public class GridMapQueryExecutor {
         finally {
             if (reserved != null)
                 reserved.release();
-
-            qryCtxRegistry.clearThreadLocal();
         }
     }
 
@@ -541,13 +533,11 @@ public class GridMapQueryExecutor {
 
     /**
      * Releases reserved partitions.
+     *
+     * @param qctx Query context.
      */
-    private void releaseReservations() {
-        QueryContext qctx = qryCtxRegistry.getThreadLocal();
-
-        if (qctx != null) { // No-op if already released.
-            qryCtxRegistry.clearThreadLocal();
-
+    private void releaseReservations(QueryContext qctx) {
+        if (qctx != null) {
             if (qctx.distributedJoinContext() == null)
                 qctx.clearContext(false);
         }
@@ -754,13 +744,6 @@ public class GridMapQueryExecutor {
             sendQueryCancel(node, reqId);
         else {
             try {
-                QueryContext qctxReduce = qryCtxRegistry.getThreadLocal();
-
-                if (qctxReduce != null)
-                    qryCtxRegistry.clearThreadLocal();
-
-                qryCtxRegistry.setThreadLocal(qryResults.queryContext());
-
                 MapQueryResult res = qryResults.result(req.query());
 
                 assert res != null;
@@ -787,11 +770,6 @@ public class GridMapQueryExecutor {
                         sendNextPage(node, msg);
                 }
                 finally {
-                    qryCtxRegistry.clearThreadLocal();
-
-                    if (qctxReduce != null)
-                        qryCtxRegistry.setThreadLocal(qctxReduce);
-
                     try {
                         res.unlockTables();
                     }
@@ -806,7 +784,7 @@ public class GridMapQueryExecutor {
                 if (retryEx != null)
                     sendError(node, reqId, retryEx);
                 else {
-                    JdbcSQLException sqlEx = X.cause(e, JdbcSQLException.class);
+                    SQLException sqlEx = X.cause(e, SQLException.class);
 
                     if (sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
                         sendQueryCancel(node, reqId);
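
The map executor now unwraps the generic java.sql.SQLException from the failure chain instead of H2's JdbcSQLException when deciding whether a statement was cancelled. A small sketch of that check, assuming X.cause and ErrorCode behave as used above; the wrapper class is hypothetical:

    import java.sql.SQLException;
    import org.apache.ignite.internal.util.typedef.X;
    import org.h2.api.ErrorCode;

    final class CancellationCheckSketch {
        /** @return True if the failure was caused by H2 cancelling the statement. */
        static boolean wasStatementCancelled(Throwable e) {
            // Look for any SQLException in the cause chain, then check the H2 error code.
            SQLException sqlEx = X.cause(e, SQLException.class);

            return sqlEx != null && sqlEx.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED;
        }
    }
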
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 75db738..b719a2e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -58,16 +57,14 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
 import org.apache.ignite.internal.processors.query.h2.H2FieldsIterator;
+import org.apache.ignite.internal.processors.query.h2.H2PooledConnection;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.ReduceH2QueryInfo;
-import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool;
 import org.apache.ignite.internal.processors.query.h2.UpdateResult;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedUpdateRun;
 import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
-import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSortColumn;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
@@ -90,7 +87,6 @@ import org.apache.ignite.transactions.TransactionException;
 import org.h2.command.ddl.CreateTableData;
 import org.h2.engine.Session;
 import org.h2.index.Index;
-import org.h2.jdbc.JdbcConnection;
 import org.h2.table.Column;
 import org.h2.util.IntArray;
 import org.h2.value.Value;
@@ -407,180 +403,170 @@ public class GridReduceQueryExecutor {
 
             assert !F.isEmpty(nodes);
 
+            H2PooledConnection conn = h2.connections().connection(schemaName);
+
             final long qryReqId = qryIdGen.incrementAndGet();
 
             boolean retry = false;
             boolean release = true;
 
-            final ReduceQueryRun r = createReduceQueryRun(
-                (JdbcConnection)h2.connections().connectionForThread().connection(schemaName),
-                mapQueries, nodes, pageSize, segmentsPerIndex, skipMergeTbl, qry.explain(), dataPageScanEnabled);
+            try {
+                final ReduceQueryRun r = createReduceQueryRun(conn, mapQueries, nodes,
+                    pageSize, segmentsPerIndex, skipMergeTbl, qry.explain(), dataPageScanEnabled);
 
-            runs.put(qryReqId, r);
+                runs.put(qryReqId, r);
 
-            ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable detachedConn = h2.connections().detachThreadConnection();
+                try {
+                    cancel.add(() -> send(nodes, new GridQueryCancelRequest(qryReqId), null, true));
 
-            try {
-                cancel.add(() -> send(nodes, new GridQueryCancelRequest(qryReqId), null, true));
-
-                GridH2QueryRequest req = new GridH2QueryRequest()
-                    .requestId(qryReqId)
-                    .topologyVersion(topVer)
-                    .pageSize(pageSize)
-                    .caches(qry.cacheIds())
-                    .tables(qry.distributedJoins() ? qry.tables() : null)
-                    .partitions(convert(mapping.partitionsMap()))
-                    .queries(mapQueries)
-                    .parameters(params)
-                    .flags(queryFlags(qry, enforceJoinOrder || !singlePartMode, lazy, dataPageScanEnabled))
-                    .timeout(timeoutMillis)
-                    .schemaName(schemaName);
-
-                if (mvccTracker != null)
-                    req.mvccSnapshot(mvccTracker.snapshot());
-
-                final C2<ClusterNode, Message, Message> spec =
-                    parts == null ? null : new ReducePartitionsSpecializer(mapping.queryPartitionsMap());
-
-                if (send(nodes, req, spec, false)) {
-                    awaitAllReplies(r, nodes, cancel);
+                    GridH2QueryRequest req = new GridH2QueryRequest()
+                        .requestId(qryReqId)
+                        .topologyVersion(topVer)
+                        .pageSize(pageSize)
+                        .caches(qry.cacheIds())
+                        .tables(qry.distributedJoins() ? qry.tables() : null)
+                        .partitions(convert(mapping.partitionsMap()))
+                        .queries(mapQueries)
+                        .parameters(params)
+                        .flags(queryFlags(qry, enforceJoinOrder, lazy, dataPageScanEnabled))
+                        .timeout(timeoutMillis)
+                        .schemaName(schemaName);
 
-                    if (r.hasErrorOrRetry()) {
-                        CacheException err = r.exception();
+                    if (mvccTracker != null)
+                        req.mvccSnapshot(mvccTracker.snapshot());
 
-                        if (err != null) {
-                            if (err.getCause() instanceof IgniteClientDisconnectedException)
-                                throw err;
+                    final C2<ClusterNode, Message, Message> spec =
+                        parts == null ? null : new ReducePartitionsSpecializer(mapping.queryPartitionsMap());
 
-                            if (QueryUtils.wasCancelled(err))
-                                throw new QueryCancelledException(); // Throw correct exception.
+                    if (send(nodes, req, spec, false)) {
+                        awaitAllReplies(r, nodes, cancel);
 
-                            throw err;
-                        }
-                        else {
-                            retry = true;
+                        if (r.hasErrorOrRetry()) {
+                            CacheException err = r.exception();
 
-                            // If remote node asks us to retry then we have outdated full partition map.
-                            h2.awaitForReadyTopologyVersion(r.retryTopologyVersion());
-                        }
-                    }
-                }
-                else // Send failed.
-                    retry = true;
+                            if (err != null) {
+                                if (err.getCause() instanceof IgniteClientDisconnectedException)
+                                    throw err;
 
-                if (retry) {
-                    lastRun = runs.remove(qryReqId);
-                    assert lastRun != null;
+                                if (QueryUtils.wasCancelled(err))
+                                    throw new QueryCancelledException(); // Throw correct exception.
 
-                    continue;
-                }
-                else {
-                    Iterator<List<?>> resIter;
+                                throw err;
+                            }
+                            else {
+                                retry = true;
+
+                                // If remote node asks us to retry then we have outdated full partition map.
+                                h2.awaitForReadyTopologyVersion(r.retryTopologyVersion());
+                            }
+                        }
+                    }
+                    else // Send failed.
+                        retry = true;
 
-                    if (skipMergeTbl) {
-                        resIter = new ReduceIndexIterator(this,
-                            nodes,
-                            r,
-                            qryReqId,
-                            qry.distributedJoins(),
-                            mvccTracker);
+                    if (retry) {
+                        lastRun = runs.remove(qryReqId);
+                        assert lastRun != null;
 
-                        release = false;
+                        continue;
                     }
                     else {
-                        ensureQueryNotCancelled(cancel);
-
-                        QueryContext qctx = new QueryContext(
-                            0,
-                            null,
-                            null,
-                            null,
-                            null,
-                            true
-                        );
+                        Iterator<List<?>> resIter;
 
-                        H2Utils.setupConnection(r.connection(), false, enforceJoinOrder);
+                        if (skipMergeTbl) {
+                            resIter = new ReduceIndexIterator(this,
+                                nodes,
+                                r,
+                                qryReqId,
+                                qry.distributedJoins(),
+                                mvccTracker);
 
-                        QueryContextRegistry qryCtxRegistry = h2.queryContextRegistry();
+                            release = false;
 
-                        qryCtxRegistry.setThreadLocal(qctx);
+                            U.close(conn, log);
+                        }
+                        else {
+                            ensureQueryNotCancelled(cancel);
+
+                            QueryContext qctx = new QueryContext(
+                                0,
+                                null,
+                                null,
+                                null,
+                                null,
+                                true
+                            );
+
+                            H2Utils.setupConnection(conn, qctx, false, enforceJoinOrder);
 
-                        try {
                             if (qry.explain())
-                                return explainPlan(r.connection(), qry, params);
+                                return explainPlan(conn, qry, params);
 
                             GridCacheSqlQuery rdc = qry.reduceQuery();
 
-                            Collection<Object> params0 = F.asList(rdc.parameters(params));
+                            final PreparedStatement stmt = conn.prepareStatementNoCache(rdc.query());
 
-                            final PreparedStatement stmt = h2.preparedStatementWithParams(r.connection(), rdc.query(),
-                                params0, false);
+                            H2Utils.bindParameters(stmt, F.asList(rdc.parameters(params)));
 
                             ReduceH2QueryInfo qryInfo = new ReduceH2QueryInfo(stmt, qry.originalSql(), qryReqId);
 
-                            ResultSet res = h2.executeSqlQueryWithTimer(stmt, r.connection(),
+                            ResultSet res = h2.executeSqlQueryWithTimer(stmt,
+                                conn,
                                 rdc.query(),
-                                F.asList(rdc.parameters(params)),
                                 timeoutMillis,
                                 cancel,
                                 dataPageScanEnabled,
                                 qryInfo
                             );
 
-                            resIter = new H2FieldsIterator(res, mvccTracker, detachedConn, r.pageSize(), log, h2,
-                                qctx, lazy);
+                            resIter = new H2FieldsIterator(res, mvccTracker, conn, r.pageSize(), log, h2);
 
-                            // don't recycle at final block
-                            detachedConn = null;
+                            conn = null;
 
                             mvccTracker = null; // To prevent callback inside finally block;
                         }
-                        finally {
-                            qryCtxRegistry.clearThreadLocal();
-                        }
-                    }
 
-                    return new GridQueryCacheObjectsIterator(resIter, h2.objectContext(), keepBinary);
+                        return new GridQueryCacheObjectsIterator(resIter, h2.objectContext(), keepBinary);
+                    }
                 }
-            }
-            catch (IgniteCheckedException | RuntimeException e) {
-                release = true;
+                catch (IgniteCheckedException | RuntimeException e) {
+                    release = true;
 
-                U.closeQuiet(r.connection());
+                    if (e instanceof CacheException) {
+                        if (QueryUtils.wasCancelled(e))
+                            throw new CacheException("Failed to run reduce query locally.",
+                                new QueryCancelledException());
 
-                if (e instanceof CacheException) {
-                    if (QueryUtils.wasCancelled(e))
-                        throw new CacheException("Failed to run reduce query locally.",
-                            new QueryCancelledException());
+                        throw (CacheException)e;
+                    }
 
-                    throw (CacheException)e;
-                }
+                    Throwable cause = e;
 
-                Throwable cause = e;
+                    if (e instanceof IgniteCheckedException) {
+                        Throwable disconnectedErr =
+                            ((IgniteCheckedException)e).getCause(IgniteClientDisconnectedException.class);
 
-                if (e instanceof IgniteCheckedException) {
-                    Throwable disconnectedErr =
-                        ((IgniteCheckedException)e).getCause(IgniteClientDisconnectedException.class);
+                        if (disconnectedErr != null)
+                            cause = disconnectedErr;
+                    }
 
-                    if (disconnectedErr != null)
-                        cause = disconnectedErr;
+                    throw new CacheException("Failed to run reduce query locally. " + cause.getMessage(), cause);
                 }
+                finally {
+                    if (release) {
+                        releaseRemoteResources(nodes, r, qryReqId, qry.distributedJoins(), mvccTracker);
 
-                throw new CacheException("Failed to run reduce query locally. " + cause.getMessage(), cause);
-            }
-            finally {
-                if (detachedConn != null)
-                    detachedConn.recycle();
-
-                if (release) {
-                    releaseRemoteResources(nodes, r, qryReqId, qry.distributedJoins(), mvccTracker);
-
-                    if (!skipMergeTbl) {
-                        for (int i = 0, mapQrys = mapQueries.size(); i < mapQrys; i++)
-                            fakeTable(null, i).innerTable(null); // Drop all merge tables.
+                        if (!skipMergeTbl) {
+                            for (int i = 0, mapQrys = mapQueries.size(); i < mapQrys; i++)
+                                fakeTable(null, i).innerTable(null); // Drop all merge tables.
+                        }
                     }
                 }
             }
+            finally {
+                if (conn != null && (retry || release))
+                    U.close(conn, log);
+            }
         }
     }
 
@@ -690,7 +676,7 @@ public class GridReduceQueryExecutor {
      * @return Reduce query run.
      */
     @NotNull private ReduceQueryRun createReduceQueryRun(
-        JdbcConnection conn,
+        H2PooledConnection conn,
         List<GridCacheSqlQuery> mapQueries,
         Collection<ClusterNode> nodes,
         int pageSize,
@@ -700,7 +686,6 @@ public class GridReduceQueryExecutor {
         Boolean dataPageScanEnabled) {
 
         final ReduceQueryRun r = new ReduceQueryRun(
-            conn,
             mapQueries.size(),
             pageSize,
             dataPageScanEnabled
@@ -1019,7 +1004,7 @@ public class GridReduceQueryExecutor {
      * @param idx Index of table.
      * @return Table.
      */
-    private ReduceTableWrapper fakeTable(Connection c, int idx) {
+    private ReduceTableWrapper fakeTable(H2PooledConnection c, int idx) {
         List<ReduceTableWrapper> tbls = fakeTbls;
 
         assert tbls.size() >= idx;
@@ -1029,7 +1014,7 @@ public class GridReduceQueryExecutor {
 
             try {
                 if ((tbls = fakeTbls).size() == idx) { // Double check inside of lock.
-                    ReduceTableWrapper tbl = ReduceTableEngine.create(c, idx);
+                    ReduceTableWrapper tbl = ReduceTableEngine.create(c.connection(), idx);
 
                     List<ReduceTableWrapper> newTbls = new ArrayList<>(tbls.size() + 1);
 
@@ -1054,13 +1039,20 @@ public class GridReduceQueryExecutor {
      * @return Cursor for plans.
      * @throws IgniteCheckedException if failed.
      */
-    private Iterator<List<?>> explainPlan(JdbcConnection c, GridCacheTwoStepQuery qry, Object[] params)
+    private Iterator<List<?>> explainPlan(H2PooledConnection c, GridCacheTwoStepQuery qry, Object[] params)
         throws IgniteCheckedException {
         List<List<?>> lists = new ArrayList<>(qry.mapQueries().size() + 1);
 
         for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) {
             ResultSet rs =
-                h2.executeSqlQueryWithTimer(c, "SELECT PLAN FROM " + mergeTableIdentifier(i), null, 0, null, null, null);
+                h2.executeSqlQueryWithTimer(
+                    c,
+                    "SELECT PLAN FROM " + mergeTableIdentifier(i),
+                    null,
+                    0,
+                    null,
+                    null,
+                    null);
 
             lists.add(F.asList(getPlan(rs)));
         }
@@ -1102,6 +1094,9 @@ public class GridReduceQueryExecutor {
         catch (SQLException e) {
             throw new IgniteCheckedException(e);
         }
+        finally {
+            U.closeQuiet(rs);
+        }
     }
 
     /**
@@ -1167,10 +1162,10 @@ public class GridReduceQueryExecutor {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
-    private ReduceTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
+    private ReduceTable createMergeTable(H2PooledConnection conn, GridCacheSqlQuery qry, boolean explain)
         throws IgniteCheckedException {
         try {
-            Session ses = (Session)conn.getSession();
+            Session ses = H2Utils.session(conn);
 
             CreateTableData data  = new CreateTableData();
 
@@ -1244,8 +1239,6 @@ public class GridReduceQueryExecutor {
             return tbl;
         }
         catch (Exception e) {
-            U.closeQuiet(conn);
-
             throw new IgniteCheckedException(e);
         }
     }
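
A minimal sketch (not part of the commit) of the connection-ownership hand-off used above: the reduce path acquires one H2PooledConnection, passes it to H2FieldsIterator (which closes it when drained) and nulls the local reference so the finally block only closes the connection when ownership was never transferred. The takeOwnership stand-in and method names are hypothetical:

    import org.apache.ignite.IgniteLogger;
    import org.apache.ignite.internal.processors.query.h2.H2PooledConnection;
    import org.apache.ignite.internal.util.typedef.internal.U;

    final class ConnectionOwnershipSketch {
        static void reduce(H2PooledConnection acquired, IgniteLogger log) {
            H2PooledConnection conn = acquired;

            try {
                // ... run the reduce query on conn ...

                takeOwnership(conn); // stands in for constructing H2FieldsIterator over the result

                conn = null;         // prevent the finally block from closing a transferred connection
            }
            finally {
                if (conn != null)
                    U.close(conn, log); // only reached if ownership was never handed off
            }
        }

        /** Hypothetical stand-in for the iterator that takes over the connection. */
        private static void takeOwnership(H2PooledConnection conn) {
            // No-op in this sketch.
        }
    }
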
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index cd6a844..15286d6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -30,12 +30,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
-import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
+import org.apache.ignite.internal.processors.query.h2.H2PooledConnection;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.internal.processors.query.h2.ThreadLocalObjectPool;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
+import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.engine.Session;
@@ -103,7 +103,7 @@ class MapQueryResult {
     private final Session ses;
 
     /** Detached connection. Used for lazy execution to prevent connection sharing. */
-    private ThreadLocalObjectPool<H2ConnectionWrapper>.Reusable detachedConn;
+    private H2PooledConnection conn;
 
     /** */
     private final ReentrantLock lock = new ReentrantLock();
@@ -118,7 +118,7 @@ class MapQueryResult {
      * @param log Logger.
      */
     MapQueryResult(IgniteH2Indexing h2, @Nullable GridCacheContext cctx,
-        UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, H2ConnectionWrapper conn, IgniteLogger log) {
+        UUID qrySrcNodeId, GridCacheSqlQuery qry, Object[] params, H2PooledConnection conn, IgniteLogger log) {
         this.h2 = h2;
         this.cctx = cctx;
         this.qry = qry;
@@ -126,6 +126,7 @@ class MapQueryResult {
         this.qrySrcNodeId = qrySrcNodeId;
         this.cpNeeded = F.eq(h2.kernalContext().localNodeId(), qrySrcNodeId);
         this.log = log;
+        this.conn = conn;
 
         ses = H2Utils.session(conn.connection());
     }
@@ -183,6 +184,8 @@ class MapQueryResult {
 
         boolean readEvt = cctx != null && cctx.name() != null && cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
 
+        QueryContext.threadLocal(H2Utils.context(ses));
+
         page++;
 
         h2.enableDataPageScan(dataPageScanEnabled);
@@ -244,9 +247,6 @@ class MapQueryResult {
                 rows.add(res.res.currentRow());
             }
 
-            if (detachedConn == null && res.res.hasNext())
-                detachedConn = h2.connections().detachThreadConnection();
-
             return !res.res.hasNext();
         }
         finally {
@@ -281,10 +281,9 @@ class MapQueryResult {
         if (res != null)
             res.close();
 
-        if (detachedConn != null)
-            detachedConn.recycle();
+        H2Utils.resetSession(conn);
 
-        detachedConn = null;
+        conn.close();
     }
 
     /** */
@@ -295,7 +294,7 @@ class MapQueryResult {
 
     /** */
     public void lockTables() {
-        if (ses.isLazyQueryExecution() && !closed)
+        if (!closed && ses.isLazyQueryExecution())
             GridH2Table.readLockTables(ses);
     }
 
@@ -307,7 +306,7 @@ class MapQueryResult {
 
     /** */
     public void unlockTables() {
-        if (ses.isLazyQueryExecution())
+        if (!closed && ses.isLazyQueryExecution())
             GridH2Table.unlockTables(ses);
     }
 
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
index 6ef3d1b..daa5536 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResults.java
@@ -214,18 +214,9 @@ class MapQueryResults {
     }
 
     /**
-     * @return Query context.
-     */
-    public QueryContext queryContext() {
-        return qctx;
-    }
-
-    /**
      * Release query context.
      */
     public void releaseQueryContext() {
-        h2.queryContextRegistry().clearThreadLocal();
-
         if (qctx.distributedJoinContext() == null)
             qctx.clearContext(false);
     }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
index 1468eb3..df916a2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.h2.jdbc.JdbcConnection;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -42,9 +41,6 @@ public class ReduceQueryRun {
     private CountDownLatch latch;
 
     /** */
-    private final JdbcConnection conn;
-
-    /** */
     private final int pageSize;
 
     /** */
@@ -55,21 +51,17 @@ public class ReduceQueryRun {
 
     /**
      * Constructor.
-     * @param conn Connection.
      * @param idxsCnt Number of indexes.
      * @param pageSize Page size.
      * @param dataPageScanEnabled If data page scan is enabled.
      */
     ReduceQueryRun(
-        JdbcConnection conn,
         int idxsCnt,
         int pageSize,
         Boolean dataPageScanEnabled
     ) {
         assert pageSize > 0;
 
-        this.conn = conn;
-
         idxs = new ArrayList<>(idxsCnt);
 
         this.pageSize = pageSize;
@@ -146,13 +138,6 @@ public class ReduceQueryRun {
         return pageSize;
     }
 
-    /**
-     * @return Connection.
-     */
-    JdbcConnection connection() {
-        return conn;
-    }
-
     /** */
     boolean hasErrorOrRetry(){
         return state.get() != null;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
index bea87f0..663e34d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryH2IndexingLeakTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCache;
@@ -28,7 +26,8 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
-import org.apache.ignite.internal.processors.query.h2.H2ConnectionWrapper;
+import org.apache.ignite.internal.processors.query.h2.ConcurrentStripedPool;
+import org.apache.ignite.internal.processors.query.h2.H2Connection;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.CAX;
@@ -123,12 +122,9 @@ public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest {
     private static int getStatementCacheSize(GridQueryProcessor qryProcessor) {
         IgniteH2Indexing h2Idx = (IgniteH2Indexing)qryProcessor.getIndexing();
 
-        Map<Thread, ConcurrentMap<H2ConnectionWrapper, Boolean>> conns = h2Idx.connections().connectionsForThread();
+        ConcurrentStripedPool<H2Connection> conns = GridTestUtils.getFieldValue(h2Idx.connections(), "connPool");
 
-        return conns.values().stream()
-            .mapToInt(set ->
-                set.keySet().stream()
-                    .mapToInt(H2ConnectionWrapper::statementCacheSize).sum()).sum();
+        return conns.stream().mapToInt(H2Connection::statementCacheSize).sum();
     }
 
     /**
@@ -167,7 +163,7 @@ public class IgniteCacheQueryH2IndexingLeakTest extends GridCommonAbstractTest {
                         // is out there, are terminated and their statement caches are cleaned up.
                         return getStatementCacheSize(qryProc) >= THREAD_COUNT;
                     }
-                }, STMT_CACHE_CLEANUP_TIMEOUT));
+                }, STMT_CACHE_CLEANUP_TIMEOUT * 4));
             }
             finally {
                 stop.set(true);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
index 19f4481..1fb74f5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java
@@ -17,12 +17,14 @@
 
 package org.apache.ignite.internal.processors.cache.index;
 
-import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.h2.ConnectionManager;
+import org.apache.ignite.internal.processors.query.h2.H2PooledConnection;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -64,6 +66,9 @@ public class H2ConnectionLeaksSelfTest extends AbstractIndexingCommonTest {
 
         cfg.setCacheConfiguration(ccfg);
 
+        if (getTestIgniteInstanceIndex(igniteInstanceName) != 0)
+            cfg.setClientMode(true);
+
         return cfg;
     }
 
@@ -165,18 +170,33 @@ public class H2ConnectionLeaksSelfTest extends AbstractIndexingCommonTest {
     }
 
     /**
+     * @throws Exception On error.
+     */
+    @Test
+    public void testExplainLeak() throws Exception {
+        startGridAndPopulateCache(NODE_CNT);
+
+        final IgniteCache cache = grid(0).cache(CACHE_NAME);
+
+        for (int i = 0; i < ITERS; ++i) {
+            GridTestUtils.runMultiThreaded(() -> {
+                cache.query(new SqlFieldsQuery("explain select * from String")).getAll();
+
+            }, 10, "explain-threads");
+
+            checkConnectionLeaks();
+        }
+    }
+
+    /**
      * @throws Exception On error.
      */
     private void checkConnectionLeaks() throws Exception {
         boolean notLeak = GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 for (int i = 0; i < NODE_CNT; i++) {
-                    Map<Thread, ?> conns = perThreadConnections(i);
-
-                    for(Thread t : conns.keySet()) {
-                        if (!t.isAlive())
-                            return false;
-                    }
+                    if (!usedConnections(i).isEmpty())
+                        return false;
                 }
 
                 return true;
@@ -185,10 +205,10 @@ public class H2ConnectionLeaksSelfTest extends AbstractIndexingCommonTest {
 
         if (!notLeak) {
             for (int i = 0; i < NODE_CNT; i++) {
-                Map<Thread, ?> conns = perThreadConnections(i);
+                Set<H2PooledConnection> usedConns = usedConnections(i);
 
-                for(Thread t : conns.keySet())
-                    log.error("Connection is not closed for thread: " + t.getName());
+                if (!usedConns.isEmpty())
+                    log.error("Unclosed connections: " + usedConns);
             }
 
             fail("H2 JDBC connections leak detected. See the log above.");
@@ -196,11 +216,13 @@ public class H2ConnectionLeaksSelfTest extends AbstractIndexingCommonTest {
     }
 
     /**
-     * @param nodeIdx Node index.
-     * @return Per-thread connections.
+     * @param i Node index.
+     * @return Set of used connections.
      */
-    private Map<Thread, ?> perThreadConnections(int nodeIdx) {
-        return ((IgniteH2Indexing)grid(nodeIdx).context().query().getIndexing()).connections().connectionsForThread();
+    private Set<H2PooledConnection> usedConnections(int i) {
+        ConnectionManager connMgr = ((IgniteH2Indexing)grid(i).context().query().getIndexing()).connections();
+
+        return GridTestUtils.getFieldValue(connMgr, "usedConns");
     }
 
     /**
@@ -208,13 +230,11 @@ public class H2ConnectionLeaksSelfTest extends AbstractIndexingCommonTest {
      * @throws Exception On error.
      */
     private void startGridAndPopulateCache(int nodes) throws Exception {
-        startGrid(0);
-        startClientGridsMultiThreaded(1, NODE_CNT - 1);
+        startGrids(NODE_CNT);
 
         IgniteCache<Long, String> cache = grid(0).cache(CACHE_NAME);
 
         for (int i = 0; i < KEY_CNT; i++)
             cache.put((long)i, String.valueOf(i));
-
     }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/AbstractQueryTableLockAndConnectionPoolSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/AbstractQueryTableLockAndConnectionPoolSelfTest.java
index a64cb5c..08dad12 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/AbstractQueryTableLockAndConnectionPoolSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/AbstractQueryTableLockAndConnectionPoolSelfTest.java
@@ -20,10 +20,12 @@ package org.apache.ignite.internal.processors.query;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.QueryRetryException;
@@ -33,6 +35,10 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.processors.query.h2.ConnectionManager;
+import org.apache.ignite.internal.processors.query.h2.H2PooledConnection;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -54,6 +60,9 @@ public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends Ab
     /** Test duration. */
     private static final long TEST_DUR = GridTestUtils.SF.applyLB(10_000, 3_000);
 
+    /** Whether to run queries as local. */
+    private static boolean local = false;
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         stopAllGrids();
@@ -116,6 +125,29 @@ public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends Ab
     }
 
     /**
+     * Test DDL operation on table with high load local queries.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSingleNodeTablesLockQueryLocalAndDDLMultithreaded() throws Exception {
+        local = true;
+
+        try {
+            final Ignite srv = startGrid(0);
+
+            populateBaseQueryData(srv, 1);
+
+            checkTablesLockQueryAndDDLMultithreaded(srv);
+
+            checkTablesLockQueryAndDropColumnMultithreaded(srv);
+        }
+        finally {
+            local = false;
+        }
+    }
+
+    /**
      * Test DDL operation on table with high load queries.
      *
      * @throws Exception If failed.
@@ -142,7 +174,16 @@ public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends Ab
         Ignite srv1 = startGrid(1);
         startGrid(2);
 
-        Ignite cli = startClientGrid(3);
+        Ignite cli;
+
+        try {
+            Ignition.setClientMode(true);
+
+            cli = startGrid(3);
+        }
+        finally {
+            Ignition.setClientMode(false);
+        }
 
         populateBaseQueryData(srv0, 1);
 
@@ -167,7 +208,16 @@ public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends Ab
         Ignite srv1 = startGrid(1);
         startGrid(2);
 
-        Ignite cli = startClientGrid(3);
+        Ignite cli;
+
+        try {
+            Ignition.setClientMode(true);
+
+            cli = startGrid(3);
+        }
+        finally {
+            Ignition.setClientMode(false);
+        }
 
         populateBaseQueryData(srv0, 4);
 
@@ -253,14 +303,15 @@ public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends Ab
             @Override public void run() {
                 while(!end.get()) {
                     try {
-                        FieldsQueryCursor<List<?>> cursor = execute(node, new SqlFieldsQuery(
+                        FieldsQueryCursor<List<?>> cursor = execute(node, new SqlFieldsQueryEx(
                             "SELECT pers.id, pers.name " +
                             "FROM (SELECT DISTINCT p.id, p.name " +
                             "FROM \"pers\".PERSON as p) as pers " +
                             "JOIN \"pers\".PERSON p on p.id = pers.id " +
                             "JOIN (SELECT t.persId as persId, SUM(t.time) totalTime " +
-                            "FROM \"persTask\".PersonTask as t GROUP BY t.persId) as task ON task.persId = pers.id")
+                            "FROM \"persTask\".PersonTask as t GROUP BY t.persId) as task ON task.persId = pers.id", true)
                             .setLazy(lazy())
+                            .setLocal(local)
                             .setPageSize(PAGE_SIZE_SMALL));
 
                         cursor.getAll();
@@ -291,6 +342,8 @@ public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends Ab
         // Test is OK if DDL operations pass under high-load query pressure.
         end.set(true);
         fut.get();
+
+        checkConnectionLeaks(Ignition.allGrids().size());
     }
 
     /**
@@ -343,6 +396,8 @@ public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends Ab
         // Test is OK if DDL operations pass under high-load query pressure.
         end.set(true);
         fut.get();
+
+        checkConnectionLeaks(Ignition.allGrids().size());
     }
 
     /**
@@ -374,7 +429,7 @@ public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends Ab
      * @throws Exception If failed.
      */
     public void checkSingleNode(int parallelism) throws Exception {
-        Ignite srv = startGrid();
+        Ignite srv = startGrid(0);
 
         populateBaseQueryData(srv, parallelism);
 
@@ -388,9 +443,19 @@ public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends Ab
      * @throws Exception If failed.
      */
     public void checkMultipleNodes(int parallelism) throws Exception {
-        Ignite srv1 = startGrid(1);
-        Ignite srv2 = startGrid(2);
-        Ignite cli = startClientGrid(3);
+        Ignite srv1 = startGrid(0);
+        Ignite srv2 = startGrid(1);
+
+        Ignite cli;
+
+        try {
+            Ignition.setClientMode(true);
+
+            cli = startGrid(2);
+        }
+        finally {
+            Ignition.setClientMode(false);
+        }
 
         populateBaseQueryData(cli, parallelism);
 
@@ -406,7 +471,7 @@ public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends Ab
         for (int i = 0; i < 30; i++)
             iter.next();
 
-        stopGrid(3);
+        stopGrid(2);
 
         // Test server node leave with active worker.
         FieldsQueryCursor<List<?>> cursor2 = execute(srv1, baseQuery().setPageSize(PAGE_SIZE_SMALL));
@@ -417,7 +482,7 @@ public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends Ab
             for (int i = 0; i < 30; i++)
                 iter2.next();
 
-            stopGrid(2);
+            stopGrid(1);
         }
         finally {
             cursor2.close();
@@ -524,6 +589,8 @@ public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends Ab
             }
         }
 
+        checkConnectionLeaks(Ignition.allGrids().size());
+
         checkHoldQuery(node);
 
         checkShortQuery(node);
@@ -742,6 +809,42 @@ public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends Ab
     }
 
     /**
+     * @param nodeCnt Count of nodes.
+     * @throws Exception On error.
+     */
+    private void checkConnectionLeaks(int nodeCnt) throws Exception {
+        boolean notLeak = GridTestUtils.waitForCondition(() -> {
+            for (int i = 0; i < nodeCnt; i++) {
+                if (!usedConnections(i).isEmpty())
+                    return false;
+            }
+
+            return true;
+        }, 5000);
+
+        if (!notLeak) {
+            for (int i = 0; i < nodeCnt; i++) {
+                Set<H2PooledConnection> usedConns = usedConnections(i);
+
+                if (!usedConns.isEmpty())
+                    log.error("Unclosed connections: " + usedConns);
+            }
+
+            fail("H2 JDBC connections leak detected. See the log above.");
+        }
+    }
+
+    /**
+     * @param i Node index.
+     * @return Set of used connections.
+     */
+    private Set<H2PooledConnection> usedConnections(int i) {
+        ConnectionManager connMgr = ((IgniteH2Indexing)grid(i).context().query().getIndexing()).connections();
+
+        return GridTestUtils.getFieldValue(connMgr, "usedConns");
+    }
+
+    /**
      * Person class.
      */
     private static class Person {
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
index 44afe2f..7f3a00f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
@@ -258,7 +258,7 @@ public class IgniteSqlSkipReducerOnUpdateDmlSelfTest extends AbstractIndexingCom
                 return cache.query(new SqlFieldsQueryEx("UPDATE Person SET name = Fail(name)", false)
                     .setSkipReducerOnUpdate(true));
             }
-        }, CacheException.class, "Failed to execute SQL query");
+        }, CacheException.class, "Failed to run SQL update query.");
     }
 
     /**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 963d0d6..3e5e045 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -1109,8 +1109,7 @@ public class IgniteSqlSplitterSelfTest extends AbstractIndexingCommonTest {
                 checkQueryPlan(persPart,
                     false,
                     0,
-                    sql,
-                    "persPartAff", "persPart", "orgRepl");
+                    sql);
 
                 checkQueryFails(persPart, sql, true);
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
index fb8e7b5..2818599 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
@@ -21,11 +21,9 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Objects;
@@ -507,9 +505,8 @@ public class QueryDataPageScanTest extends GridCommonAbstractTest {
         /** {@inheritDoc} */
         @Override public ResultSet executeSqlQueryWithTimer(
             PreparedStatement stmt,
-            Connection conn,
+            H2PooledConnection conn,
             String sql,
-            @Nullable Collection<Object> params,
             int timeoutMillis,
             @Nullable GridQueryCancel cancel,
             Boolean dataPageScanEnabled,
@@ -518,7 +515,7 @@ public class QueryDataPageScanTest extends GridCommonAbstractTest {
             callsCnt.incrementAndGet();
             assertEquals(expectedDataPageScanEnabled, dataPageScanEnabled);
 
-            return super.executeSqlQueryWithTimer(stmt, conn, sql, params, timeoutMillis,
+            return super.executeSqlQueryWithTimer(stmt, conn, sql, timeoutMillis,
                 cancel, dataPageScanEnabled, qryInfo);
         }
     }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
index 0e73d7b..3a9e4fc 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
@@ -40,21 +40,19 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
-import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.h2.H2PooledConnection;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
-import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.h2.command.Prepared;
 import org.h2.engine.Session;
-import org.h2.jdbc.JdbcConnection;
 import org.h2.message.DbException;
 import org.h2.table.Column;
 import org.h2.value.Value;
@@ -977,16 +975,10 @@ public class GridQueryParsingTest extends AbstractIndexingCommonTest {
     /**
      *
      */
-    private JdbcConnection connection() throws Exception {
-        GridKernalContext ctx = ((IgniteEx)ignite).context();
+    private H2PooledConnection connection() throws Exception {
+        IgniteH2Indexing idx = (IgniteH2Indexing)((IgniteEx)ignite).context().query().getIndexing();
 
-        GridQueryProcessor qryProcessor = ctx.query();
-
-        IgniteH2Indexing idx = U.field(qryProcessor, "idx");
-
-        String schemaName = idx.schema(DEFAULT_CACHE_NAME);
-
-        return (JdbcConnection)idx.connections().connectionForThread().connection(schemaName);
+        return idx.connections().connection(idx.schema(DEFAULT_CACHE_NAME));
     }
 
     /**
@@ -994,48 +986,14 @@ public class GridQueryParsingTest extends AbstractIndexingCommonTest {
      */
     @SuppressWarnings("unchecked")
     private <T extends Prepared> T parse(String sql) throws Exception {
-        Session ses = (Session)connection().getSession();
+        try (H2PooledConnection conn = connection()) {
+            Session ses = H2Utils.session(conn);
 
-        setQueryContext();
+            H2Utils.setupConnection(conn,
+                QueryContext.parseContext(null, true), false, false, false);
 
-        try {
             return (T)ses.prepare(sql);
         }
-        finally {
-            clearQueryContext();
-        }
-    }
-
-    /**
-     * Sets thread local query context.
-     */
-    private void setQueryContext() {
-        QueryContextRegistry qryCtxRegistry = indexing().queryContextRegistry();
-
-        QueryContext qctx = new QueryContext(
-            0,
-            null,
-            null,
-            null,
-            null,
-            true
-        );
-
-        qryCtxRegistry.setThreadLocal(qctx);
-    }
-
-    /**
-     * Clears thread local query context.
-     */
-    private void clearQueryContext() {
-        indexing().queryContextRegistry().clearThreadLocal();
-    }
-
-    /**
-     * @return H2 indexing manager.
-     */
-    private IgniteH2Indexing indexing() {
-        return (IgniteH2Indexing)((IgniteEx)ignite).context().query().getIndexing();
     }
 
     /**

