ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-5317: Added method to execute SQL fields query without concrete cache. This closes #2024.
Date Wed, 31 May 2017 07:36:23 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 0feadac5f -> c45de1681


IGNITE-5317: Added method to execute SQL fields query without concrete cache. This closes #2024.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c45de168
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c45de168
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c45de168

Branch: refs/heads/master
Commit: c45de1681110e42b88c84d82507b8bc9286182ec
Parents: 0feadac
Author: devozerov <vozerov@gridgain.com>
Authored: Wed May 31 10:36:13 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Wed May 31 10:36:13 2017 +0300

----------------------------------------------------------------------
 .../ignite/internal/jdbc2/JdbcConnection.java   |  26 +-
 .../processors/query/GridQueryIndexing.java     |  71 +++--
 .../processors/query/GridQueryProcessor.java    | 161 ++++++----
 .../query/h2/DmlStatementsProcessor.java        |  45 +--
 .../processors/query/h2/IgniteH2Indexing.java   | 306 +++++++------------
 .../h2/GridIndexingSpiAbstractSelfTest.java     | 121 ++------
 6 files changed, 327 insertions(+), 403 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index ee8b605..9385d7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -56,8 +56,12 @@ import org.apache.ignite.compute.ComputeTaskTimeoutException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryIndexing;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
