ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [1/2] ignite git commit: IGNITE-7253: JDBC thin driver: implemented streaming. This closes #3499. This closes #3591.
Date Wed, 14 Mar 2018 09:23:52 GMT
Repository: ignite
Updated Branches:
  refs/heads/master ae2bf3d6f -> 7366809ed


http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index f666cdd..bde9427 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -263,7 +263,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     idx.cancelAllQueries();
 
                 return;
-            } catch (InterruptedException ignored) {
+            }
+            catch (InterruptedException ignored) {
                 U.warn(log, "Interrupted while waiting for active queries cancellation.");
 
                 Thread.currentThread().interrupt();
@@ -1974,13 +1975,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      */
     public List<FieldsQueryCursor<List<?>>> querySqlFields(final SqlFieldsQuery qry, final boolean keepBinary,
         final boolean failOnMultipleStmts) {
-        return querySqlFields(null, qry, keepBinary, failOnMultipleStmts);
-    }
-
-    @SuppressWarnings("unchecked")
-    public FieldsQueryCursor<List<?>> querySqlFields(final GridCacheContext<?,?> cctx, final SqlFieldsQuery qry,
-        final boolean keepBinary) {
-        return querySqlFields(cctx, qry, keepBinary, true).get(0);
+        return querySqlFields(null, qry, null, keepBinary, failOnMultipleStmts);
     }
 
     /**
@@ -1991,7 +1986,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @return Cursor.
      */
     public FieldsQueryCursor<List<?>> querySqlFields(final SqlFieldsQuery qry, final boolean keepBinary) {
-        return querySqlFields(null, qry, keepBinary, true).get(0);
+        return querySqlFields(null, qry, null, keepBinary, true).get(0);
     }
 
     /**
@@ -1999,14 +1994,16 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      *
      * @param cctx Cache context.
      * @param qry Query.
+     * @param cliCtx Client context.
      * @param keepBinary Keep binary flag.
      * @param failOnMultipleStmts If {@code true} the method must throws exception when query contains
      *      more then one SQL statement.
      * @return Cursor.
      */
     @SuppressWarnings("unchecked")
-    public List<FieldsQueryCursor<List<?>>> querySqlFields(@Nullable final GridCacheContext<?,?> cctx,
-        final SqlFieldsQuery qry, final boolean keepBinary, final boolean failOnMultipleStmts) {
+    public List<FieldsQueryCursor<List<?>>> querySqlFields(@Nullable final GridCacheContext<?, ?> cctx,
+        final SqlFieldsQuery qry, final SqlClientContext cliCtx, final boolean keepBinary,
+        final boolean failOnMultipleStmts) {
         checkxEnabled();
 
         validateSqlFieldsQuery(qry);
@@ -2034,7 +2031,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     GridQueryCancel cancel = new GridQueryCancel();
 
                     List<FieldsQueryCursor<List<?>>> res =
-                        idx.querySqlFields(schemaName, qry, keepBinary, failOnMultipleStmts, cancel);
+                        idx.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, cancel);
 
                     if (cctx != null)
                         sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx);
@@ -2073,7 +2070,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param schemaName Schema name.
      * @param streamer Data streamer.
      * @param qry Query.
-     * @return Iterator.
+     * @return Update counter.
      */
     public long streamUpdateQuery(@Nullable final String cacheName, final String schemaName,
         final IgniteDataStreamer<?, ?> streamer, final String qry, final Object[] args) {
@@ -2100,6 +2097,33 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param schemaName Schema name.
+     * @param cliCtx Client context.
+     * @param qry Query.
+     * @param args Query arguments.
+     * @return Update counters.
+     */
+    public List<Long> streamBatchedUpdateQuery(final String schemaName, final SqlClientContext cliCtx,
+        final String qry, final List<Object[]> args) {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Failed to execute query (grid is stopping).");
+
+        try {
+            return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, null, new IgniteOutClosureX<List<Long>>() {
+                @Override public List<Long> applyx() throws IgniteCheckedException {
+                    return idx.streamBatchedUpdateQuery(schemaName, qry, args, cliCtx);
+                }
+            }, true);
+        }
+        catch (IgniteCheckedException e) {
+            throw new CacheException(e);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
      * Execute distributed SQL query.
      *
      * @param cctx Cache context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
new file mode 100644
index 0000000..e8c2932
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/SqlClientContext.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Container for connection properties passed by various drivers (JDBC drivers, possibly ODBC) that have a notion of
+ * an <b>SQL connection</b> - Ignite basically does not have one.<p>
+ * Also contains anything that a driver may need to share between threads processing queries of the logically same
+ * client - see the JDBC thin driver.
+ */
+public class SqlClientContext implements AutoCloseable {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Distributed joins flag. */
+    private final boolean distributedJoins;
+
+    /** Enforce join order flag. */
+    private final boolean enforceJoinOrder;
+
+    /** Collocated flag. */
+    private final boolean collocated;
+
+    /** Replicated caches only flag. */
+    private final boolean replicatedOnly;
+
+    /** Lazy query execution flag. */
+    private final boolean lazy;
+
+    /** Skip reducer on update flag. */
+    private final boolean skipReducerOnUpdate;
+
+    /** Allow overwrites for duplicate keys on streamed {@code INSERT}s. */
+    private boolean streamAllowOverwrite;
+
+    /** Parallel ops count per node for data streamer. */
+    private int streamNodeParOps;
+
+    /** Node buffer size for data streamer. */
+    private int streamNodeBufSize;
+
+    /** Auto flush frequency for streaming. */
+    private long streamFlushTimeout;
+
+    /** Streamers for various caches. */
+    private Map<String, IgniteDataStreamer<?, ?>> streamers;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * @param ctx Kernal context.
+     * @param distributedJoins Distributed joins flag.
+     * @param enforceJoinOrder Enforce join order flag.
+     * @param collocated Collocated flag.
+     * @param replicatedOnly Replicated caches only flag.
+     * @param lazy Lazy query execution flag.
+     * @param skipReducerOnUpdate Skip reducer on update flag.
+     */
+    public SqlClientContext(GridKernalContext ctx, boolean distributedJoins, boolean enforceJoinOrder,
+        boolean collocated, boolean replicatedOnly, boolean lazy, boolean skipReducerOnUpdate) {
+        this.ctx = ctx;
+        this.distributedJoins = distributedJoins;
+        this.enforceJoinOrder = enforceJoinOrder;
+        this.collocated = collocated;
+        this.replicatedOnly = replicatedOnly;
+        this.lazy = lazy;
+        this.skipReducerOnUpdate = skipReducerOnUpdate;
+
+        log = ctx.log(SqlClientContext.class.getName());
+    }
+
+    /**
+     * Turn on streaming on this client context.
+     *
+     * @param allowOverwrite Whether streaming should overwrite existing values.
+     * @param flushFreq Flush frequency for streamers.
+     * @param perNodeBufSize Per node streaming buffer size.
+     * @param perNodeParOps Per node streaming parallel operations number.
+     */
+    public void enableStreaming(boolean allowOverwrite, long flushFreq, int perNodeBufSize, int perNodeParOps) {
+        if (isStream())
+            return;
+
+        streamers = new HashMap<>();
+
+        this.streamAllowOverwrite = allowOverwrite;
+        this.streamFlushTimeout = flushFreq;
+        this.streamNodeBufSize = perNodeBufSize;
+        this.streamNodeParOps = perNodeParOps;
+    }
+
+    /**
+     * Turn off streaming on this client context - with closing all open streamers, if any.
+     */
+    public void disableStreaming() {
+        if (!isStream())
+            return;
+
+        Iterator<IgniteDataStreamer<?, ?>> it = streamers.values().iterator();
+
+        while (it.hasNext()) {
+            IgniteDataStreamer<?, ?> streamer = it.next();
+
+            U.close(streamer, log);
+
+            it.remove();
+        }
+
+        streamers = null;
+    }
+
+    /**
+     * @return Collocated flag.
+     */
+    public boolean isCollocated() {
+        return collocated;
+    }
+
+    /**
+     * @return Distributed joins flag.
+     */
+    public boolean isDistributedJoins() {
+        return distributedJoins;
+    }
+
+    /**
+     * @return Enforce join order flag.
+     */
+    public boolean isEnforceJoinOrder() {
+        return enforceJoinOrder;
+    }
+
+    /**
+     * @return Replicated caches only flag.
+     */
+    public boolean isReplicatedOnly() {
+        return replicatedOnly;
+    }
+
+    /**
+     * @return Lazy query execution flag.
+     */
+    public boolean isLazy() {
+        return lazy;
+    }
+
+    /**
+     * @return Skip reducer on update flag.
+     */
+    public boolean isSkipReducerOnUpdate() {
+        return skipReducerOnUpdate;
+    }
+
+    /**
+     * @return Streaming state flag (on or off).
+     */
+    public boolean isStream() {
+        return streamers != null;
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @return Streamer for given cache.
+     */
+    public IgniteDataStreamer<?, ?> streamerForCache(String cacheName) {
+        if (streamers == null)
+            return null;
+
+        IgniteDataStreamer<?, ?> res = streamers.get(cacheName);
+
+        if (res != null)
+            return res;
+
+        res = ctx.grid().dataStreamer(cacheName);
+
+        res.autoFlushFrequency(streamFlushTimeout);
+
+        res.allowOverwrite(streamAllowOverwrite);
+
+        if (streamNodeBufSize > 0)
+            res.perNodeBufferSize(streamNodeBufSize);
+
+        if (streamNodeParOps > 0)
+            res.perNodeParallelOperations(streamNodeParOps);
+
+        streamers.put(cacheName, res);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        if (streamers == null)
+            return;
+
+        for (IgniteDataStreamer<?, ?> s : streamers.values())
+            U.close(s, log);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
index 9a2a865..26b2263 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlKeyword.java
@@ -17,11 +17,10 @@
 
 package org.apache.ignite.internal.sql;
 
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.util.typedef.F;
-
 import java.lang.reflect.Field;
 import java.util.HashSet;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.typedef.F;
 
 /**
  * SQL keyword constants.
@@ -30,9 +29,15 @@ public class SqlKeyword {
     /** Keyword: ALTER. */
     public static final String ALTER = "ALTER";
 
+    /** Keyword: ALLOW_OVERWRITE. */
+    public static final String ALLOW_OVERWRITE = "ALLOW_OVERWRITE";
+
     /** Keyword: ASC. */
     public static final String ASC = "ASC";
 
+    /** Keyword: BATCH_SIZE. */
+    public static final String BATCH_SIZE = "BATCH_SIZE";
+
     /** Keyword: BIGINT */
     public static final String BIGINT = "BIGINT";
 
@@ -96,6 +101,9 @@ public class SqlKeyword {
     /** Keyword: FLOAT8. */
     public static final String FLOAT8 = "FLOAT8";
 
+    /** Keyword: FLUSH_FREQUENCY. */
+    public static final String FLUSH_FREQUENCY = "FLUSH_FREQUENCY";
+
     /** Keyword: FORMAT. */
     public static final String FORMAT = "FORMAT";
 
@@ -168,9 +176,18 @@ public class SqlKeyword {
     /** Keyword: NVARCHAR2. */
     public static final String NVARCHAR2 = "NVARCHAR2";
 
+    /** Keyword: OFF. */
+    public static final String OFF = "OFF";
+
     /** Keyword: ON. */
     public static final String ON = "ON";
 
+    /** Keyword: PER_NODE_PARALLEL_OPERATIONS. */
+    public static final String PER_NODE_PARALLEL_OPERATIONS = "PER_NODE_PARALLEL_OPERATIONS";
+
+    /** Keyword: PER_NODE_BUFFER_SIZE. */
+    public static final String PER_NODE_BUFFER_SIZE = "PER_NODE_BUFFER_SIZE";
+
     /** Keyword: PRECISION. */
     public static final String PRECISION = "PRECISION";
 
@@ -183,6 +200,9 @@ public class SqlKeyword {
     /** Keyword: RESTRICT. */
     public static final String RESTRICT = "RESTRICT";
 
+    /** Keyword: SET. */
+    public static final String SET = "SET";
+
     /** Keyword: SIGNED. */
     public static final String SIGNED = "SIGNED";
 
@@ -195,6 +215,9 @@ public class SqlKeyword {
     /** Keyword: SPATIAL. */
     public static final String SPATIAL = "SPATIAL";
 
+    /** Keyword: STREAMING. */
+    public static final String STREAMING = "STREAMING";
+
     /** Keyword: TABLE. */
     public static final String TABLE = "TABLE";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java
index 0627def..da6d28e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParser.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
 import org.apache.ignite.internal.sql.command.SqlCommand;
 import org.apache.ignite.internal.sql.command.SqlCreateIndexCommand;
 import org.apache.ignite.internal.sql.command.SqlDropIndexCommand;
+import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.sql.SqlKeyword.ALTER;
@@ -31,7 +32,9 @@ import static org.apache.ignite.internal.sql.SqlKeyword.DROP;
 import static org.apache.ignite.internal.sql.SqlKeyword.HASH;
 import static org.apache.ignite.internal.sql.SqlKeyword.INDEX;
 import static org.apache.ignite.internal.sql.SqlKeyword.PRIMARY;
+import static org.apache.ignite.internal.sql.SqlKeyword.SET;
 import static org.apache.ignite.internal.sql.SqlKeyword.SPATIAL;
+import static org.apache.ignite.internal.sql.SqlKeyword.STREAMING;
 import static org.apache.ignite.internal.sql.SqlKeyword.TABLE;
 import static org.apache.ignite.internal.sql.SqlKeyword.UNIQUE;
 import static org.apache.ignite.internal.sql.SqlParserUtils.errorUnexpectedToken;
@@ -110,6 +113,11 @@ public class SqlParser {
 
                             break;
 
+                        case SET:
+                            cmd = processSet();
+
+                            break;
+
                         case ALTER:
                             cmd = processAlter();
                     }
@@ -122,7 +130,7 @@ public class SqlParser {
                         return cmd;
                     }
                     else
-                        throw errorUnexpectedToken(lex, CREATE, DROP, COPY, ALTER);
+                        throw errorUnexpectedToken(lex, CREATE, DROP, ALTER, COPY, SET);
 
                 case QUOTED:
                 case MINUS:
@@ -137,6 +145,22 @@ public class SqlParser {
     }
 
     /**
+     * Process SET keyword.
+     *
+     * @return Command.
+     */
+    private SqlCommand processSet() {
+        if (lex.shift() && lex.tokenType() == SqlLexerTokenType.DEFAULT) {
+            switch (lex.token()) {
+                case STREAMING:
+                    return new SqlSetStreamingCommand().parse(lex);
+            }
+        }
+
+        throw errorUnexpectedToken(lex, STREAMING);
+    }
+
+    /**
      * Processes COPY command.
      *
      * @return The {@link SqlBulkLoadCommand} command.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParserUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParserUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParserUtils.java
index 829c48c..9ed75ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParserUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/SqlParserUtils.java
@@ -123,6 +123,28 @@ public class SqlParserUtils {
     }
 
     /**
+     * Parse boolean parameter value based on presence of tokens 1, 0, ON, OFF. Note that this is not,
+     * and is not intended to be, a routine for parsing a boolean literal from TRUE/FALSE.
+     * @param lex Lexer.
+     * @return Boolean parameter value.
+     */
+    public static boolean parseBoolean(SqlLexer lex) {
+        if (lex.shift() && lex.tokenType() == SqlLexerTokenType.DEFAULT) {
+            switch (lex.token()) {
+                case SqlKeyword.ON:
+                case "1":
+                    return true;
+
+                case SqlKeyword.OFF:
+                case "0":
+                    return false;
+            }
+        }
+
+        throw errorUnexpectedToken(lex, SqlKeyword.ON, SqlKeyword.OFF, "1", "0");
+    }
+
+    /**
      * Process name.
      *
      * @param lex Lexer.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java
new file mode 100644
index 0000000..c492c61
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/sql/command/SqlSetStreamingCommand.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.command;
+
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.sql.SqlKeyword;
+import org.apache.ignite.internal.sql.SqlLexer;
+import org.apache.ignite.internal.sql.SqlLexerTokenType;
+import org.apache.ignite.internal.sql.SqlParseException;
+
+import static org.apache.ignite.internal.sql.SqlParserUtils.error;
+import static org.apache.ignite.internal.sql.SqlParserUtils.errorUnexpectedToken;
+import static org.apache.ignite.internal.sql.SqlParserUtils.parseBoolean;
+import static org.apache.ignite.internal.sql.SqlParserUtils.parseInt;
+
+/**
+ * SET STREAMING command.
+ */
+public class SqlSetStreamingCommand implements SqlCommand {
+    /** Default batch size for driver. */
+    private final static int DFLT_STREAM_BATCH_SIZE = IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE * 4;
+
+    /** Whether streaming must be turned on or off by this command. */
+    private boolean turnOn;
+
+    /** Whether existing values should be overwritten on keys duplication. */
+    private boolean allowOverwrite;
+
+    /** Batch size for driver. */
+    private int batchSize = DFLT_STREAM_BATCH_SIZE;
+
+    /** Per node number of parallel operations. */
+    private int perNodeParOps;
+
+    /** Per node buffer size. */
+    private int perNodeBufSize;
+
+    /** Streamer flush timeout. */
+    private long flushFreq;
+
+    /** {@inheritDoc} */
+    @Override public SqlCommand parse(SqlLexer lex) {
+        turnOn = parseBoolean(lex);
+
+        while (lex.lookAhead().tokenType() == SqlLexerTokenType.DEFAULT) {
+            switch (lex.lookAhead().token()) {
+                case SqlKeyword.BATCH_SIZE:
+                    lex.shift();
+
+                    checkOffLast(lex);
+
+                    batchSize = parseInt(lex);
+
+                    if (batchSize <= 0)
+                        throw error(lex, "Invalid batch size (must be positive).");
+
+                    break;
+
+                case SqlKeyword.PER_NODE_BUFFER_SIZE:
+                    lex.shift();
+
+                    checkOffLast(lex);
+
+                    perNodeBufSize = parseInt(lex);
+
+                    if (perNodeBufSize <= 0)
+                        throw error(lex, "Invalid per node buffer size (must be positive).");
+
+                    break;
+
+                case SqlKeyword.PER_NODE_PARALLEL_OPERATIONS:
+                    lex.shift();
+
+                    checkOffLast(lex);
+
+                    perNodeParOps = parseInt(lex);
+
+                    if (perNodeParOps <= 0)
+                        throw error(lex, "Invalid per node parallel operations number (must be positive).");
+
+                    break;
+
+                case SqlKeyword.ALLOW_OVERWRITE:
+                    lex.shift();
+
+                    checkOffLast(lex);
+
+                    allowOverwrite = parseBoolean(lex);
+
+                    break;
+
+                case SqlKeyword.FLUSH_FREQUENCY:
+                    lex.shift();
+
+                    checkOffLast(lex);
+
+                    flushFreq = parseInt(lex);
+
+                    if (flushFreq <= 0)
+                        throw error(lex, "Invalid flush frequency (must be positive).");
+
+                    break;
+
+                default:
+                    return this;
+            }
+        }
+
+        return this;
+    }
+
+    /**
+     * Throw an unexpected token exception if this command turns streaming off.
+     * @param lex Lexer to take unexpected token from.
+     * @throws SqlParseException if {@link #turnOn} is {@code false}.
+     */
+    private void checkOffLast(SqlLexer lex) throws SqlParseException {
+        if (!turnOn) {
+            assert lex.tokenType() == SqlLexerTokenType.DEFAULT;
+
+            throw errorUnexpectedToken(lex);
+        }
+    }
+
+    /**
+     * @return Whether streaming must be turned on or off by this command.
+     */
+    public boolean isTurnOn() {
+        return turnOn;
+    }
+
+    /**
+     * @return Whether existing values should be overwritten on keys duplication.
+     */
+    public boolean allowOverwrite() {
+        return allowOverwrite;
+    }
+
+    /**
+     * @return Batch size for driver.
+     */
+    public int batchSize() {
+        return batchSize;
+    }
+
+    /**
+     * @return Per node number of parallel operations.
+     */
+    public int perNodeParallelOperations() {
+        return perNodeParOps;
+    }
+
+    /**
+     * @return Per node streamer buffer size.
+     */
+    public int perNodeBufferSize() {
+        return perNodeBufSize;
+    }
+
+    /**
+     * @return Streamer flush timeout.
+     */
+    public long flushFrequency() {
+        return flushFreq;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String schemaName() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void schemaName(String schemaName) {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
index df27c5f..6d7e9ae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.QueryField;
 import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
+import org.apache.ignite.internal.processors.query.SqlClientContext;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
@@ -244,12 +245,18 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
 
         /** {@inheritDoc} */
         @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
-            boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) {
+            SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) {
             return null;
         }
 
         /** {@inheritDoc} */
-        @Override public long streamUpdateQuery(String spaceName, String qry, @Nullable Object[] params,
+        @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params,
+            SqlClientContext cliCtx) throws IgniteCheckedException {
+            return Collections.emptyList();
+        }
+
+        /** {@inheritDoc} */
+        @Override public long streamUpdateQuery(String schemaName, String qry, @Nullable Object[] params,
             IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException {
             return 0;
         }
@@ -372,8 +379,8 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
         }
 
         /** {@inheritDoc} */
-        @Override public boolean isInsertStatement(PreparedStatement nativeStmt) {
-            return false;
+        @Override public void checkStatementStreamable(PreparedStatement nativeStmt) {
+            // No-op.
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java
new file mode 100644
index 0000000..65bb599
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/sql/SqlParserSetStreamingSelfTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql;
+
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
+
+/**
+ * Tests for SQL parser: SET STREAMING.
+ */
+public class SqlParserSetStreamingSelfTest extends SqlParserAbstractSelfTest {
+    /**
+     * Tests parsing of valid SET STREAMING commands and the error messages produced for invalid ones.
+     */
+    public void testParseSetStreaming() {
+        parseValidate("set streaming on", true, false, 2048, 0, 0, 0);
+        parseValidate("set streaming 1", true, false, 2048, 0, 0, 0);
+        parseValidate("set streaming off", false, false, 2048, 0, 0, 0);
+        parseValidate("set streaming 0", false, false, 2048, 0, 0, 0);
+        parseValidate("set streaming on batch_size 100", true, false, 100, 0, 0, 0);
+        parseValidate("set streaming on flush_frequency 500", true, false, 2048, 0, 0, 500);
+        parseValidate("set streaming on per_node_buffer_size 100", true, false, 2048, 0, 100, 0);
+        parseValidate("set streaming on per_node_parallel_operations 4", true, false, 2048, 4, 0, 0);
+        parseValidate("set streaming on allow_overwrite on", true, true, 2048, 0, 0, 0);
+        parseValidate("set streaming on allow_overwrite off", true, false, 2048, 0, 0, 0);
+        parseValidate("set streaming on per_node_buffer_size 50 flush_frequency 500 " +
+            "per_node_parallel_operations 4 allow_overwrite on batch_size 100", true, true, 100, 4, 50, 500);
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set",
+            "Failed to parse SQL statement \"set[*]\": Unexpected end of command (expected: \"STREAMING\")");
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming",
+            "Failed to parse SQL statement \"set streaming[*]\": Unexpected end of command (expected: " +
+                "\"ON\", \"OFF\", \"1\", \"0\")");
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming table",
+            "Failed to parse SQL statement \"set streaming [*]table\": Unexpected token: \"TABLE\" (expected: " +
+                "\"ON\", \"OFF\", \"1\", \"0\")");
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming -1",
+            "Failed to parse SQL statement \"set streaming [*]-1\": Unexpected token: \"-\" (expected: " +
+                "\"ON\", \"OFF\", \"1\", \"0\")");
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming 500",
+            "Failed to parse SQL statement \"set streaming [*]500\": Unexpected token: \"500\" (expected: " +
+                "\"ON\", \"OFF\", \"1\", \"0\")");
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming on allow_overwrite",
+            "set streaming on allow_overwrite[*]\": Unexpected end of command (expected: \"ON\", \"OFF\", \"1\", " +
+                "\"0\")");
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming 1 batch_size",
+            "Failed to parse SQL statement \"set streaming 1 batch_size[*]\": Unexpected end of command " +
+                "(expected: \"[integer]\")");
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming on per_node_parallel_operations -4",
+            "Failed to parse SQL statement \"set streaming on per_node_parallel_operations -[*]4\": " +
+                "Invalid per node parallel operations number (must be positive)");
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming on per_node_buffer_size -4",
+            "Failed to parse SQL statement \"set streaming on per_node_buffer_size -[*]4\": " +
+                "Invalid per node buffer size (must be positive)");
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming on flush_frequency -4",
+            "Failed to parse SQL statement \"set streaming on flush_frequency -[*]4\": " +
+                "Invalid flush frequency (must be positive)");
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off allow_overwrite",
+            "Failed to parse SQL statement \"set streaming off [*]allow_overwrite\": Unexpected token: " +
+                "\"ALLOW_OVERWRITE\"");
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off batch_size",
+            "Failed to parse SQL statement \"set streaming off [*]batch_size\": Unexpected token: " +
+                "\"BATCH_SIZE\"");
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off flush_frequency",
+            "Failed to parse SQL statement \"set streaming off [*]flush_frequency\": Unexpected token: " +
+                "\"FLUSH_FREQUENCY\"");
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off per_node_buffer_size",
+            "Failed to parse SQL statement \"set streaming off [*]per_node_buffer_size\": Unexpected token: " +
+                "\"PER_NODE_BUFFER_SIZE\"");
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off per_node_parallel_operations",
+            "Failed to parse SQL statement \"set streaming off [*]per_node_parallel_operations\": Unexpected token: " +
+                "\"PER_NODE_PARALLEL_OPERATIONS\"");
+
+        assertParseError(QueryUtils.DFLT_SCHEMA, "set streaming off table",
+            "Failed to parse SQL statement \"set streaming off [*]table\": Unexpected token: \"TABLE\"");
+    }
+
+    /**
+     * Parse and validate SQL script.
+     *
+     * @param sql SQL.
+     * @param expOn Expected on/off value.
+     * @param expAllowOverwrite Expected allow overwrite flag.
+     * @param expBatchSize Expected batch size.
+     * @param expParOps Expected per-node parallel operations.
+     * @param expBufSize Expected per node buffer size.
+     * @param expFlushFreq Expected flush frequency.
+     */
+    private static void parseValidate(String sql, boolean expOn, boolean expAllowOverwrite, int expBatchSize,
+        int expParOps, int expBufSize, long expFlushFreq) {
+        SqlSetStreamingCommand cmd = (SqlSetStreamingCommand)new SqlParser(QueryUtils.DFLT_SCHEMA, sql).nextCommand();
+
+        assertEquals(expOn, cmd.isTurnOn());
+
+        assertEquals(expAllowOverwrite, cmd.allowOverwrite());
+
+        assertEquals(expBatchSize, cmd.batchSize());
+
+        assertEquals(expParOps, cmd.perNodeParallelOperations());
+
+        assertEquals(expBufSize, cmd.perNodeBufferSize());
+
+        assertEquals(expFlushFreq, cmd.flushFrequency());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index b114828..8d2a820 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -299,7 +299,7 @@ public final class GridTestUtils {
             call.call();
         }
         catch (Throwable e) {
-            if (cls != e.getClass()) {
+            if (cls != e.getClass() && !cls.isAssignableFrom(e.getClass())) {
                 if (e.getClass() == CacheException.class && e.getCause() != null && e.getCause().getClass() == cls)
                     e = e.getCause();
                 else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index ea6c7c1..62dbd50 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -43,10 +43,10 @@ import org.apache.ignite.cache.query.BulkLoadContextCursor;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter;
-import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
 import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter;
 import org.apache.ignite.internal.processors.bulkload.BulkLoadParser;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
 import org.apache.ignite.internal.processors.bulkload.BulkLoadStreamerWriter;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -385,6 +385,7 @@ public class DmlStatementsProcessor {
     /**
      * Perform given statement against given data streamer. Only rows based INSERT is supported.
      *
+     * @param schemaName Schema name.
      * @param streamer Streamer to feed data to.
      * @param stmt Statement.
      * @param args Statement arguments.
@@ -392,81 +393,74 @@ public class DmlStatementsProcessor {
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, final Object[] args)
+    long streamUpdateQuery(String schemaName, IgniteDataStreamer streamer, PreparedStatement stmt, final Object[] args)
         throws IgniteCheckedException {
+        idx.checkStatementStreamable(stmt);
+
         Prepared p = GridSqlQueryParser.prepared(stmt);
 
         assert p != null;
 
-        final UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, idx, null, null, null);
-
-        if (!F.eq(streamer.cacheName(), plan.cacheContext().name()))
-            throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly" +
-                " in connection options", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
-
-        if (plan.mode() == UpdateMode.INSERT && plan.rowCount() > 0) {
-            assert plan.isLocalSubquery();
+        final UpdatePlan plan = getPlanForStatement(schemaName, null, p, null, true, null);
 
-            final GridCacheContext cctx = plan.cacheContext();
+        assert plan.isLocalSubquery();
 
-            QueryCursorImpl<List<?>> cur;
+        final GridCacheContext cctx = plan.cacheContext();
 
-            final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount());
+        QueryCursorImpl<List<?>> cur;
 
-            QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() {
-                @Override public Iterator<List<?>> iterator() {
-                    try {
-                        Iterator<List<?>> it;
+        final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount());
 
-                        if (!F.isEmpty(plan.selectQuery())) {
-                            GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()),
-                                plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)),
-                                null, false, 0, null);
+        QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+            @Override public Iterator<List<?>> iterator() {
+                try {
+                    Iterator<List<?>> it;
 
-                            it = res.iterator();
-                        }
-                        else
-                            it = plan.createRows(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)).iterator();
+                    if (!F.isEmpty(plan.selectQuery())) {
+                        GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()),
+                            plan.selectQuery(), F.asList(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)),
+                            null, false, 0, null);
 
-                        return new GridQueryCacheObjectsIterator(it, idx.objectContext(), cctx.keepBinary());
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new IgniteException(e);
+                        it = res.iterator();
                     }
-                }
-            }, null);
-
-            data.addAll(stepCur.getAll());
+                    else
+                        it = plan.createRows(U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY)).iterator();
 
-            cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
-                @Override public Iterator<List<?>> iterator() {
-                    return data.iterator();
+                    return new GridQueryCacheObjectsIterator(it, idx.objectContext(), cctx.keepBinary());
                 }
-            }, null);
-
-            if (plan.rowCount() == 1) {
-                IgniteBiTuple t = plan.processRow(cur.iterator().next());
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, null);
 
-                streamer.addData(t.getKey(), t.getValue());
+        data.addAll(stepCur.getAll());
 
-                return 1;
+        cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+            @Override public Iterator<List<?>> iterator() {
+                return data.iterator();
             }
+        }, null);
 
-            Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
+        if (plan.rowCount() == 1) {
+            IgniteBiTuple t = plan.processRow(cur.iterator().next());
 
-            for (List<?> row : cur) {
-                final IgniteBiTuple t = plan.processRow(row);
+            streamer.addData(t.getKey(), t.getValue());
 
-                rows.put(t.getKey(), t.getValue());
-            }
+            return 1;
+        }
+
+        Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
 
-            streamer.addData(rows);
+        for (List<?> row : cur) {
+            final IgniteBiTuple t = plan.processRow(row);
 
-            return rows.size();
+            rows.put(t.getKey(), t.getValue());
         }
-        else
-            throw new IgniteSQLException("Only tuple based INSERT statements are supported in streaming mode",
-                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+        streamer.addData(rows);
+
+        return rows.size();
     }
 
     /**
@@ -519,7 +513,7 @@ public class DmlStatementsProcessor {
                 .setPageSize(fieldsQry.getPageSize())
                 .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS);
 
-            cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schemaName, newFieldsQry, true, true,
+            cur = (QueryCursorImpl<List<?>>)idx.querySqlFields(schemaName, newFieldsQry, null, true, true,
                 cancel).get(0);
         }
         else if (plan.hasRows())
@@ -610,7 +604,7 @@ public class DmlStatementsProcessor {
      * @return Update plan.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    private UpdatePlan getPlanForStatement(String schema, Connection conn, Prepared p, SqlFieldsQuery fieldsQry,
+    UpdatePlan getPlanForStatement(String schema, Connection conn, Prepared p, SqlFieldsQuery fieldsQry,
         boolean loc, @Nullable Integer errKeysPos) throws IgniteCheckedException {
         H2CachedStatementKey planKey = H2CachedStatementKey.forDmlStatement(schema, p.getSQL(), fieldsQry, loc);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
index cfbb7bb..e9d9f90 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
@@ -17,8 +17,16 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
+import java.lang.reflect.Constructor;
+import java.sql.Connection;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
@@ -33,12 +41,7 @@ import org.h2.table.IndexColumn;
 import org.h2.value.DataType;
 import org.h2.value.Value;
 
-import java.lang.reflect.Constructor;
-import java.sql.Connection;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
+import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.UPDATE_RESULT_META;
 
 /**
  * H2 utility methods.
@@ -267,4 +270,17 @@ public class H2Utils {
     private H2Utils() {
         // No-op.
     }
+
+    /**
+     * @return Single-column, single-row cursor with 0 as number of updated records.
+     */
+    @SuppressWarnings("unchecked")
+    public static QueryCursorImpl<List<?>> zeroCursor() {
+        QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
+            (Collections.singletonList(0L)), null, false);
+
+        resCur.fieldsMeta(UPDATE_RESULT_META);
+
+        return resCur;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 361623c..65f08b2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -92,6 +92,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryField;
 import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
 import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.SqlClientContext;
 import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory;
 import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerIO;
@@ -100,6 +101,7 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO;
 import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2PlainRowFactory;
@@ -120,11 +122,12 @@ import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisito
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.sql.SqlParseException;
 import org.apache.ignite.internal.sql.SqlParser;
-import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
 import org.apache.ignite.internal.sql.command.SqlAlterTableCommand;
+import org.apache.ignite.internal.sql.command.SqlBulkLoadCommand;
 import org.apache.ignite.internal.sql.command.SqlCommand;
 import org.apache.ignite.internal.sql.command.SqlCreateIndexCommand;
 import org.apache.ignite.internal.sql.command.SqlDropIndexCommand;
+import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -149,7 +152,6 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
 import org.h2.api.ErrorCode;
 import org.h2.api.JavaObjectSerializer;
 import org.h2.command.Prepared;
-import org.h2.command.dml.Insert;
 import org.h2.command.dml.NoOperation;
 import org.h2.engine.Session;
 import org.h2.engine.SysProperties;
@@ -191,7 +193,7 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType
 @SuppressWarnings({"UnnecessaryFullyQualifiedName", "NonFinalStaticVariableUsedInClassInitialization"})
 public class IgniteH2Indexing implements GridQueryIndexing {
     public static final Pattern INTERNAL_CMD_RE = Pattern.compile(
-        "^(create|drop)\\s+index|^alter\\s+table|^copy", Pattern.CASE_INSENSITIVE);
+        "^(create|drop)\\s+index|^alter\\s+table|^copy|^set", Pattern.CASE_INSENSITIVE);
 
     /*
      * Register IO for indexes.
@@ -500,10 +502,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public PreparedStatement prepareNativeStatement(String schemaName, String sql) throws SQLException {
+    @Override public PreparedStatement prepareNativeStatement(String schemaName, String sql) {
         Connection conn = connectionForSchema(schemaName);
 
-        return prepareStatement(conn, sql, true);
+        return prepareStatementAndCaches(conn, sql);
     }
 
     /**
@@ -1013,7 +1015,55 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             throw new IgniteSQLException(e);
         }
 
-        return dmlProc.streamUpdateQuery(streamer, stmt, params);
+        return dmlProc.streamUpdateQuery(schemaName, streamer, stmt, params);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params,
+        SqlClientContext cliCtx) throws IgniteCheckedException {
+        if (cliCtx == null || !cliCtx.isStream()) {
+            U.warn(log, "Connection is not in streaming mode.");
+
+            return zeroBatchedStreamedUpdateResult(params.size());
+        }
+
+        final Connection conn = connectionForSchema(schemaName);
+
+        final PreparedStatement stmt = prepareStatementAndCaches(conn, qry);
+
+        if (GridSqlQueryParser.checkMultipleStatements(stmt))
+            throw new IgniteSQLException("Multiple statements queries are not supported for streaming mode.",
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+        checkStatementStreamable(stmt);
+
+        Prepared p = GridSqlQueryParser.prepared(stmt);
+
+        UpdatePlan plan = dmlProc.getPlanForStatement(schemaName, conn, p, null, true, null);
+
+        IgniteDataStreamer<?, ?> streamer = cliCtx.streamerForCache(plan.cacheContext().name());
+
+        assert streamer != null;
+
+        List<Long> res = new ArrayList<>(params.size());
+
+        for (int i = 0; i < params.size(); i++)
+            res.add(dmlProc.streamUpdateQuery(schemaName, streamer, stmt, params.get(i)));
+
+        return res;
+    }
+
+    /**
+     * @param size Result size.
+     * @return Fixed-size list of {@code size} elements, each equal to {@code 0L}.
+     */
+    private static List<Long> zeroBatchedStreamedUpdateResult(int size) {
+        Long[] res = new Long[size];
+
+        Arrays.fill(res, 0L); // Must be a Long: an int literal boxes to Integer and fails with ArrayStoreException.
+
+        return Arrays.asList(res);
+    }
 
     /**
@@ -1399,7 +1449,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);
 
         final QueryCursor<List<?>> res =
-            querySqlFields(schemaName, fqry, keepBinary, true, null).get(0);
+            querySqlFields(schemaName, fqry, null, keepBinary, true, null).get(0);
 
         final Iterable<Cache.Entry<K, V>> converted = new Iterable<Cache.Entry<K, V>>() {
             @Override public Iterator<Cache.Entry<K, V>> iterator() {
@@ -1435,19 +1485,21 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * Try executing query using native facilities.
      *
      * @param schemaName Schema name.
-     * @param qry Query.
+     * @param sql Query.
+     * @param cliCtx Client context, or {@code null} if not applicable.
      * @return Result or {@code null} if cannot parse/process this query.
      */
-    private List<FieldsQueryCursor<List<?>>> tryQueryDistributedSqlFieldsNative(String schemaName, SqlFieldsQuery qry) {
+    private List<FieldsQueryCursor<List<?>>> tryQueryDistributedSqlFieldsNative(String schemaName, String sql,
+        @Nullable SqlClientContext cliCtx) {
         // Heuristic check for fast return.
-        if (!INTERNAL_CMD_RE.matcher(qry.getSql().trim()).find())
+        if (!INTERNAL_CMD_RE.matcher(sql.trim()).find())
             return null;
 
         // Parse.
         SqlCommand cmd;
 
         try {
-            SqlParser parser = new SqlParser(schemaName, qry.getSql());
+            SqlParser parser = new SqlParser(schemaName, sql);
 
             cmd = parser.nextCommand();
 
@@ -1455,15 +1507,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (parser.nextCommand() != null)
                 return null;
 
-            // Currently supported commands are: CREATE/DROP INDEX/COPY/ALTER TABLE
+            // Currently supported commands are:
+            // CREATE/DROP INDEX
+            // COPY
+            // ALTER TABLE
+            // SET STREAMING
             if (!(cmd instanceof SqlCreateIndexCommand || cmd instanceof SqlDropIndexCommand ||
-                cmd instanceof SqlBulkLoadCommand || cmd instanceof SqlAlterTableCommand))
+                cmd instanceof SqlAlterTableCommand || cmd instanceof SqlBulkLoadCommand ||
+                cmd instanceof SqlSetStreamingCommand))
                 return null;
         }
         catch (Exception e) {
             // Cannot parse, return.
             if (log.isDebugEnabled())
-                log.debug("Failed to parse SQL with native parser [qry=" + qry.getSql() + ", err=" + e + ']');
+                log.debug("Failed to parse SQL with native parser [qry=" + sql + ", err=" + e + ']');
 
             if (!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK))
                 return null;
@@ -1473,24 +1530,40 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (e instanceof SqlParseException)
                 code = ((SqlParseException)e).code();
 
-            throw new IgniteSQLException("Failed to parse DDL statement: " + qry.getSql() + ": " + e.getMessage(),
+            throw new IgniteSQLException("Failed to parse DDL statement: " + sql + ": " + e.getMessage(),
                 code, e);
         }
 
         // Execute.
         if (cmd instanceof SqlBulkLoadCommand) {
-            FieldsQueryCursor<List<?>> cursor = dmlProc.runNativeDmlStatement(qry.getSql(), cmd);
+            FieldsQueryCursor<List<?>> cursor = dmlProc.runNativeDmlStatement(sql, cmd);
 
             return Collections.singletonList(cursor);
         }
+        else if (cmd instanceof SqlSetStreamingCommand) {
+            if (cliCtx == null)
+                throw new IgniteSQLException("SET STREAMING command can only be executed from JDBC or ODBC driver.");
+
+            SqlSetStreamingCommand setCmd = (SqlSetStreamingCommand)cmd;
+
+            boolean on = setCmd.isTurnOn();
+
+            if (on)
+                cliCtx.enableStreaming(setCmd.allowOverwrite(), setCmd.flushFrequency(),
+                    setCmd.perNodeBufferSize(), setCmd.perNodeParallelOperations());
+            else
+                cliCtx.disableStreaming();
+
+            return Collections.singletonList(H2Utils.zeroCursor());
+        }
         else {
             try {
-                FieldsQueryCursor<List<?>> cursor = ddlProc.runDdlStatement(qry.getSql(), cmd);
+                FieldsQueryCursor<List<?>> cursor = ddlProc.runDdlStatement(sql, cmd);
 
                 return Collections.singletonList(cursor);
             }
             catch (IgniteCheckedException e) {
-                throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + qry.getSql() + "]: "
+                throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + sql + "]: "
                     + e.getMessage(), e);
             }
         }
@@ -1512,10 +1585,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
 
     /** {@inheritDoc} */
-    @SuppressWarnings("StringEquality")
+    @SuppressWarnings({"StringEquality", "unchecked"})
     @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
-        boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) {
-        List<FieldsQueryCursor<List<?>>> res = tryQueryDistributedSqlFieldsNative(schemaName, qry);
+        @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel) {
+        List<FieldsQueryCursor<List<?>>> res = tryQueryDistributedSqlFieldsNative(schemaName, qry.getSql(), cliCtx);
 
         if (res != null)
             return res;
@@ -1553,8 +1626,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                 // We may use this cached statement only for local queries and non queries.
                 if (qry.isLocal() || !prepared.isQuery())
-                    return (List<FieldsQueryCursor<List<?>>>)doRunPrepared(schemaName, prepared, qry, null, null,
-                            keepBinary, cancel);
+                    return (List<FieldsQueryCursor<List<?>>>)doRunPrepared(schemaName, prepared, qry, null,
+                        null, keepBinary, cancel);
             }
         }
 
@@ -1602,12 +1675,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param twoStepQry Two-step query if this query must be executed in a distributed way.
      * @param meta Metadata for {@code twoStepQry}.
      * @param keepBinary Whether binary objects must not be deserialized automatically.
-     * @param cancel Query cancel state holder.
-     * @return Query result.
+     * @param cancel Query cancel state holder.    @return Query result.
      */
     private List<? extends FieldsQueryCursor<List<?>>> doRunPrepared(String schemaName, Prepared prepared,
-        SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry, List<GridQueryFieldMetadata> meta, boolean keepBinary,
-        GridQueryCancel cancel) {
+        SqlFieldsQuery qry, GridCacheTwoStepQuery twoStepQry,
+        List<GridQueryFieldMetadata> meta, boolean keepBinary, GridQueryCancel cancel) {
         String sqlQry = qry.getSql();
 
         boolean loc = qry.isLocal();
@@ -2276,10 +2348,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isInsertStatement(PreparedStatement nativeStmt) {
-        Prepared prep = GridSqlQueryParser.prepared(nativeStmt);
-
-        return prep instanceof Insert;
+    @Override public void checkStatementStreamable(PreparedStatement nativeStmt) {
+        if (!GridSqlQueryParser.isStreamableInsertStatement(nativeStmt))
+            throw new IgniteSQLException("Streaming mode supports only INSERT commands without subqueries.",
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
index 6f5b51f..d4f2b0e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryEntityEx;
 import org.apache.ignite.internal.processors.query.QueryField;
 import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlterTableAddColumn;
@@ -106,7 +107,7 @@ public class DdlStatementsProcessor {
      * @throws IgniteCheckedException On error.
      */
     @SuppressWarnings("unchecked")
-    public FieldsQueryCursor<List<?>> runDdlStatement(String sql, SqlCommand cmd) throws IgniteCheckedException{
+    public FieldsQueryCursor<List<?>> runDdlStatement(String sql, SqlCommand cmd) throws IgniteCheckedException {
         IgniteInternalFuture fut;
 
         try {
@@ -211,12 +212,7 @@ public class DdlStatementsProcessor {
             if (fut != null)
                 fut.get();
 
-            QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
-                (Collections.singletonList(0L)), null, false);
-
-            resCur.fieldsMeta(UPDATE_RESULT_META);
-
-            return resCur;
+            return H2Utils.zeroCursor();
         }
         catch (SchemaOperationException e) {
             throw convert(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
index 10d485a..98fbb97 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
@@ -505,7 +505,7 @@ public final class UpdatePlan {
     /**
      * @return Local subquery flag.
      */
-    @Nullable public boolean isLocalSubquery() {
+    public boolean isLocalSubquery() {
         return isLocSubqry;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index bced836..d897ac7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -85,20 +85,21 @@ public final class UpdatePlanBuilder {
      * @param loc Local query flag.
      * @param idx Indexing.
      * @param conn Connection.
-     * @param fieldsQuery Original query.
+     * @param fieldsQry Original query.
      * @return Update plan.
      */
     public static UpdatePlan planForStatement(Prepared prepared, boolean loc, IgniteH2Indexing idx,
-        @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery, @Nullable Integer errKeysPos)
+        @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQry, @Nullable Integer errKeysPos)
         throws IgniteCheckedException {
-        assert !prepared.isQuery();
-
         GridSqlStatement stmt = new GridSqlQueryParser(false).parse(prepared);
 
         if (stmt instanceof GridSqlMerge || stmt instanceof GridSqlInsert)
-            return planForInsert(stmt, loc, idx, conn, fieldsQuery);
+            return planForInsert(stmt, loc, idx, conn, fieldsQry);
+        else if (stmt instanceof GridSqlUpdate || stmt instanceof GridSqlDelete)
+            return planForUpdate(stmt, loc, idx, conn, fieldsQry, errKeysPos);
         else
-            return planForUpdate(stmt, loc, idx, conn, fieldsQuery, errKeysPos);
+            throw new IgniteSQLException("Unsupported operation: " + prepared.getSQL(),
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index 04bc212..2d2c25c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -1993,6 +1993,18 @@ public class GridSqlQueryParser {
     }
 
     /**
+     * Check if passed statement is insert statement eligible for streaming.
+     *
+     * @param nativeStmt Native statement.
+     * @return {@code True} if streamable insert.
+     */
+    public static boolean isStreamableInsertStatement(PreparedStatement nativeStmt) {
+        Prepared prep = prepared(nativeStmt);
+
+        return prep instanceof Insert && INSERT_QUERY.get((Insert)prep) == null;
+    }
+
+    /**
      * @param cond Condition.
      * @param o Object.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index 069bdd7..cf8bb2e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -140,7 +140,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
         SqlFieldsQuery qry = new SqlFieldsQuery("select f.productId, p.name, f.price " +
             "from FactPurchase f, \"replicated-prod\".DimProduct p where p.id = f.productId ");
 
-        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) {
+        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) {
             X.println("___ -> " + o);
 
             set1.add((Integer)o.get(0));
@@ -154,7 +154,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
 
         qry = new SqlFieldsQuery("select productId from FactPurchase group by productId");
 
-        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) {
+        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) {
             X.println("___ -> " + o);
 
             assertTrue(set0.add((Integer) o.get(0)));
@@ -173,7 +173,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
             "where p.id = f.productId " +
             "group by f.productId, p.name");
 
-        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) {
+        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) {
             X.println("___ -> " + o);
 
             assertTrue(names.add((String)o.get(0)));
@@ -190,7 +190,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
             "group by f.productId, p.name " +
             "having s >= 15");
 
-        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) {
+        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) {
             X.println("___ -> " + o);
 
             assertTrue(i(o, 1) >= 15);
@@ -203,7 +203,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
         qry = new SqlFieldsQuery("select top 3 distinct productId " +
             "from FactPurchase f order by productId desc ");
 
-        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) {
+        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) {
             X.println("___ -> " + o);
 
             assertEquals(top--, o.get(0));
@@ -216,7 +216,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
         qry = new SqlFieldsQuery("select distinct productId " +
             "from FactPurchase f order by productId desc limit 2 offset 1");
 
-        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, false, true).get(0).getAll()) {
+        for (List<?> o : qryProc.querySqlFields(cache.context(), qry, null, false, true).get(0).getAll()) {
             X.println("___ -> " + o);
 
             assertEquals(top--, o.get(0));
@@ -256,13 +256,13 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
         GridTestUtils.assertThrows(log,
             new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    qryProc.querySqlFields(cache.context(), qry, false, true);
+                    qryProc.querySqlFields(cache.context(), qry, null, false, true);
 
                     return null;
                 }
             }, IgniteSQLException.class, "Multiple statements queries are not supported");
 
-        List<FieldsQueryCursor<List<?>>> cursors = qryProc.querySqlFields(cache.context(), qry, false, false);
+        List<FieldsQueryCursor<List<?>>> cursors = qryProc.querySqlFields(cache.context(), qry, null, false, false);
 
         assertEquals(2, cursors.size());
 
@@ -274,7 +274,7 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
         GridTestUtils.assertThrows(log,
             new Callable<Object>() {
                 @Override public Object call() throws Exception {
-                    qryProc.querySqlFields(cache.context(), qry, false, false);
+                    qryProc.querySqlFields(cache.context(), qry, null, false, false);
 
                     return null;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index f004453..c01eaa6 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -170,6 +170,7 @@ import org.apache.ignite.internal.processors.sql.SqlConnectorConfigurationValida
 import org.apache.ignite.internal.sql.SqlParserBulkLoadSelfTest;
 import org.apache.ignite.internal.sql.SqlParserCreateIndexSelfTest;
 import org.apache.ignite.internal.sql.SqlParserDropIndexSelfTest;
+import org.apache.ignite.internal.sql.SqlParserSetStreamingSelfTest;
 import org.apache.ignite.spi.communication.tcp.GridOrderedMessageCancelSelfTest;
 import org.apache.ignite.testframework.IgniteTestSuite;
 
@@ -187,6 +188,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(SqlParserCreateIndexSelfTest.class);
         suite.addTestSuite(SqlParserDropIndexSelfTest.class);
         suite.addTestSuite(SqlParserBulkLoadSelfTest.class);
+        suite.addTestSuite(SqlParserSetStreamingSelfTest.class);
 
         suite.addTestSuite(SqlConnectorConfigurationValidationSelfTest.class);
         suite.addTestSuite(ClientConnectorConfigurationValidationSelfTest.class);


Mime
View raw message