@@ -544,10 +548,24 @@ public class JdbcConnection implements Connection {
         if (!stream)
             stmt = new JdbcPreparedStatement(this, sql);
         else {
+            GridQueryIndexing idx = ignite().context().query().getIndexing();
+
             PreparedStatement nativeStmt = prepareNativeStatement(sql);
 
-            IgniteDataStreamer<?, ?> streamer = ((IgniteEx) ignite).context().query().createStreamer(cacheName,
-                nativeStmt, streamFlushTimeout, streamNodeBufSize, streamNodeParOps, streamAllowOverwrite);
+            if (!idx.isInsertStatement(nativeStmt))
+                throw new IgniteSQLException("Only INSERT operations are supported in streaming mode",
+                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+            IgniteDataStreamer streamer = ignite().dataStreamer(cacheName);
+
+            streamer.autoFlushFrequency(streamFlushTimeout);
+            streamer.allowOverwrite(streamAllowOverwrite);
+
+            if (streamNodeBufSize > 0)
+                streamer.perNodeBufferSize(streamNodeBufSize);
+
+            if (streamNodeParOps > 0)
+                streamer.perNodeParallelOperations(streamNodeParOps);
 
             stmt = new JdbcStreamedPreparedStatement(this, sql, streamer, nativeStmt);
         }
@@ -736,8 +754,8 @@ public class JdbcConnection implements Connection {
     /**
      * @return Ignite node.
      */
-    Ignite ignite() {
-        return ignite;
+    IgniteKernal ignite() {
+        return (IgniteKernal)ignite;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 9d66c0a..4429058 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -66,77 +66,79 @@ public interface GridQueryIndexing {
     /**
      * Parses SQL query into two step query and executes it.
      *
-     * @param cctx Cache context.
+     * @param schemaName Schema name.
      * @param qry Query.
      * @param keepBinary Keep binary flag.
+     * @param mainCacheId Main cache ID.
      * @return Cursor.
      * @throws IgniteCheckedException If failed.
      */
-    public <K, V> QueryCursor<Cache.Entry<K, V>> queryDistributedSql(GridCacheContext<?,?> cctx, SqlQuery qry,
-        boolean keepBinary) throws IgniteCheckedException;
+    public <K, V> QueryCursor<Cache.Entry<K, V>> queryDistributedSql(String schemaName, SqlQuery qry,
+        boolean keepBinary, int mainCacheId) throws IgniteCheckedException;
 
     /**
      * Parses SQL query into two step query and executes it.
      *
-     * @param cctx Cache context.
+     * @param schemaName Schema name.
      * @param qry Query.
      * @param keepBinary Keep binary flag.
      * @param cancel Query cancel.
+     * @param mainCacheId Main cache ID.
      * @return Cursor.
      * @throws IgniteCheckedException If failed.
      */
-    public FieldsQueryCursor<List<?>> queryDistributedSqlFields(GridCacheContext<?, ?> cctx, SqlFieldsQuery qry,
-        boolean keepBinary, GridQueryCancel cancel) throws IgniteCheckedException;
+    public FieldsQueryCursor<List<?>> queryDistributedSqlFields(String schemaName, SqlFieldsQuery qry,
+        boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId) throws IgniteCheckedException;
 
     /**
      * Perform a MERGE statement using data streamer as receiver.
      *
-     * @param cacheName Cache name.
+     * @param schemaName Schema name.
      * @param qry Query.
      * @param params Query parameters.
      * @param streamer Data streamer to feed data to.
      * @return Query result.
      * @throws IgniteCheckedException If failed.
      */
-    public long streamUpdateQuery(String cacheName, String qry, @Nullable Object[] params,
+    public long streamUpdateQuery(String schemaName, String qry, @Nullable Object[] params,
         IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException;
 
     /**
      * Executes regular query.
      *
-     * @param cctx Cache context.
+     * @param schemaName Schema name.
      * @param qry Query.
      * @param filter Cache name and key filter.
      * @param keepBinary Keep binary flag.
      * @return Cursor.
      */
-    public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(GridCacheContext<?, ?> cctx, SqlQuery qry,
+    public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(String schemaName, SqlQuery qry,
         IndexingQueryFilter filter, boolean keepBinary) throws IgniteCheckedException;
 
     /**
      * Queries individual fields (generally used by JDBC drivers).
      *
-     * @param cctx Cache context.
+     * @param schemaName Schema name.
      * @param qry Query.
      * @param keepBinary Keep binary flag.
      * @param filter Cache name and key filter.
      * @param cancel Query cancel.
      * @return Cursor.
      */
-    public FieldsQueryCursor<List<?>> queryLocalSqlFields(GridCacheContext<?, ?> cctx, SqlFieldsQuery qry,
+    public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry,
         boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException;
 
     /**
      * Executes text query.
      *
-     * @param cacheName Cache name.
+     * @param schemaName Schema name.
      * @param qry Text query.
      * @param typeName Type name.
      * @param filter Cache name and key filter.
      * @return Queried rows.
      * @throws IgniteCheckedException If failed.
      */
-    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(String cacheName, String qry,
+    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(String schemaName, String qry,
         String typeName, IndexingQueryFilter filter) throws IgniteCheckedException;
 
     /**
@@ -196,11 +198,11 @@ public interface GridQueryIndexing {
     /**
      * Unregisters type and removes all corresponding data.
      *
-     * @param cacheName Cache name.
+     * @param schemaName Schema name.
      * @param typeName Type name.
      * @throws IgniteCheckedException If failed.
      */
-    public void unregisterType(String cacheName, String typeName) throws IgniteCheckedException;
+    public void unregisterType(String schemaName, String typeName) throws IgniteCheckedException;
 
     /**
      * Updates index. Note that key is unique for cache, so if cache contains multiple indexes
@@ -231,19 +233,21 @@ public interface GridQueryIndexing {
     /**
      * Rebuilds all indexes of given type from hash index.
      *
-     * @param cacheName Cache name.
-     * @param type Type descriptor.
+     * @param cctx Cache context.
+     * @param schemaName Schema name.
+     * @param typeName Type name.
      * @throws IgniteCheckedException If failed.
      */
-    public void rebuildIndexesFromHash(String cacheName, GridQueryTypeDescriptor type) throws IgniteCheckedException;
+    public void rebuildIndexesFromHash(GridCacheContext cctx, String schemaName, String typeName)
+        throws IgniteCheckedException;
 
     /**
      * Marks all indexes of given type for rebuild from hash index, making them unusable until rebuild finishes.
      *
      * @param cacheName Cache name.
-     * @param type Type descriptor.
+     * @param typeName Type name.
      */
-    public void markForRebuildFromHash(String cacheName, GridQueryTypeDescriptor type);
+    public void markForRebuildFromHash(String cacheName, String typeName);
 
     /**
      * Returns backup filter.
@@ -264,11 +268,11 @@ public interface GridQueryIndexing {
     /**
      * Prepare native statement to retrieve JDBC metadata from.
      *
-     * @param cacheName Cache name.
+     * @param schemaName Schema name.
      * @param sql Query.
      * @return {@link PreparedStatement} from underlying engine to supply metadata to Prepared - most likely H2.
      */
-    public PreparedStatement prepareNativeStatement(String cacheName, String sql) throws SQLException;
+    public PreparedStatement prepareNativeStatement(String schemaName, String sql) throws SQLException;
 
     /**
      * Gets cache name from database schema.
@@ -299,15 +303,18 @@ public interface GridQueryIndexing {
     public void cancelAllQueries();
 
     /**
-     * @param cacheName Cache name.
+     * Gets database schema from cache name.
+     *
+     * @param cacheName Cache name. {@code null} would be converted to an empty string.
+     * @return Schema name. Should not be null since we should not fail for an invalid cache name.
+     */
+    public String schema(String cacheName);
+
+    /**
+     * Check if passed statement is insert statement.
+     *
      * @param nativeStmt Native statement.
-     * @param autoFlushFreq Automatic data flushing frequency, disabled if {@code 0}.
-     * @param nodeBufSize Per node buffer size - see {@link IgniteDataStreamer#perNodeBufferSize(int)}
-     * @param nodeParOps Per node parallel ops count - see {@link IgniteDataStreamer#perNodeParallelOperations(int)}
-     * @param allowOverwrite Overwrite existing cache values on key duplication.
-     * @return {@link IgniteDataStreamer} tailored to specific needs of given native statement based on its metadata;
-     * {@code null} if given statement is a query.
+     * @return {@code True} if insert.
      */
-    public IgniteDataStreamer<?,?> createStreamer(String cacheName, PreparedStatement nativeStmt, long autoFlushFreq,
-        int nodeBufSize, int nodeParOps, boolean allowOverwrite);
+    public boolean isInsertStatement(PreparedStatement nativeStmt);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 320c90b..990226e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -44,11 +44,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
-import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorImpl;
@@ -1508,10 +1506,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param desc Type descriptor.
      * @return Future that will be completed when rebuilding of all indexes is finished.
      */
-    private IgniteInternalFuture<Object> rebuildIndexesFromHash(
-        @Nullable final String cacheName,
-        @Nullable final QueryTypeDescriptorImpl desc
-    ) {
+    private IgniteInternalFuture<Object> rebuildIndexesFromHash(@Nullable final String cacheName,
+        @Nullable final QueryTypeDescriptorImpl desc) {
         if (idx == null)
             return new GridFinishedFuture<>(new IgniteCheckedException("Indexing is disabled."));
 
@@ -1520,12 +1516,19 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
         final GridWorkerFuture<Object> fut = new GridWorkerFuture<>();
 
-        idx.markForRebuildFromHash(cacheName, desc);
+        final String schemaName = idx.schema(cacheName);
+        final String typeName = desc.name();
+
+        idx.markForRebuildFromHash(schemaName, typeName);
 
         GridWorker w = new GridWorker(ctx.igniteInstanceName(), "index-rebuild-worker", log) {
             @Override protected void body() {
                 try {
-                    idx.rebuildIndexesFromHash(cacheName, desc);
+                    int cacheId = CU.cacheId(cacheName);
+
+                    GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
+
+                    idx.rebuildIndexesFromHash(cctx, schemaName, typeName);
 
                     fut.onDone();
                 }
@@ -1533,7 +1536,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     fut.onDone(e);
                 }
                 catch (Throwable e) {
-                    log.error("Failed to rebuild indexes for type: " + desc.name(), e);
+                    log.error("Failed to rebuild indexes for type: " + typeName, e);
 
                     fut.onDone(e);
 
@@ -1721,12 +1724,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         final boolean keepBinary) {
         checkxEnabled();
 
-        if (qry.isReplicatedOnly() && qry.getPartitions() != null)
-            throw new CacheException("Partitions are not supported in replicated only mode.");
-
-        if (qry.isDistributedJoins() && qry.getPartitions() != null)
-            throw new CacheException(
-                "Using both partitions and distributed JOINs is not supported for the same query");
+        validateSqlFieldsQuery(qry);
 
         boolean loc = (qry.isReplicatedOnly() && cctx.isReplicatedAffinityNode()) || cctx.isLocal() || qry.isLocal();
 
@@ -1734,6 +1732,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             throw new IllegalStateException("Failed to execute query (grid is stopping).");
 
         try {
+            final String schemaName = idx.schema(cctx.name());
+            final int mainCacheId = CU.cacheId(cctx.name());
+
             IgniteOutClosureX<FieldsQueryCursor<List<?>>> clo;
 
             if (loc) {
@@ -1741,32 +1742,29 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     @Override public FieldsQueryCursor<List<?>> applyx() throws IgniteCheckedException {
                         GridQueryCancel cancel = new GridQueryCancel();
 
-                        final FieldsQueryCursor<List<?>> cursor = idx.queryLocalSqlFields(cctx, qry, keepBinary,
-                            idx.backupFilter(requestTopVer.get(), qry.getPartitions()), cancel);
+                        FieldsQueryCursor<List<?>> cur;
 
-                        Iterable<List<?>> iterExec = new Iterable<List<?>>() {
-                            @Override public Iterator<List<?>> iterator() {
-                                sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx.name());
+                        if (cctx.config().getQueryParallelism() > 1) {
+                            qry.setDistributedJoins(true);
 
-                                return cursor.iterator();
-                            }
-                        };
+                            cur = idx.queryDistributedSqlFields(schemaName, qry, keepBinary, cancel, mainCacheId);
+                        }
+                        else {
+                            IndexingQueryFilter filter = idx.backupFilter(requestTopVer.get(), qry.getPartitions());
 
-                        return new QueryCursorImpl<List<?>>(iterExec, cancel) {
-                            @Override public List<GridQueryFieldMetadata> fieldsMeta() {
-                                if (cursor instanceof QueryCursorImpl)
-                                    return ((QueryCursorEx)cursor).fieldsMeta();
+                            cur = idx.queryLocalSqlFields(schemaName, qry, keepBinary, filter, cancel);
+                        }
 
-                                return super.fieldsMeta();
-                            }
-                        };
+                        sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx.name());
+
+                        return cur;
                     }
                 };
             }
             else {
                 clo = new IgniteOutClosureX<FieldsQueryCursor<List<?>>>() {
                     @Override public FieldsQueryCursor<List<?>> applyx() throws IgniteCheckedException {
-                        return idx.queryDistributedSqlFields(cctx, qry, keepBinary, null);
+                        return idx.queryDistributedSqlFields(schemaName, qry, keepBinary, null, mainCacheId);
                     }
                 };
             }
@@ -1782,6 +1780,58 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Query SQL fields without strict dependency on concrete cache.
+     *
+     * @param schemaName Schema name.
+     * @param qry Query.
+     * @param keepBinary Keep binary flag.
+     * @return Cursor.
+     */
+    public FieldsQueryCursor<List<?>> querySqlFieldsNoCache(final String schemaName, final SqlFieldsQuery qry,
+        final boolean keepBinary) {
+        checkxEnabled();
+
+        validateSqlFieldsQuery(qry);
+
+        if (qry.isLocal())
+            throw new IgniteException("Local query is not supported without specific cache.");
+
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Failed to execute query (grid is stopping).");
+
+        try {
+            IgniteOutClosureX<FieldsQueryCursor<List<?>>> clo = new IgniteOutClosureX<FieldsQueryCursor<List<?>>>() {
+                @Override public FieldsQueryCursor<List<?>> applyx() throws IgniteCheckedException {
+                    GridQueryCancel cancel = new GridQueryCancel();
+
+                    return idx.queryDistributedSqlFields(schemaName, qry, keepBinary, cancel, null);
+                }
+            };
+
+            return executeQuery(GridCacheQueryType.SQL_FIELDS, qry.getSql(), null, clo, true);
+        }
+        catch (IgniteCheckedException e) {
+            throw new CacheException(e);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Validate SQL fields query.
+     *
+     * @param qry Query.
+     */
+    private static void validateSqlFieldsQuery(SqlFieldsQuery qry) {
+        if (qry.isReplicatedOnly() && qry.getPartitions() != null)
+            throw new CacheException("Partitions are not supported in replicated only mode.");
+
+        if (qry.isDistributedJoins() && qry.getPartitions() != null)
+            throw new CacheException("Using both partitions and distributed JOINs is not supported for the same query");
+    }
+
+    /**
      * @param cacheName Cache name.
      * @param streamer Data streamer.
      * @param qry Query.
@@ -1797,9 +1847,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         try {
             GridCacheContext cctx = ctx.cache().cache(cacheName).context();
 
+            final String schemaName = idx.schema(cacheName);
+
             return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, cctx, new IgniteOutClosureX<Long>() {
                 @Override public Long applyx() throws IgniteCheckedException {
-                    return idx.streamUpdateQuery(cacheName, qry, args, streamer);
+                    return idx.streamUpdateQuery(schemaName, qry, args, streamer);
                 }
             }, true);
         }
@@ -1848,10 +1900,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             throw new IllegalStateException("Failed to execute query (grid is stopping).");
 
         try {
+            final String schemaName = idx.schema(cctx.name());
+            final int mainCacheId = CU.cacheId(cctx.name());
+
             return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx,
                 new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
                     @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
-                        return idx.queryDistributedSql(cctx, qry, keepBinary);
+                        return idx.queryDistributedSql(schemaName, qry, keepBinary, mainCacheId);
                     }
                 }, true);
         }
@@ -1874,6 +1929,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (!busyLock.enterBusy())
             throw new IllegalStateException("Failed to execute query (grid is stopping).");
 
+        final String schemaName = idx.schema(cctx.name());
+        final int mainCacheId = CU.cacheId(cctx.name());
+
         try {
             return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx,
                 new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
@@ -1889,8 +1947,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                             qry.getArgs(),
                             cctx.name());
 
-                        return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), qry.getPartitions()),
-                            keepBinary);
+                        if (cctx.config().getQueryParallelism() > 1) {
+                            qry.setDistributedJoins(true);
+
+                            return idx.queryDistributedSql(schemaName, qry, keepBinary, mainCacheId);
+                        }
+                        else
+                            return idx.queryLocalSql(schemaName, qry, idx.backupFilter(requestTopVer.get(),
+                                qry.getPartitions()), keepBinary);
                     }
                 }, true);
         }
@@ -2036,7 +2100,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     public PreparedStatement prepareNativeStatement(String cacheName, String sql) throws SQLException {
         checkxEnabled();
 
-        return idx.prepareNativeStatement(cacheName, sql);
+        String schemaName = idx.schema(cacheName);
+
+        return idx.prepareNativeStatement(schemaName, sql);
     }
 
     /**
@@ -2051,21 +2117,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
     /**
      * @param cacheName Cache name.
-     * @param nativeStmt Native statement.
-     * @param autoFlushFreq Automatic data flushing frequency, disabled if {@code 0}.
-     * @param nodeBufSize Per node buffer size - see {@link IgniteDataStreamer#perNodeBufferSize(int)}
-     * @param nodeParOps Per node parallel ops count - see {@link IgniteDataStreamer#perNodeParallelOperations(int)}
-     * @param allowOverwrite Overwrite existing cache values on key duplication.
-     * @see IgniteDataStreamer#allowOverwrite
-     * @return {@link IgniteDataStreamer} tailored to specific needs of given native statement based on its metadata.
-     */
-    public IgniteDataStreamer<?, ?> createStreamer(String cacheName, PreparedStatement nativeStmt, long autoFlushFreq,
-        int nodeBufSize, int nodeParOps, boolean allowOverwrite) {
-        return idx.createStreamer(cacheName, nativeStmt, autoFlushFreq, nodeBufSize, nodeParOps, allowOverwrite);
-    }
-
-    /**
-     * @param cacheName Cache name.
      * @param key Key.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
@@ -2122,8 +2173,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                 new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() {
                     @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException {
                         String typeName = typeName(cacheName, resType);
+                        String schemaName = idx.schema(cacheName);
 
-                        return idx.queryLocalText(cacheName, clause, typeName, filters);
+                        return idx.queryLocalText(schemaName, clause, typeName, filters);
                     }
                 }, true);
         }
@@ -2191,7 +2243,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param complete Complete.
      */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    public <R> R executeQuery(GridCacheQueryType qryType, String qry, GridCacheContext<?, ?> cctx,
+    public <R> R executeQuery(GridCacheQueryType qryType, String qry, @Nullable GridCacheContext<?, ?> cctx,
         IgniteOutClosureX<R> clo, boolean complete) throws IgniteCheckedException {
         final long startTime = U.currentTimeMillis();
 
@@ -2231,7 +2283,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             long duration = U.currentTimeMillis() - startTime;
 
             if (complete || failed) {
-                cctx.queries().collectMetrics(qryType, qry, startTime, duration, failed);
+                if (cctx != null)
+                    cctx.queries().collectMetrics(qryType, qry, startTime, duration, failed);
 
                 if (log.isTraceEnabled())
                     log.trace("Query execution [startTime=" + startTime + ", duration=" + duration +

http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index d48c373..98d123f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -138,7 +139,7 @@ public class DmlStatementsProcessor {
     /**
      * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
      *
-     * @param schema Schema.
+     * @param schemaName Schema.
      * @param stmt JDBC statement.
      * @param fieldsQry Original query.
      * @param loc Query locality flag.
@@ -147,13 +148,13 @@ public class DmlStatementsProcessor {
      * @return Update result (modified items count and failed keys).
      * @throws IgniteCheckedException if failed.
      */
-    private UpdateResult updateSqlFields(String schema, PreparedStatement stmt, SqlFieldsQuery fieldsQry,
+    private UpdateResult updateSqlFields(String schemaName, PreparedStatement stmt, SqlFieldsQuery fieldsQry,
         boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
         Object[] errKeys = null;
 
         long items = 0;
 
-        UpdatePlan plan = getPlanForStatement(schema, stmt, null);
+        UpdatePlan plan = getPlanForStatement(schemaName, stmt, null);
 
         GridCacheContext<?, ?> cctx = plan.tbl.rowDescriptor().context();
 
@@ -177,7 +178,7 @@ public class DmlStatementsProcessor {
             UpdateResult r;
 
             try {
-                r = executeUpdateStatement(cctx, stmt, fieldsQry, loc, filters, cancel, errKeys);
+                r = executeUpdateStatement(schemaName, cctx, stmt, fieldsQry, loc, filters, cancel, errKeys);
             }
             finally {
                 cctx.operationContextPerCall(opCtx);
@@ -201,7 +202,7 @@ public class DmlStatementsProcessor {
     }
 
     /**
-     * @param schema Schema.
+     * @param schemaName Schema.
      * @param stmt Prepared statement.
      * @param fieldsQry Initial query
      * @param cancel Query cancel.
@@ -209,12 +210,12 @@ public class DmlStatementsProcessor {
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings("unchecked")
-    QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schema, PreparedStatement stmt,
+    QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schemaName, PreparedStatement stmt,
         SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException {
-        UpdateResult res = updateSqlFields(schema, stmt, fieldsQry, false, null, cancel);
+        UpdateResult res = updateSqlFields(schemaName, stmt, fieldsQry, false, null, cancel);
 
         QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
-            (Collections.singletonList(res.cnt)), null, false);
+            (Collections.singletonList(res.cnt)), cancel, false);
 
         resCur.fieldsMeta(UPDATE_RESULT_META);
 
@@ -224,7 +225,7 @@ public class DmlStatementsProcessor {
     /**
      * Execute DML statement on local cache.
      *
-     * @param schema Schema.
+     * @param schemaName Schema.
      * @param stmt Prepared statement.
      * @param fieldsQry Fields query.
      * @param filters Cache name and key filter.
@@ -233,10 +234,10 @@ public class DmlStatementsProcessor {
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings("unchecked")
-    GridQueryFieldsResult updateSqlFieldsLocal(String schema, PreparedStatement stmt,
+    GridQueryFieldsResult updateSqlFieldsLocal(String schemaName, PreparedStatement stmt,
         SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel)
         throws IgniteCheckedException {
-        UpdateResult res = updateSqlFields(schema, stmt, fieldsQry, true, filters, cancel);
+        UpdateResult res = updateSqlFields(schemaName, stmt, fieldsQry, true, filters, cancel);
 
         return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META,
             new IgniteSingletonIterator(Collections.singletonList(res.cnt)));
@@ -276,8 +277,8 @@ public class DmlStatementsProcessor {
 
             final ArrayList<List<?>> data = new ArrayList<>(plan.rowsNum);
 
-            final GridQueryFieldsResult res = idx.queryLocalSqlFields(cctx.name(), plan.selectQry, F.asList(args),
-                null, false, 0, null);
+            final GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), plan.selectQry,
+                F.asList(args), null, false, 0, null);
 
             QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() {
                 @Override public Iterator<List<?>> iterator() {
@@ -327,6 +328,7 @@ public class DmlStatementsProcessor {
     /**
      * Actually perform SQL DML operation locally.
      *
+     * @param schemaName Schema name.
      * @param cctx Cache context.
      * @param prepStmt Prepared statement for DML query.
      * @param fieldsQry Fields query.
@@ -336,12 +338,14 @@ public class DmlStatementsProcessor {
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings({"ConstantConditions", "unchecked"})
-    private UpdateResult executeUpdateStatement(final GridCacheContext cctx, PreparedStatement prepStmt,
-        SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel,
-        Object[] failedKeys) throws IgniteCheckedException {
+    private UpdateResult executeUpdateStatement(String schemaName, final GridCacheContext cctx,
+        PreparedStatement prepStmt, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters,
+        GridQueryCancel cancel, Object[] failedKeys) throws IgniteCheckedException {
+        int mainCacheId = CU.cacheId(cctx.name());
+
         Integer errKeysPos = null;
 
-        UpdatePlan plan = getPlanForStatement(idx.schema(cctx.name()), prepStmt, errKeysPos);
+        UpdatePlan plan = getPlanForStatement(schemaName, prepStmt, errKeysPos);
 
         if (plan.fastUpdateArgs != null) {
             assert F.isEmpty(failedKeys) && errKeysPos == null;
@@ -364,16 +368,17 @@ public class DmlStatementsProcessor {
                 .setPageSize(fieldsQry.getPageSize())
                 .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS);
 
-            cur = (QueryCursorImpl<List<?>>) idx.queryDistributedSqlFields(cctx, newFieldsQry, true, cancel);
+            cur = (QueryCursorImpl<List<?>>) idx.queryDistributedSqlFields(schemaName, newFieldsQry, true, cancel,
+                mainCacheId);
         }
         else {
-            final GridQueryFieldsResult res = idx.queryLocalSqlFields(cctx.name(), plan.selectQry,
+            final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQry,
                 F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel);
 
             cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
                 @Override public Iterator<List<?>> iterator() {
                     try {
-                        return new GridQueryCacheObjectsIterator(res.iterator(), idx.objectContext(), cctx.keepBinary());
+                        return new GridQueryCacheObjectsIterator(res.iterator(), idx.objectContext(), true);
                     }
                     catch (IgniteCheckedException e) {
                         throw new IgniteException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 1e19954..bd611f6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.jdbc2.JdbcSqlFieldsQuery;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
 import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
@@ -398,35 +399,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public PreparedStatement prepareNativeStatement(String cacheName, String sql) throws SQLException {
-        String schemaName = schema(cacheName);
-
-        return prepareStatement(connectionForSchema(schemaName), sql, true);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public IgniteDataStreamer<?, ?> createStreamer(String cacheName, PreparedStatement nativeStmt,
-        long autoFlushFreq, int nodeBufSize, int nodeParOps, boolean allowOverwrite) {
-        Prepared prep = GridSqlQueryParser.prepared(nativeStmt);
-
-        if (!(prep instanceof Insert))
-            throw new IgniteSQLException("Only INSERT operations are supported in streaming mode",
-                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-
-        IgniteDataStreamer streamer = ctx.grid().dataStreamer(cacheName);
-
-        streamer.autoFlushFrequency(autoFlushFreq);
-
-        streamer.allowOverwrite(allowOverwrite);
-
-        if (nodeBufSize > 0)
-            streamer.perNodeBufferSize(nodeBufSize);
-
-        if (nodeParOps > 0)
-            streamer.perNodeParallelOperations(nodeParOps);
+    @Override public PreparedStatement prepareNativeStatement(String schemaName, String sql) throws SQLException {
+        Connection conn = connectionForSchema(schemaName);
 
-        return streamer;
+        return prepareStatement(conn, sql, true);
     }
 
     /**
@@ -564,7 +540,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         GridCacheVersion ver,
         long expirationTime,
         long link) throws IgniteCheckedException {
-        H2TableDescriptor tbl = tableDescriptor(type.name(), cacheName);
+        H2TableDescriptor tbl = tableDescriptor(schema(cacheName), type.name());
 
         if (tbl == null)
             return; // Type was rejected.
@@ -588,7 +564,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (log.isDebugEnabled())
             log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ", val=" + val + ']');
 
-        H2TableDescriptor tbl = tableDescriptor(type.name(), cacheName);
+        H2TableDescriptor tbl = tableDescriptor(schema(cacheName), type.name());
 
         if (tbl == null)
             return;
@@ -782,13 +758,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     @SuppressWarnings("unchecked")
-    @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(
-        String cacheName, String qry, String typeName,
-        IndexingQueryFilter filters) throws IgniteCheckedException {
-        H2TableDescriptor tbl = tableDescriptor(typeName, cacheName);
+    @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalText(String schemaName, String qry,
+        String typeName, IndexingQueryFilter filters) throws IgniteCheckedException {
+        H2TableDescriptor tbl = tableDescriptor(schemaName, typeName);
 
         if (tbl != null && tbl.luceneIndex() != null) {
-            GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, TEXT, cacheName,
+            GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, TEXT, schemaName,
                 U.currentTimeMillis(), null, true);
 
             try {
@@ -805,9 +780,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public void unregisterType(String cacheName, String typeName)
+    @Override public void unregisterType(String schemaName, String typeName)
         throws IgniteCheckedException {
-        H2TableDescriptor tbl = tableDescriptor(typeName, cacheName);
+        H2TableDescriptor tbl = tableDescriptor(schemaName, typeName);
 
         if (tbl != null)
             removeTable(tbl);
@@ -816,7 +791,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /**
      * Queries individual fields (generally used by JDBC drivers).
      *
-     * @param cacheName Cache name.
+     * @param schemaName Schema name.
      * @param qry Query.
      * @param params Query parameters.
      * @param filter Cache name and key filter.
@@ -827,12 +802,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
-    public GridQueryFieldsResult queryLocalSqlFields(final String cacheName, final String qry,
+    public GridQueryFieldsResult queryLocalSqlFields(final String schemaName, final String qry,
         @Nullable final Collection<Object> params, final IndexingQueryFilter filter, boolean enforceJoinOrder,
         final int timeout, final GridQueryCancel cancel) throws IgniteCheckedException {
-        final String schema = schema(cacheName);
-
-        final Connection conn = connectionForSchema(schema);
+        final Connection conn = connectionForSchema(schemaName);
 
         H2Utils.setupConnection(conn, false, enforceJoinOrder);
 
@@ -849,7 +822,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             fldsQry.setEnforceJoinOrder(enforceJoinOrder);
             fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
 
-            return dmlProc.updateSqlFieldsLocal(schema, stmt, fldsQry, filter, cancel);
+            return dmlProc.updateSqlFieldsLocal(schemaName, stmt, fldsQry, filter, cancel);
         }
         else if (DdlStatementsProcessor.isDdlStatement(p))
             throw new IgniteSQLException("DDL statements are supported for the whole cluster only",
@@ -874,12 +847,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 GridH2QueryContext.set(ctx);
 
                 GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL_FIELDS,
-                    cacheName, U.currentTimeMillis(), cancel, true);
+                    schemaName, U.currentTimeMillis(), cancel, true);
 
                 runs.putIfAbsent(run.id(), run);
 
                 try {
-                    ResultSet rs = executeSqlQueryWithTimer(schema, stmt, conn, qry, params, timeout, cancel);
+                    ResultSet rs = executeSqlQueryWithTimer(schemaName, stmt, conn, qry, params, timeout, cancel);
 
                     return new H2FieldsIterator(rs);
                 }
@@ -893,10 +866,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public long streamUpdateQuery(String cacheName, String qry,
+    @Override public long streamUpdateQuery(String schemaName, String qry,
         @Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException {
-        String schemaName = schema(cacheName);
-
         final Connection conn = connectionForSchema(schemaName);
 
         final PreparedStatement stmt;
@@ -1074,97 +1045,76 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public FieldsQueryCursor<List<?>> queryLocalSqlFields(final GridCacheContext<?, ?> cctx,
-        final SqlFieldsQuery qry, final boolean keepBinary, final IndexingQueryFilter filter,
-        final GridQueryCancel cancel) throws IgniteCheckedException {
-
-        if (cctx.config().getQueryParallelism() > 1) {
-            qry.setDistributedJoins(true);
+    @Override public FieldsQueryCursor<List<?>> queryLocalSqlFields(String schemaName, SqlFieldsQuery qry,
+        final boolean keepBinary, IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException {
+        String sql = qry.getSql();
+        Object[] args = qry.getArgs();
 
-            assert qry.isLocal();
+        final GridQueryFieldsResult res = queryLocalSqlFields(schemaName, sql, F.asList(args), filter,
+            qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
 
-            return queryDistributedSqlFields(cctx, qry, keepBinary, cancel);
-        }
-        else {
-            final String cacheName = cctx.name();
-            final String sql = qry.getSql();
-            final Object[] args = qry.getArgs();
-
-            final GridQueryFieldsResult res = queryLocalSqlFields(cacheName, sql, F.asList(args), filter,
-                qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
-
-            QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
-                @Override public Iterator<List<?>> iterator() {
-                    try {
-                        return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary);
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new IgniteException(e);
-                    }
+        QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
+            @Override public Iterator<List<?>> iterator() {
+                try {
+                    return new GridQueryCacheObjectsIterator(res.iterator(), objectContext(), keepBinary);
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
                 }
-            }, cancel);
+            }
+        }, cancel);
 
-            cursor.fieldsMeta(res.metaData());
+        cursor.fieldsMeta(res.metaData());
 
-            return cursor;
-        }
+        return cursor;
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(final GridCacheContext<?, ?> cctx,
+    @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(String schemaName,
         final SqlQuery qry, final IndexingQueryFilter filter, final boolean keepBinary) throws IgniteCheckedException {
-        if (cctx.config().getQueryParallelism() > 1) {
-            qry.setDistributedJoins(true);
+        String type = qry.getType();
+        String sqlQry = qry.getSql();
+        String alias = qry.getAlias();
+        Object[] params = qry.getArgs();
 
-            assert qry.isLocal();
+        GridQueryCancel cancel = new GridQueryCancel();
 
-            return queryDistributedSql(cctx, qry, keepBinary);
-        }
-        else {
-            String cacheName = cctx.name();
-            String type = qry.getType();
-            String sqlQry = qry.getSql();
-            String alias = qry.getAlias();
-            Object[] params = qry.getArgs();
-
-            GridQueryCancel cancel = new GridQueryCancel();
-
-            final GridCloseableIterator<IgniteBiTuple<K, V>> i = queryLocalSql(cacheName, sqlQry, alias,
-                F.asList(params), type, filter, cancel);
-
-            return new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() {
-                @Override public Iterator<Cache.Entry<K, V>> iterator() {
-                    return new ClIter<Cache.Entry<K, V>>() {
-                        @Override public void close() throws Exception {
-                            i.close();
-                        }
+        final GridCloseableIterator<IgniteBiTuple<K, V>> i = queryLocalSql(schemaName, sqlQry, alias,
+            F.asList(params), type, filter, cancel);
 
-                        @Override public boolean hasNext() {
-                            return i.hasNext();
-                        }
+        return new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() {
+            @Override public Iterator<Cache.Entry<K, V>> iterator() {
+                return new ClIter<Cache.Entry<K, V>>() {
+                    @Override public void close() throws Exception {
+                        i.close();
+                    }
 
-                        @Override public Cache.Entry<K, V> next() {
-                            IgniteBiTuple<K, V> t = i.next();
+                    @Override public boolean hasNext() {
+                        return i.hasNext();
+                    }
 
-                            return new CacheEntryImpl<>(
-                                (K)cctx.unwrapBinaryIfNeeded(t.get1(), keepBinary, false),
-                                (V)cctx.unwrapBinaryIfNeeded(t.get2(), keepBinary, false));
-                        }
+                    @Override public Cache.Entry<K, V> next() {
+                        IgniteBiTuple<K, V> t = i.next();
 
-                        @Override public void remove() {
-                            throw new UnsupportedOperationException();
-                        }
-                    };
-                }
-            }, cancel);
-        }
+                        K key = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objectContext(), t.get1(), keepBinary, false);
+                        V val = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objectContext(), t.get2(), keepBinary, false);
+
+                        return new CacheEntryImpl<>(key, val);
+                    }
+
+                    @Override public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        }, cancel);
     }
 
     /**
      * Executes regular query.
      *
-     * @param cacheName Cache name.
+     * @param schemaName Schema name.
      * @param qry Query.
      * @param alias Table alias.
      * @param params Query parameters.
@@ -1174,10 +1124,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
-    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(String cacheName,
+    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(String schemaName,
         final String qry, String alias, @Nullable final Collection<Object> params, String type,
         final IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException {
-        final H2TableDescriptor tbl = tableDescriptor(type, cacheName);
+        final H2TableDescriptor tbl = tableDescriptor(schemaName, type);
 
         if (tbl == null)
             throw new IgniteSQLException("Failed to find SQL table for type: " + type,
@@ -1192,13 +1142,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter)
             .distributedJoinMode(OFF));
 
-        GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, cacheName,
+        GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, schemaName,
             U.currentTimeMillis(), null, true);
 
         runs.put(run.id(), run);
 
         try {
-            ResultSet rs = executeSqlQueryWithTimer(schema(cacheName), conn, sql, params, true, 0, cancel);
+            ResultSet rs = executeSqlQueryWithTimer(schemaName, conn, sql, params, true, 0, cancel);
 
             return new H2KeyValueIterator(rs);
         }
@@ -1237,12 +1187,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public <K, V> QueryCursor<Cache.Entry<K, V>> queryDistributedSql(GridCacheContext<?, ?> cctx,
-        SqlQuery qry, boolean keepBinary) {
+    @Override public <K, V> QueryCursor<Cache.Entry<K, V>> queryDistributedSql(String schemaName, SqlQuery qry,
+        boolean keepBinary, int mainCacheId) {
         String type = qry.getType();
-        String cacheName = cctx.name();
 
-        H2TableDescriptor tblDesc = tableDescriptor(type, cacheName);
+        H2TableDescriptor tblDesc = tableDescriptor(schemaName, type);
 
         if (tblDesc == null)
             throw new IgniteSQLException("Failed to find SQL table for type: " + type,
@@ -1268,7 +1217,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (qry.getTimeout() > 0)
             fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);
 
-        final QueryCursor<List<?>> res = queryDistributedSqlFields(cctx, fqry, keepBinary, null);
+        final QueryCursor<List<?>> res = queryDistributedSqlFields(schemaName, fqry, keepBinary, null, mainCacheId);
 
         final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K, V>>() {
             @Override public Iterator<Cache.Entry<K, V>> iterator() {
@@ -1301,12 +1250,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public FieldsQueryCursor<List<?>> queryDistributedSqlFields(GridCacheContext<?, ?> cctx,
-        SqlFieldsQuery qry, boolean keepBinary, GridQueryCancel cancel) {
+    @Override public FieldsQueryCursor<List<?>> queryDistributedSqlFields(String schemaName,
+        SqlFieldsQuery qry, boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId) {
         final String sqlQry = qry.getSql();
 
-        String schemaName = schema(cctx.name());
-
         Connection c = connectionForSchema(schemaName);
 
         final boolean enforceJoinOrder = qry.isEnforceJoinOrder();
@@ -1413,14 +1360,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                 LinkedHashSet<Integer> caches0 = new LinkedHashSet<>();
 
-                // Setup caches from schemas.
                 assert twoStepQry != null;
 
                 int tblCnt = twoStepQry.tablesCount();
 
-                if (tblCnt > 0) {
-                    caches0.add(cctx.cacheId());
+                if (mainCacheId != null)
+                    caches0.add(mainCacheId);
 
+                if (tblCnt > 0) {
                     for (QueryTable tblKey : twoStepQry.tables()) {
                         GridH2Table tbl = dataTable(tblKey);
 
@@ -1429,8 +1376,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                         caches0.add(cacheId);
                     }
                 }
-                else
-                    caches0.add(cctx.cacheId());
+
+                if (caches0.isEmpty())
+                    throw new IgniteSQLException("Failed to find at least one cache for SQL statement: " + sqlQry);
 
                 //Prohibit usage indices with different numbers of segments in same query.
                 List<Integer> cacheIds = new ArrayList<>(caches0);
@@ -1470,6 +1418,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         if (cachedQry == null && !twoStepQry.explain()) {
             cachedQry = new H2TwoStepCachedQuery(meta, twoStepQry.copy());
+
             twoStepCache.putIfAbsent(cachedQryKey, cachedQry);
         }
 
@@ -1732,30 +1681,24 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * Gets table descriptor by type and cache names.
+     * Get table descriptor.
      *
+     * @param schemaName Schema name.
      * @param type Type name.
-     * @param cacheName Cache name.
-     * @return Table descriptor.
+     * @return Descriptor.
      */
-    @Nullable private H2TableDescriptor tableDescriptor(String type, String cacheName) {
-        String schemaName = schema(cacheName);
-
+    @Nullable private H2TableDescriptor tableDescriptor(String schemaName, String type) {
         H2Schema schema = schemas.get(schemaName);
 
         if (schema == null)
             return null;
 
         return schema.tableByTypeName(type);
-    }
+    };
 
-    /**
-     * Gets database schema from cache name.
-     *
-     * @param cacheName Cache name. {@code null} would be converted to an empty string.
-     * @return Schema name. Should not be null since we should not fail for an invalid cache name.
-     */
-    public String schema(String cacheName) {
+
+    /** {@inheritDoc} */
+    @Override  public String schema(String cacheName) {
         String res = cacheName2schema.get(cacheName);
 
         if (res == null)
@@ -1764,6 +1707,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         return res;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean isInsertStatement(PreparedStatement nativeStmt) {
+        Prepared prep = GridSqlQueryParser.prepared(nativeStmt);
+
+        return prep instanceof Insert;
+    }
+
     /**
      * Called periodically by {@link GridTimeoutProcessor} to clean up the {@link #stmtCache}.
      */
@@ -1792,17 +1742,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         return schema.cacheName();
     }
 
-    /**
-     * Rebuild indexes from hash index.
-     *
-     * @param cacheName Cache name.
-     * @param type Type descriptor.
-     * @throws IgniteCheckedException If failed.
-     */
+    /** {@inheritDoc} */
     @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
-    @Override public void rebuildIndexesFromHash(String cacheName,
-        GridQueryTypeDescriptor type) throws IgniteCheckedException {
-        H2TableDescriptor tbl = tableDescriptor(type.name(), cacheName);
+    @Override public void rebuildIndexesFromHash(GridCacheContext cctx, String schemaName, String typeName)
+        throws IgniteCheckedException {
+        H2TableDescriptor tbl = tableDescriptor(schemaName, typeName);
 
         if (tbl == null)
             return;
@@ -1815,10 +1759,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         Cursor cursor = hashIdx.find((Session)null, null, null);
 
-        int cacheId = CU.cacheId(cacheName);
-
-        GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
-
         while (cursor.next()) {
             CacheDataRow dataRow = (CacheDataRow)cursor.get();
 
@@ -1859,8 +1799,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public void markForRebuildFromHash(String cacheName, GridQueryTypeDescriptor type) {
-        H2TableDescriptor tbl = tableDescriptor(type.name(), cacheName);
+    @Override public void markForRebuildFromHash(String schemaName, String typeName) {
+        H2TableDescriptor tbl = tableDescriptor(schemaName, typeName);
 
         if (tbl == null)
             return;
@@ -1871,40 +1811,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * Gets size (for tests only).
-     *
-     * @param cacheName Cache name.
-     * @param typeName Type name.
-     * @return Size.
-     * @throws IgniteCheckedException If failed or {@code -1} if the type is unknown.
-     */
-    long size(String cacheName, String typeName) throws IgniteCheckedException {
-        String schemaName = schema(cacheName);
-
-        H2TableDescriptor tbl = tableDescriptor(typeName, cacheName);
-
-        if (tbl == null)
-            return -1;
-
-        Connection conn = connectionForSchema(schemaName);
-
-        H2Utils.setupConnection(conn, false, false);
-
-        try {
-            ResultSet rs = executeSqlQuery(conn, prepareStatement(conn, "SELECT COUNT(*) FROM " + tbl.fullTableName(),
-                false), 0, null);
-
-            if (!rs.next())
-                throw new IllegalStateException();
-
-            return rs.getLong(1);
-        }
-        catch (SQLException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /**
      * @return Busy lock.
      */
     public GridSpinBusyLock busyLock() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c45de168/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index 73a7191..7b0cbf8 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@ -247,33 +247,25 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
     public void testSpi() throws Exception {
         IgniteH2Indexing spi = getIndexing();
 
-        assertEquals(-1, spi.size(typeAA.cacheName(), typeAA.name()));
-        assertEquals(-1, spi.size(typeAB.cacheName(), typeAB.name()));
-        assertEquals(-1, spi.size(typeBA.cacheName(), typeBA.name()));
-
         IgniteCache<Integer, BinaryObject> cacheA = ignite0.createCache(cacheACfg());
 
-        assertEquals(0, spi.size(typeAA.cacheName(), typeAA.name()));
-        assertEquals(0, spi.size(typeAB.cacheName(), typeAB.name()));
-        assertEquals(-1, spi.size(typeBA.cacheName(), typeBA.name()));
-
         IgniteCache<Integer, BinaryObject> cacheB = ignite0.createCache(cacheBCfg());
 
-        // Initially all is empty.
-        assertEquals(0, spi.size(typeAA.cacheName(), typeAA.name()));
-        assertEquals(0, spi.size(typeAB.cacheName(), typeAB.name()));
-        assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name()));
+        assertFalse(spi.queryLocalSql(spi.schema(typeAA.cacheName()), "select * from A.A", null, Collections.emptySet(),
+            typeAA.name(), null, null).hasNext());
 
-        assertFalse(spi.queryLocalSql(typeAA.cacheName(), "select * from A.A", null, Collections.emptySet(), typeAA.name(), null, null).hasNext());
-        assertFalse(spi.queryLocalSql(typeAB.cacheName(), "select * from A.B", null, Collections.emptySet(), typeAB.name(), null, null).hasNext());
-        assertFalse(spi.queryLocalSql(typeBA.cacheName(), "select * from B.A", null, Collections.emptySet(), typeBA.name(), null, null).hasNext());
+        assertFalse(spi.queryLocalSql(spi.schema(typeAB.cacheName()), "select * from A.B", null, Collections.emptySet(),
+            typeAB.name(), null, null).hasNext());
 
-        assertFalse(spi.queryLocalSql(typeBA.cacheName(), "select * from B.A, A.B, A.A", null,
+        assertFalse(spi.queryLocalSql(spi.schema(typeBA.cacheName()), "select * from B.A", null, Collections.emptySet(),
+            typeBA.name(), null, null).hasNext());
+
+        assertFalse(spi.queryLocalSql(spi.schema(typeBA.cacheName()), "select * from B.A, A.B, A.A", null,
             Collections.emptySet(), typeBA.name(), null, null).hasNext());
 
         try {
-            spi.queryLocalSql(typeBA.cacheName(), "select aa.*, ab.*, ba.* from A.A aa, A.B ab, B.A ba", null,
-                Collections.emptySet(), typeBA.name(), null, null).hasNext();
+            spi.queryLocalSql(spi.schema(typeBA.cacheName()), "select aa.*, ab.*, ba.* from A.A aa, A.B ab, B.A ba",
+                null, Collections.emptySet(), typeBA.name(), null, null).hasNext();
 
             fail("Enumerations of aliases in select block must be prohibited");
         }
@@ -281,60 +273,23 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
             // all fine
         }
 
-        assertFalse(spi.queryLocalSql(typeAB.cacheName(), "select ab.* from A.B ab", null,
+        assertFalse(spi.queryLocalSql(spi.schema(typeAB.cacheName()), "select ab.* from A.B ab", null,
             Collections.emptySet(), typeAB.name(), null, null).hasNext());
 
-        assertFalse(spi.queryLocalSql(typeBA.cacheName(), "select   ba.*   from B.A  as ba", null,
+        assertFalse(spi.queryLocalSql(spi.schema(typeBA.cacheName()), "select   ba.*   from B.A  as ba", null,
             Collections.emptySet(), typeBA.name(), null, null).hasNext());
 
         cacheA.put(1, aa("A", 1, "Vasya", 10).build());
-
-        assertEquals(1, spi.size(typeAA.cacheName(), typeAA.name()));
-        assertEquals(0, spi.size(typeAB.cacheName(), typeAB.name()));
-        assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name()));
-
         cacheA.put(1, ab(1, "Vasya", 20, "Some text about Vasya goes here.").build());
-
-        // In one cache all keys must be unique.
-        assertEquals(0, spi.size(typeAA.cacheName(), typeAA.name()));
-        assertEquals(1, spi.size(typeAB.cacheName(), typeAB.name()));
-        assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name()));
-
         cacheB.put(1, ba(2, "Petya", 25, true).build());
-
-        // No replacement because of different cache.
-        assertEquals(0, spi.size(typeAA.cacheName(), typeAA.name()));
-        assertEquals(1, spi.size(typeAB.cacheName(), typeAB.name()));
-        assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name()));
-
         cacheB.put(1, ba(2, "Kolya", 25, true).build());
-
-        // Replacement in the same table.
-        assertEquals(0, spi.size(typeAA.cacheName(), typeAA.name()));
-        assertEquals(1, spi.size(typeAB.cacheName(), typeAB.name()));
-        assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name()));
-
         cacheA.put(2, aa("A", 2, "Valera", 19).build());
-
-        assertEquals(1, spi.size(typeAA.cacheName(), typeAA.name()));
-        assertEquals(1, spi.size(typeAB.cacheName(), typeAB.name()));
-        assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name()));
-
         cacheA.put(3, aa("A", 3, "Borya", 18).build());
-
-        assertEquals(2, spi.size(typeAA.cacheName(), typeAA.name()));
-        assertEquals(1, spi.size(typeAB.cacheName(), typeAB.name()));
-        assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name()));
-
         cacheA.put(4, ab(4, "Vitalya", 20, "Very Good guy").build());
 
-        assertEquals(2, spi.size(typeAA.cacheName(), typeAA.name()));
-        assertEquals(2, spi.size(typeAB.cacheName(), typeAB.name()));
-        assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name()));
-
         // Query data.
-        Iterator<IgniteBiTuple<Integer, BinaryObjectImpl>> res =
-            spi.queryLocalSql(typeAA.cacheName(), "from a order by age", null, Collections.emptySet(), typeAA.name(), null, null);
+        Iterator<IgniteBiTuple<Integer, BinaryObjectImpl>> res = spi.queryLocalSql(spi.schema(typeAA.cacheName()),
+            "from a order by age", null, Collections.emptySet(), typeAA.name(), null, null);
 
         assertTrue(res.hasNext());
         assertEquals(aa("A", 3, "Borya", 18).build(), value(res.next()));
@@ -342,7 +297,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         assertEquals(aa("A", 2, "Valera", 19).build(), value(res.next()));
         assertFalse(res.hasNext());
 
-        res = spi.queryLocalSql(typeAA.cacheName(), "select aa.* from a aa order by aa.age", null,
+        res = spi.queryLocalSql(spi.schema(typeAA.cacheName()), "select aa.* from a aa order by aa.age", null,
             Collections.emptySet(), typeAA.name(), null, null);
 
         assertTrue(res.hasNext());
@@ -351,7 +306,8 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         assertEquals(aa("A", 2, "Valera", 19).build(), value(res.next()));
         assertFalse(res.hasNext());
 
-        res = spi.queryLocalSql(typeAB.cacheName(), "from b order by name", null, Collections.emptySet(), typeAB.name(), null, null);
+        res = spi.queryLocalSql(spi.schema(typeAB.cacheName()), "from b order by name", null, Collections.emptySet(),
+            typeAB.name(), null, null);
 
         assertTrue(res.hasNext());
         assertEquals(ab(1, "Vasya", 20, "Some text about Vasya goes here.").build(), value(res.next()));
@@ -359,7 +315,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         assertEquals(ab(4, "Vitalya", 20, "Very Good guy").build(), value(res.next()));
         assertFalse(res.hasNext());
 
-        res = spi.queryLocalSql(typeAB.cacheName(), "select bb.* from b as bb order by bb.name", null,
+        res = spi.queryLocalSql(spi.schema(typeAB.cacheName()), "select bb.* from b as bb order by bb.name", null,
             Collections.emptySet(), typeAB.name(), null, null);
 
         assertTrue(res.hasNext());
@@ -368,16 +324,16 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
         assertEquals(ab(4, "Vitalya", 20, "Very Good guy").build(), value(res.next()));
         assertFalse(res.hasNext());
 
-
-        res = spi.queryLocalSql(typeBA.cacheName(), "from a", null, Collections.emptySet(), typeBA.name(), null, null);
+        res = spi.queryLocalSql(spi.schema(typeBA.cacheName()), "from a", null, Collections.emptySet(), typeBA.name(),
+            null, null);
 
         assertTrue(res.hasNext());
         assertEquals(ba(2, "Kolya", 25, true).build(), value(res.next()));
         assertFalse(res.hasNext());
 
         // Text queries
-        Iterator<IgniteBiTuple<Integer, BinaryObjectImpl>> txtRes = spi.queryLocalText(typeAB.cacheName(), "good",
-            typeAB.name(), null);
+        Iterator<IgniteBiTuple<Integer, BinaryObjectImpl>> txtRes = spi.queryLocalText(spi.schema(typeAB.cacheName()),
+            "good", typeAB.name(), null);
 
         assertTrue(txtRes.hasNext());
         assertEquals(ab(4, "Vitalya", 20, "Very Good guy").build(), value(txtRes.next()));
@@ -385,7 +341,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
 
         // Fields query
         GridQueryFieldsResult fieldsRes =
-            spi.queryLocalSqlFields("A", "select a.a.name n1, a.a.age a1, b.a.name n2, " +
+            spi.queryLocalSqlFields(spi.schema("A"), "select a.a.name n1, a.a.age a1, b.a.name n2, " +
             "b.a.age a2 from a.a, b.a where a.a.id = b.a.id ", Collections.emptySet(), null, false, 0, null);
 
         String[] aliases = {"N1", "A1", "N2", "A2"};
@@ -410,33 +366,12 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
 
         // Remove
         cacheA.remove(2);
-
-        assertEquals(1, spi.size(typeAA.cacheName(), typeAA.name()));
-        assertEquals(2, spi.size(typeAB.cacheName(), typeAB.name()));
-        assertEquals(1, spi.size(typeBA.cacheName(), typeBA.name()));
-
         cacheB.remove(1);
 
-        assertEquals(1, spi.size(typeAA.cacheName(), typeAA.name()));
-        assertEquals(2, spi.size(typeAB.cacheName(), typeAB.name()));
-        assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name()));
-
         // Unregister.
-        spi.unregisterType(typeAA.cacheName(), typeAA.name());
-
-        assertEquals(-1, spi.size(typeAA.cacheName(), typeAA.name()));
-        assertEquals(2, spi.size(typeAB.cacheName(), typeAB.name()));
-        assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name()));
-
-        spi.unregisterType(typeAB.cacheName(), typeAB.name());
-
-        assertEquals(-1, spi.size(typeAA.cacheName(), typeAA.name()));
-        assertEquals(-1, spi.size(typeAB.cacheName(), typeAB.name()));
-        assertEquals(0, spi.size(typeBA.cacheName(), typeBA.name()));
-
-        spi.unregisterType(typeBA.cacheName(), typeBA.name());
-
-        assertEquals(-1, spi.size(typeAA.cacheName(), typeAA.name()));
+        spi.unregisterType(spi.schema(typeAA.cacheName()), typeAA.name());
+        spi.unregisterType(spi.schema(typeAB.cacheName()), typeAB.name());
+        spi.unregisterType(spi.schema(typeBA.cacheName()), typeBA.name());
     }
 
     /**
@@ -469,8 +404,8 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
                 time = now;
                 range *= 3;
 
-                GridQueryFieldsResult res = spi.queryLocalSqlFields("A", sql, Arrays.<Object>asList(1, range),
-                    null, false, 0, null);
+                GridQueryFieldsResult res = spi.queryLocalSqlFields(spi.schema("A"), sql, Arrays.<Object>asList(1,
+                    range), null, false, 0, null);
 
                 assert res.iterator().hasNext();
 


Mime
View raw message