ignite-commits mailing list archives

From voze...@apache.org
Subject [2/2] ignite git commit: IGNITE-7253: JDBC thin driver: implemented streaming. This closes #3499. This closes #3591.
Date Wed, 14 Mar 2018 09:23:53 GMT
IGNITE-7253: JDBC thin driver: implemented streaming. This closes #3499. This closes #3591.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7366809e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7366809e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7366809e

Branch: refs/heads/master
Commit: 7366809ed0ea7a943e1c80079fdbeb556ee86245
Parents: ae2bf3d
Author: Alexander Paschenko <alexander.a.paschenko@gmail.com>
Authored: Wed Mar 14 12:23:37 2018 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Wed Mar 14 12:23:37 2018 +0300

----------------------------------------------------------------------
 .../jdbc2/JdbcNoCacheStreamingSelfTest.java     | 182 +++++++
 .../internal/jdbc2/JdbcStreamingSelfTest.java   | 220 +++++++--
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java   |  11 +-
 .../jdbc/thin/JdbcThinStreamingSelfTest.java    | 486 +++++++++++++++++++
 .../internal/jdbc/thin/JdbcThinConnection.java  | 112 +++++
 .../jdbc/thin/JdbcThinPreparedStatement.java    |  22 +-
 .../internal/jdbc/thin/JdbcThinStatement.java   | 153 +++++-
 .../internal/jdbc/thin/JdbcThinTcpIo.java       |  12 +-
 .../ignite/internal/jdbc2/JdbcConnection.java   |  10 +-
 .../jdbc2/JdbcStreamedPreparedStatement.java    |   2 +-
 .../processors/cache/IgniteCacheProxyImpl.java  |   4 +-
 .../odbc/jdbc/JdbcBatchExecuteRequest.java      |  40 +-
 .../odbc/jdbc/JdbcConnectionContext.java        |  13 +-
 .../odbc/jdbc/JdbcRequestHandler.java           |  84 ++--
 .../processors/query/GridQueryIndexing.java     |  32 +-
 .../processors/query/GridQueryProcessor.java    |  50 +-
 .../processors/query/SqlClientContext.java      | 223 +++++++++
 .../apache/ignite/internal/sql/SqlKeyword.java  |  29 +-
 .../apache/ignite/internal/sql/SqlParser.java   |  26 +-
 .../ignite/internal/sql/SqlParserUtils.java     |  22 +
 .../sql/command/SqlSetStreamingCommand.java     | 191 ++++++++
 ...IgniteClientCacheInitializationFailTest.java |  15 +-
 .../sql/SqlParserSetStreamingSelfTest.java      | 134 +++++
 .../ignite/testframework/GridTestUtils.java     |   2 +-
 .../query/h2/DmlStatementsProcessor.java        | 104 ++--
 .../internal/processors/query/h2/H2Utils.java   |  28 +-
 .../processors/query/h2/IgniteH2Indexing.java   | 134 +++--
 .../query/h2/ddl/DdlStatementsProcessor.java    |  10 +-
 .../processors/query/h2/dml/UpdatePlan.java     |   2 +-
 .../query/h2/dml/UpdatePlanBuilder.java         |  13 +-
 .../query/h2/sql/GridSqlQueryParser.java        |  12 +
 .../cache/GridCacheCrossCacheQuerySelfTest.java |  18 +-
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 33 files changed, 2140 insertions(+), 258 deletions(-)
----------------------------------------------------------------------
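
For orientation before the diffs: this commit teaches the thin driver to parse the new
SET STREAMING command on the client and to route subsequent INSERTs through server-side
data streamers. A minimal usage sketch, assuming a node reachable over the thin driver
at 127.0.0.1 and a PUBLIC.Person table like the one the tests below create (the URL,
table and parameter values are illustrative, not part of this commit):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Statement;

    public class ThinStreamingSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) {
                try (Statement s = conn.createStatement()) {
                    // Turn streaming on; the options mirror the grammar exercised in the tests below.
                    s.execute("SET STREAMING 1 BATCH_SIZE 200 ALLOW_OVERWRITE 0 " +
                        "PER_NODE_BUFFER_SIZE 1000 FLUSH_FREQUENCY 1000");
                }

                try (PreparedStatement stmt =
                    conn.prepareStatement("insert into PUBLIC.Person(\"id\", \"name\") values (?, ?)")) {
                    for (int i = 1; i <= 100; i++) {
                        stmt.setInt(1, i);
                        stmt.setString(2, "Person" + i);

                        // Buffered on the client and shipped in batches rather than executed one by one.
                        stmt.executeUpdate();
                    }
                }

                try (Statement s = conn.createStatement()) {
                    // Turning streaming off flushes whatever is still buffered.
                    s.execute("SET STREAMING 0");
                }
            }
        }
    }

Closing the connection has the same flushing effect as SET STREAMING 0 - see the new
close() logic in JdbcThinConnection below.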


http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcNoCacheStreamingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcNoCacheStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcNoCacheStreamingSelfTest.java
new file mode 100644
index 0000000..74c2820
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcNoCacheStreamingSelfTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.ignite.IgniteJdbcDriver;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Data streaming test for the thick driver without explicit caches.
+ */
+public class JdbcNoCacheStreamingSelfTest extends GridCommonAbstractTest {
+    /** JDBC URL. */
+    private static final String BASE_URL = CFG_URL_PREFIX +
+        "cache=default@modules/clients/src/test/config/jdbc-config.xml";
+
+    /** Connection. */
+    protected Connection conn;
+
+    /** */
+    protected transient IgniteLogger log;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        return getConfiguration0(gridName);
+    }
+
+    /**
+     * @param gridName Grid name.
+     * @return Grid configuration used for starting the grid.
+     * @throws Exception If failed.
+     */
+    private IgniteConfiguration getConfiguration0(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+        cache.setCacheMode(PARTITIONED);
+        cache.setBackups(1);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+        cache.setIndexedTypes(
+            Integer.class, Integer.class
+        );
+
+        cfg.setCacheConfiguration(cache);
+        cfg.setLocalHost("127.0.0.1");
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+        ipFinder.setAddresses(Collections.singleton("127.0.0.1:47500..47501"));
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setConnectorConfiguration(new ConnectorConfiguration());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @param allowOverwrite Allow overwriting of existing keys.
+     * @return Connection to use for the test.
+     * @throws Exception if failed.
+     */
+    protected Connection createConnection(boolean allowOverwrite) throws Exception {
+        Properties props = new Properties();
+
+        props.setProperty(IgniteJdbcDriver.PROP_STREAMING, "true");
+        props.setProperty(IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ, "500");
+
+        if (allowOverwrite)
+            props.setProperty(IgniteJdbcDriver.PROP_STREAMING_ALLOW_OVERWRITE, "true");
+
+        return DriverManager.getConnection(BASE_URL, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        U.closeQuiet(conn);
+
+        ignite(0).cache(DEFAULT_CACHE_NAME).clear();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStreamedInsert() throws Exception {
+        for (int i = 10; i <= 100; i += 10)
+            ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i * 100);
+
+        try (Connection conn = createConnection(false)) {
+            try (PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)")) {
+                for (int i = 1; i <= 100; i++) {
+                    stmt.setInt(1, i);
+                    stmt.setInt(2, i);
+
+                    stmt.executeUpdate();
+                }
+            }
+        }
+
+        U.sleep(500);
+
+        // Now let's check it's all there.
+        for (int i = 1; i <= 100; i++) {
+            if (i % 10 != 0)
+                assertEquals(i, grid(0).cache(DEFAULT_CACHE_NAME).get(i));
+            else // Multiples of 10 should map to values 100 times greater - see above
+                assertEquals(i * 100, grid(0).cache(DEFAULT_CACHE_NAME).get(i));
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStreamedInsertWithOverwritesAllowed() throws Exception {
+        for (int i = 10; i <= 100; i += 10)
+            ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i * 100);
+
+        try (Connection conn = createConnection(true)) {
+            try (PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)")) {
+                for (int i = 1; i <= 100; i++) {
+                    stmt.setInt(1, i);
+                    stmt.setInt(2, i);
+
+                    stmt.executeUpdate();
+                }
+            }
+        }
+
+        U.sleep(500);
+
+        // Now let's check it's all there.
+        // i should point to i at all times as we've turned overwrites on above.
+        for (int i = 1; i <= 100; i++)
+            assertEquals(i, grid(0).cache(DEFAULT_CACHE_NAME).get(i));
+    }
+}
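
For contrast with the thin-driver changes below: the thick (jdbc2) driver covered by this
test enables streaming via connection properties rather than via a SQL command. A condensed
sketch reusing the property names, URL prefix and config path from the test above (imports
as at the top of this file):

    Properties props = new Properties();

    props.setProperty(IgniteJdbcDriver.PROP_STREAMING, "true");
    props.setProperty(IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ, "500");
    props.setProperty(IgniteJdbcDriver.PROP_STREAMING_ALLOW_OVERWRITE, "true");

    try (Connection conn = DriverManager.getConnection(
        CFG_URL_PREFIX + "cache=default@modules/clients/src/test/config/jdbc-config.xml", props)) {
        // Every INSERT on this connection is fed to a data streamer that is
        // flushed at the configured frequency and closed with the connection.
    }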

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
index 5418ca0..10adedc 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
@@ -20,16 +20,24 @@ package org.apache.ignite.internal.jdbc2;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Collections;
 import java.util.Properties;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteJdbcDriver;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.ConnectorConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
@@ -41,10 +49,12 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
  */
 public class JdbcStreamingSelfTest extends GridCommonAbstractTest {
     /** JDBC URL. */
-    private static final String BASE_URL = CFG_URL_PREFIX + "cache=default@modules/clients/src/test/config/jdbc-config.xml";
+    private static final String BASE_URL = CFG_URL_PREFIX +
+        "cache=default@modules/clients/src/test/config/jdbc-config.xml";
 
-    /** Connection. */
-    protected Connection conn;
+    /** Streaming URL. */
+    private static final String STREAMING_URL = CFG_URL_PREFIX +
+        "cache=person@modules/clients/src/test/config/jdbc-config.xml";
 
     /** */
     protected transient IgniteLogger log;
@@ -90,7 +100,18 @@ public class JdbcStreamingSelfTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
         startGrids(2);
+
+        try (Connection c = createOrdinaryConnection()) {
+            try (Statement s = c.createStatement()) {
+                s.execute("CREATE TABLE PUBLIC.Person(\"id\" int primary key, \"name\" varchar) WITH " +
+                    "\"cache_name=person,value_type=Person\"");
+            }
+        }
+
+        U.sleep(1000);
     }
 
     /** {@inheritDoc} */
@@ -99,27 +120,51 @@ public class JdbcStreamingSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @return Connection with streaming initially turned off.
+     * @throws SQLException if failed.
+     */
+    protected Connection createOrdinaryConnection() throws SQLException {
+        Connection res = DriverManager.getConnection(BASE_URL, new Properties());
+
+        res.setSchema(QueryUtils.DFLT_SCHEMA);
+
+        return res;
+    }
+
+    /**
      * @param allowOverwrite Allow overwriting of existing keys.
      * @return Connection to use for the test.
      * @throws Exception if failed.
      */
-    private Connection createConnection(boolean allowOverwrite) throws Exception {
+    protected Connection createStreamedConnection(boolean allowOverwrite) throws Exception {
+        return createStreamedConnection(allowOverwrite, 500);
+    }
+
+    /**
+     * @param allowOverwrite Allow overwriting of existing keys.
+     * @param flushTimeout Stream flush timeout.
+     * @return Connection to use for the test.
+     * @throws Exception if failed.
+     */
+    protected Connection createStreamedConnection(boolean allowOverwrite, long flushTimeout) throws Exception {
         Properties props = new Properties();
 
         props.setProperty(IgniteJdbcDriver.PROP_STREAMING, "true");
-        props.setProperty(IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ, "500");
+        props.setProperty(IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ, String.valueOf(flushTimeout));
 
         if (allowOverwrite)
             props.setProperty(IgniteJdbcDriver.PROP_STREAMING_ALLOW_OVERWRITE, "true");
 
-        return DriverManager.getConnection(BASE_URL, props);
+        Connection res = DriverManager.getConnection(STREAMING_URL, props);
+
+        res.setSchema(QueryUtils.DFLT_SCHEMA);
+
+        return res;
     }
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
-        U.closeQuiet(conn);
-
-        ignite(0).cache(DEFAULT_CACHE_NAME).clear();
+        cache().clear();
 
         super.afterTest();
     }
@@ -128,30 +173,59 @@ public class JdbcStreamingSelfTest extends GridCommonAbstractTest {
      * @throws Exception if failed.
      */
     public void testStreamedInsert() throws Exception {
-        conn = createConnection(false);
-
         for (int i = 10; i <= 100; i += 10)
-            ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i * 100);
+            put(i, nameForId(i * 100));
+
+        try (Connection conn = createStreamedConnection(false)) {
+            try (PreparedStatement stmt = conn.prepareStatement("insert into PUBLIC.Person(\"id\", \"name\") " +
+                "values (?, ?)")) {
+                for (int i = 1; i <= 100; i++) {
+                    stmt.setInt(1, i);
+                    stmt.setString(2, nameForId(i));
+
+                    stmt.executeUpdate();
+                }
+            }
+        }
 
-        PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)");
+        U.sleep(500);
 
+        // Now let's check it's all there.
         for (int i = 1; i <= 100; i++) {
-            stmt.setInt(1, i);
-            stmt.setInt(2, i);
+            if (i % 10 != 0)
+                assertEquals(nameForId(i), nameForIdInCache(i));
+            else // Multiples of 10 should map to values 100 times greater - see above
+                assertEquals(nameForId(i * 100), nameForIdInCache(i));
+        }
+    }
 
-            stmt.executeUpdate();
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStreamedInsertWithoutColumnsList() throws Exception {
+        for (int i = 10; i <= 100; i += 10)
+            put(i, nameForId(i * 100));
+
+        try (Connection conn = createStreamedConnection(false)) {
+            try (PreparedStatement stmt = conn.prepareStatement("insert into PUBLIC.Person(\"id\", \"name\") " +
+                "values (?, ?)")) {
+                for (int i = 1; i <= 100; i++) {
+                    stmt.setInt(1, i);
+                    stmt.setString(2, nameForId(i));
+
+                    stmt.executeUpdate();
+                }
+            }
         }
 
-        // Closing connection makes it wait for streamer close
-        // and thus for data load completion as well
-        conn.close();
+        U.sleep(500);
 
         // Now let's check it's all there.
         for (int i = 1; i <= 100; i++) {
             if (i % 10 != 0)
-                assertEquals(i, grid(0).cache(DEFAULT_CACHE_NAME).get(i));
+                assertEquals(nameForId(i), nameForIdInCache(i));
             else // Multiples of 10 should map to values 100 times greater - see above
-                assertEquals(i * 100, grid(0).cache(DEFAULT_CACHE_NAME).get(i));
+                assertEquals(nameForId(i * 100), nameForIdInCache(i));
         }
     }
 
@@ -159,27 +233,101 @@ public class JdbcStreamingSelfTest extends GridCommonAbstractTest {
      * @throws Exception if failed.
      */
     public void testStreamedInsertWithOverwritesAllowed() throws Exception {
-        conn = createConnection(true);
-
         for (int i = 10; i <= 100; i += 10)
-            ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i * 100);
-
-        PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)");
-
-        for (int i = 1; i <= 100; i++) {
-            stmt.setInt(1, i);
-            stmt.setInt(2, i);
-
-            stmt.executeUpdate();
+            put(i, nameForId(i * 100));
+
+        try (Connection conn = createStreamedConnection(true)) {
+            try (PreparedStatement stmt = conn.prepareStatement("insert into PUBLIC.Person(\"id\", \"name\") " +
+                "values (?, ?)")) {
+                for (int i = 1; i <= 100; i++) {
+                    stmt.setInt(1, i);
+                    stmt.setString(2, nameForId(i));
+
+                    stmt.executeUpdate();
+                }
+            }
         }
 
-        // Closing connection makes it wait for streamer close
-        // and thus for data load completion as well
-        conn.close();
+        U.sleep(500);
 
         // Now let's check it's all there.
         // i should point to i at all times as we've turned overwrites on above.
         for (int i = 1; i <= 100; i++)
-            assertEquals(i, grid(0).cache(DEFAULT_CACHE_NAME).get(i));
+            assertEquals(nameForId(i), nameForIdInCache(i));
+    }
+
+    /** Checks that only plain INSERT statements without subqueries are allowed in streaming mode. */
+    public void testOnlyInsertsAllowed() {
+        assertStatementForbidden("CREATE TABLE PUBLIC.X (x int primary key, y int)");
+
+        assertStatementForbidden("CREATE INDEX idx_1 ON Person(name)");
+
+        assertStatementForbidden("SELECT * from Person");
+
+        assertStatementForbidden("insert into PUBLIC.Person(\"id\", \"name\") " +
+            "(select \"id\" + 1, CONCAT(\"name\", '1') from Person)");
+
+        assertStatementForbidden("DELETE from Person");
+
+        assertStatementForbidden("UPDATE Person SET \"name\" = 'name0'");
+
+        assertStatementForbidden("alter table Person add column y int");
+    }
+
+    /**
+     * @param sql Statement to check.
+     */
+    @SuppressWarnings("ThrowableNotThrown")
+    protected void assertStatementForbidden(String sql) {
+        GridTestUtils.assertThrows(null, new IgniteCallable<Object>() {
+            @Override public Object call() throws Exception {
+                try (Connection c = createStreamedConnection(false)) {
+                    try (PreparedStatement s = c.prepareStatement(sql)) {
+                        s.execute();
+                    }
+                }
+
+                return null;
+            }
+        }, SQLException.class, "Streaming mode supports only INSERT commands without subqueries.");
+    }
+
+    /**
+     * @return Person cache.
+     */
+    protected IgniteCache<Integer, Object> cache() {
+        return grid(0).cache("person");
+    }
+
+    /**
+     * @param id id of person to put.
+     * @param name name of person to put.
+     */
+    protected void put(int id, String name) {
+        BinaryObjectBuilder bldr = grid(0).binary().builder("Person");
+
+        bldr.setField("name", name);
+
+        cache().put(id, bldr.build());
+    }
+
+    /**
+     * @param id Person id.
+     * @return Default name for a person with the given id.
+     */
+    protected String nameForId(int id) {
+        return "Person" + id;
+    }
+
+    /**
+     * @param id person id.
+     * @return Name for person with given id currently stored in cache.
+     */
+    protected String nameForIdInCache(int id) {
+        Object o = cache().withKeepBinary().get(id);
+
+        assertTrue(String.valueOf(o), o instanceof BinaryObject);
+
+        return ((BinaryObject)o).field("name");
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index 4530ae7..f3a94ce 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -42,6 +42,7 @@ import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadTransactionalPartitionedNearS
 import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadTransactionalPartitionedSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadTransactionalReplicatedSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinComplexDmlDdlSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinComplexDmlDdlSkipReducerOnUpdateSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinComplexQuerySelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinConnectionSSLTest;
 import org.apache.ignite.jdbc.thin.JdbcThinConnectionSelfTest;
@@ -55,8 +56,10 @@ import org.apache.ignite.jdbc.thin.JdbcThinDynamicIndexTransactionalReplicatedSe
 import org.apache.ignite.jdbc.thin.JdbcThinEmptyCacheSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinErrorsSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinInsertStatementSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinInsertStatementSkipReducerOnUpdateSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinLocalQueriesSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinMergeStatementSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinMergeStatementSkipReducerOnUpdateSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinMetadataSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinMissingLongArrayResultsTest;
 import org.apache.ignite.jdbc.thin.JdbcThinNoDefaultSchemaTest;
@@ -65,11 +68,9 @@ import org.apache.ignite.jdbc.thin.JdbcThinResultSetSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinSchemaCaseTest;
 import org.apache.ignite.jdbc.thin.JdbcThinSelectAfterAlterTable;
 import org.apache.ignite.jdbc.thin.JdbcThinStatementSelfTest;
-import org.apache.ignite.jdbc.thin.JdbcThinUpdateStatementSelfTest;
-import org.apache.ignite.jdbc.thin.JdbcThinComplexDmlDdlSkipReducerOnUpdateSelfTest;
-import org.apache.ignite.jdbc.thin.JdbcThinInsertStatementSkipReducerOnUpdateSelfTest;
-import org.apache.ignite.jdbc.thin.JdbcThinMergeStatementSkipReducerOnUpdateSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinStreamingSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinTcpIoTest;
+import org.apache.ignite.jdbc.thin.JdbcThinUpdateStatementSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinUpdateStatementSkipReducerOnUpdateSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinWalModeChangeSelfTest;
 
@@ -121,9 +122,11 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStatementBatchingSelfTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcErrorsSelfTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingToPublicCacheTest.class));
+        suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcNoCacheStreamingSelfTest.class));
 
         suite.addTest(new TestSuite(JdbcBlobTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest.class));
+        suite.addTest(new TestSuite(JdbcThinStreamingSelfTest.class));
 
         // DDL tests.
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexAtomicPartitionedNearSelfTest.class));

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java
new file mode 100644
index 0000000..3c36f54
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.SqlClientContext;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Tests for streaming via the thin driver.
+ */
+public class JdbcThinStreamingSelfTest extends JdbcStreamingSelfTest {
+    /** */
+    private int batchSize = 17;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        GridQueryProcessor.idxCls = IndexingWithContext.class;
+
+        super.beforeTestsStarted();
+
+        batchSize = 17;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        try (Connection c = createOrdinaryConnection()) {
+            execute(c, "DROP TABLE PUBLIC.T IF EXISTS");
+        }
+
+        IndexingWithContext.cliCtx = null;
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Connection createStreamedConnection(boolean allowOverwrite, long flushFreq) throws Exception {
+        Connection c = JdbcThinAbstractSelfTest.connect(grid(0), null);
+
+        execute(c, "SET STREAMING 1 BATCH_SIZE " + batchSize + " ALLOW_OVERWRITE " + (allowOverwrite ? 1 : 0) +
+            " PER_NODE_BUFFER_SIZE 1000 FLUSH_FREQUENCY " + flushFreq);
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Connection createOrdinaryConnection() throws SQLException {
+        return JdbcThinAbstractSelfTest.connect(grid(0), null);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStreamedBatchedInsert() throws Exception {
+        for (int i = 10; i <= 100; i += 10)
+            put(i, nameForId(i * 100));
+
+        try (Connection conn = createStreamedConnection(false)) {
+            assertStreamingState(true);
+
+            try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?), " +
+                "(?, ?)")) {
+                for (int i = 1; i <= 100; i += 2) {
+                    stmt.setInt(1, i);
+                    stmt.setString(2, nameForId(i));
+                    stmt.setInt(3, i + 1);
+                    stmt.setString(4, nameForId(i + 1));
+
+                    stmt.addBatch();
+                }
+
+                stmt.executeBatch();
+            }
+        }
+
+        U.sleep(500);
+
+        // Now let's check it's all there.
+        for (int i = 1; i <= 100; i++) {
+            if (i % 10 != 0)
+                assertEquals(nameForId(i), nameForIdInCache(i));
+            else // Multiples of 10 should map to values 100 times greater - see above
+                assertEquals(nameForId(i * 100), nameForIdInCache(i));
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testSimultaneousStreaming() throws Exception {
+        try (Connection anotherConn = createOrdinaryConnection()) {
+            execute(anotherConn, "CREATE TABLE PUBLIC.T(x int primary key, y int) WITH " +
+                "\"cache_name=T,wrap_value=false\"");
+        }
+
+        // Wait to let the connection close be handled on the server side.
+        U.sleep(500);
+
+        try (Connection conn = createStreamedConnection(false, 10000)) {
+            assertStreamingState(true);
+
+            PreparedStatement firstStmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)");
+
+            PreparedStatement secondStmt = conn.prepareStatement("insert into PUBLIC.T(x, y) values (?, ?)");
+
+            try {
+                for (int i = 1; i <= 10; i++) {
+                    firstStmt.setInt(1, i);
+                    firstStmt.setString(2, nameForId(i));
+
+                    firstStmt.executeUpdate();
+                }
+
+                for (int i = 51; i <= 67; i++) {
+                    secondStmt.setInt(1, i);
+                    secondStmt.setInt(2, i);
+
+                    secondStmt.executeUpdate();
+                }
+
+                for (int i = 11; i <= 50; i++) {
+                    firstStmt.setInt(1, i);
+                    firstStmt.setString(2, nameForId(i));
+
+                    firstStmt.executeUpdate();
+                }
+
+                for (int i = 68; i <= 100; i++) {
+                    secondStmt.setInt(1, i);
+                    secondStmt.setInt(2, i);
+
+                    secondStmt.executeUpdate();
+                }
+
+                assertCacheEmpty();
+
+                SqlClientContext cliCtx = sqlClientContext();
+
+                HashMap<String, IgniteDataStreamer<?, ?>> streamers = U.field(cliCtx, "streamers");
+
+                assertEquals(2, streamers.size());
+
+                assertEqualsCollections(new HashSet<>(Arrays.asList("person", "T")), streamers.keySet());
+            }
+            finally {
+                U.closeQuiet(firstStmt);
+
+                U.closeQuiet(secondStmt);
+            }
+        }
+
+        // Let's wait a little so that all data arrives at its destination - we can't intercept streamers' flush
+        // on connection close in any way.
+        U.sleep(1000);
+
+        // Now let's check it's all there.
+        for (int i = 1; i <= 50; i++)
+            assertEquals(nameForId(i), nameForIdInCache(i));
+
+        for (int i = 51; i <= 100; i++)
+            assertEquals(i, grid(0).cache("T").get(i));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStreamingWithMixedStatementTypes() throws Exception {
+        String prepStmtStr = "insert into Person(\"id\", \"name\") values (?, ?)";
+
+        String stmtStr = "insert into Person(\"id\", \"name\") values (%d, '%s')";
+
+        try (Connection conn = createStreamedConnection(false, 10000)) {
+            assertStreamingState(true);
+
+            PreparedStatement firstStmt = conn.prepareStatement(prepStmtStr);
+
+            Statement secondStmt = conn.createStatement();
+
+            try {
+                for (int i = 1; i <= 100; i++) {
+                    boolean usePrep = Math.random() > 0.5;
+
+                    boolean useBatch = Math.random() > 0.5;
+
+                    if (usePrep) {
+                        firstStmt.setInt(1, i);
+                        firstStmt.setString(2, nameForId(i));
+
+                        if (useBatch)
+                            firstStmt.addBatch();
+                        else
+                            firstStmt.execute();
+                    }
+                    else {
+                        String sql = String.format(stmtStr, i, nameForId(i));
+
+                        if (useBatch)
+                            secondStmt.addBatch(sql);
+                        else
+                            secondStmt.execute(sql);
+                    }
+                }
+            }
+            finally {
+                U.closeQuiet(firstStmt);
+
+                U.closeQuiet(secondStmt);
+            }
+        }
+
+        // Let's wait a little so that all data arrives at its destination - we can't intercept streamers' flush
+        // on connection close in any way.
+        U.sleep(1000);
+
+        // Now let's check it's all there.
+        for (int i = 1; i <= 100; i++)
+            assertEquals(nameForId(i), nameForIdInCache(i));
+    }
+
+    /**
+     * @throws SQLException if failed.
+     */
+    public void testStreamingOffToOn() throws SQLException {
+        try (Connection conn = createOrdinaryConnection()) {
+            assertStreamingState(false);
+
+            execute(conn, "SET STREAMING 1");
+
+            assertStreamingState(true);
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStreamingOnToOff() throws Exception {
+        try (Connection conn = createStreamedConnection(false)) {
+            assertStreamingState(true);
+
+            execute(conn, "SET STREAMING off");
+
+            assertStreamingState(false);
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testFlush() throws Exception {
+        try (Connection conn = createStreamedConnection(false, 10000)) {
+            assertStreamingState(true);
+
+            try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)")) {
+                for (int i = 1; i <= 100; i++) {
+                    stmt.setInt(1, i);
+                    stmt.setString(2, nameForId(i));
+
+                    stmt.executeUpdate();
+                }
+            }
+
+            assertCacheEmpty();
+
+            execute(conn, "set streaming 0");
+
+            assertStreamingState(false);
+
+            U.sleep(500);
+
+            // Now let's check it's all there.
+            for (int i = 1; i <= 100; i++)
+                assertEquals(nameForId(i), nameForIdInCache(i));
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStreamingReEnabled() throws Exception {
+        try (Connection conn = createStreamedConnection(false, 10000)) {
+            assertStreamingState(true);
+
+            try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)")) {
+                for (int i = 1; i <= 100; i++) {
+                    stmt.setInt(1, i);
+                    stmt.setString(2, nameForId(i));
+
+                    stmt.executeUpdate();
+                }
+            }
+
+            assertCacheEmpty();
+
+            execute(conn, "set streaming 1 batch_size 111 allow_overwrite 0 per_node_buffer_size 512 " +
+                "per_node_parallel_operations 4 flush_frequency 5000");
+
+            U.sleep(500);
+
+            assertEquals((Integer)111, U.field(conn, "streamBatchSize"));
+
+            SqlClientContext cliCtx = sqlClientContext();
+
+            assertTrue(cliCtx.isStream());
+
+            assertFalse(U.field(cliCtx, "streamAllowOverwrite"));
+
+            assertEquals((Integer)512, U.field(cliCtx, "streamNodeBufSize"));
+
+            assertEquals((Long)5000L, U.field(cliCtx, "streamFlushTimeout"));
+
+            assertEquals((Integer)4, U.field(cliCtx, "streamNodeParOps"));
+
+            // Now let's check it's all there - SET STREAMING 1 repeated call must also have caused flush.
+            for (int i = 1; i <= 100; i++)
+                assertEquals(nameForId(i), nameForIdInCache(i));
+        }
+    }
+
+    /**
+     * Checks that streaming cannot be enabled while a statement has a non-empty batch.
+     */
+    @SuppressWarnings("ThrowableNotThrown")
+    public void testNonStreamedBatch() {
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try (Connection conn = createOrdinaryConnection()) {
+                    try (Statement s = conn.createStatement()) {
+                        for (int i = 1; i <= 10; i++)
+                            s.addBatch(String.format("insert into Person(\"id\", \"name\") values (%d, '%s')", i,
+                                nameForId(i)));
+
+                        execute(conn, "SET STREAMING 1");
+
+                        s.addBatch(String.format("insert into Person(\"id\", \"name\") values (%d, '%s')", 11,
+                            nameForId(11)));
+                    }
+                }
+
+                return null;
+            }
+        }, SQLException.class, "Statement has non-empty batch (call executeBatch() or clearBatch() before " +
+            "enabling streaming).");
+    }
+
+    /**
+     * Checks that a streaming control command cannot be added to a non-prepared statement's batch.
+     */
+    @SuppressWarnings("ThrowableNotThrown")
+    public void testStreamingStatementInTheMiddleOfNonPreparedBatch() {
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try (Connection conn = createOrdinaryConnection()) {
+                    try (Statement s = conn.createStatement()) {
+                        s.addBatch(String.format("insert into Person(\"id\", \"name\") values (%d, '%s')", 1,
+                            nameForId(1)));
+
+                        s.addBatch("SET STREAMING 1 FLUSH_FREQUENCY 10000");
+                    }
+                }
+
+                return null;
+            }
+        }, SQLException.class, "Streaming control commands must be executed explicitly");
+    }
+
+    /**
+     * Checks that a prepared SET STREAMING statement cannot be added to a batch.
+     */
+    @SuppressWarnings("ThrowableNotThrown")
+    public void testBatchingSetStreamingStatement() {
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try (Connection conn = createOrdinaryConnection()) {
+                    try (PreparedStatement s = conn.prepareStatement("SET STREAMING 1 FLUSH_FREQUENCY 10000")) {
+                        s.addBatch();
+                    }
+                }
+
+                return null;
+            }
+        }, SQLException.class, "Streaming control commands must be executed explicitly");
+    }
+
+    /**
+     * Check that there's nothing in cache.
+     */
+    private void assertCacheEmpty() {
+        assertEquals(0, cache().size(CachePeekMode.ALL));
+    }
+
+    /**
+     * @param conn Connection.
+     * @param sql Statement.
+     * @throws SQLException if failed.
+     */
+    private static void execute(Connection conn, String sql) throws SQLException {
+        try (Statement s = conn.createStatement()) {
+            s.execute(sql);
+        }
+    }
+
+    /**
+     * @return Active SQL client context.
+     */
+    private SqlClientContext sqlClientContext() {
+        assertNotNull(IndexingWithContext.cliCtx);
+
+        return IndexingWithContext.cliCtx;
+    }
+
+    /**
+     * Check that streaming state on target node is as expected.
+     * @param on Expected streaming state.
+     */
+    private void assertStreamingState(boolean on) {
+        SqlClientContext cliCtx = sqlClientContext();
+
+        assertEquals(on, cliCtx.isStream());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void assertStatementForbidden(String sql) {
+        batchSize = 1;
+
+        super.assertStatementForbidden(sql);
+    }
+
+    /**
+     * Indexing implementation that remembers the last {@link SqlClientContext} received from a client.
+     */
+    private static final class IndexingWithContext extends IgniteH2Indexing {
+        /** Client context. */
+        static SqlClientContext cliCtx;
+
+        /** {@inheritDoc} */
+        @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params,
+            SqlClientContext cliCtx) throws IgniteCheckedException {
+            IndexingWithContext.cliCtx = cliCtx;
+
+            return super.streamBatchedUpdateQuery(schemaName, qry, params, cliCtx);
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
+            @Nullable SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts,
+            GridQueryCancel cancel) {
+            IndexingWithContext.cliCtx = cliCtx;
+
+            return super.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, cancel);
+        }
+    }
+}
\ No newline at end of file
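
The SET STREAMING options exercised above surface on the server in the new SqlClientContext
(testStreamingReEnabled inspects its streamAllowOverwrite, streamNodeBufSize, streamFlushTimeout
and streamNodeParOps fields), which then configures one IgniteDataStreamer per target cache.
A sketch of the full command form with every option the tests use, for an already open
thin-driver connection conn; the values are illustrative:

    try (Statement s = conn.createStatement()) {
        s.execute("SET STREAMING 1" +
            " BATCH_SIZE 111" +                 // rows buffered on the client per JdbcBatchExecuteRequest
            " ALLOW_OVERWRITE 0" +              // data streamer allowOverwrite flag
            " PER_NODE_BUFFER_SIZE 512" +       // data streamer per-node buffer size
            " PER_NODE_PARALLEL_OPERATIONS 4" + // data streamer per-node parallel operations
            " FLUSH_FREQUENCY 5000");           // data streamer auto-flush timeout, in milliseconds
    }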

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
index ac9925d..779a022 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.jdbc.thin;
 
 import java.sql.Array;
+import java.sql.BatchUpdateException;
 import java.sql.Blob;
 import java.sql.CallableStatement;
 import java.sql.Clob;
@@ -33,17 +34,27 @@ import java.sql.SQLXML;
 import java.sql.Savepoint;
 import java.sql.Statement;
 import java.sql.Struct;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Executor;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResponse;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
 import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.sql.command.SqlCommand;
+import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteProductVersion;
 
@@ -79,6 +90,9 @@ public class JdbcThinConnection implements Connection {
     /** Read-only flag. */
     private boolean readOnly;
 
+    /** Streaming flag. */
+    private volatile boolean stream;
+
     /** Current transaction holdability. */
     private int holdability;
 
@@ -94,6 +108,15 @@ public class JdbcThinConnection implements Connection {
     /** Connection properties. */
     private ConnectionProperties connProps;
 
+    /** Batch size for streaming. */
+    private int streamBatchSize;
+
+    /** Batch for streaming. */
+    private List<JdbcQuery> streamBatch;
+
+    /** Last added query to recognize batches. */
+    private String lastStreamQry;
+
     /**
      * Creates new connection.
      *
@@ -135,6 +158,86 @@ public class JdbcThinConnection implements Connection {
         }
     }
 
+    /**
+     * @return Whether this connection is streamed or not.
+     */
+    boolean isStream() {
+        return stream;
+    }
+
+    /**
+     * @param sql Statement.
+     * @param cmd Parsed form of {@code sql}.
+     * @throws SQLException if failed.
+     */
+    void executeNative(String sql, SqlCommand cmd) throws SQLException {
+        if (cmd instanceof SqlSetStreamingCommand) {
+            // If streaming is already on, we have to disable it first.
+            if (stream) {
+                // We have to send request regardless of actual batch size.
+                executeBatch(true);
+
+                stream = false;
+            }
+
+            boolean newVal = ((SqlSetStreamingCommand)cmd).isTurnOn();
+
+            // Actual ON, if needed.
+            if (newVal) {
+                sendRequest(new JdbcQueryExecuteRequest(JdbcStatementType.ANY_STATEMENT_TYPE,
+                    schema, 1, 1, sql, null));
+
+                streamBatchSize = ((SqlSetStreamingCommand)cmd).batchSize();
+
+                stream = true;
+            }
+        }
+        else
+            throw IgniteQueryErrorCode.createJdbcSqlException("Unsupported native statement: " + sql,
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+    }
+
+    /**
+     * Add another query for batched execution.
+     * @param sql Query.
+     * @param args Arguments.
+     */
+    void addBatch(String sql, List<Object> args) throws SQLException {
+        boolean newQry = (args == null || !F.eq(lastStreamQry, sql));
+
+        // Providing null as SQL here lets the server recognize sub-batches and handle them more efficiently.
+        JdbcQuery q = new JdbcQuery(newQry ? sql : null, args != null ? args.toArray() : null);
+
+        if (streamBatch == null)
+            streamBatch = new ArrayList<>(streamBatchSize);
+
+        streamBatch.add(q);
+
+        // Null args means "addBatch(String)" was called on non-prepared Statement,
+        // we don't want to remember its query string.
+        lastStreamQry = (args != null ? sql : null);
+
+        if (streamBatch.size() == streamBatchSize)
+            executeBatch(false);
+    }
+
+    /**
+     * @param lastBatch Whether open data streamers must be flushed and closed after this batch.
+     * @throws SQLException if failed.
+     */
+    private void executeBatch(boolean lastBatch) throws SQLException {
+        JdbcBatchExecuteResult res = sendRequest(new JdbcBatchExecuteRequest(schema, streamBatch, lastBatch));
+
+        streamBatch = null;
+
+        lastStreamQry = null;
+
+        if (res.errorCode() != ClientListenerResponse.STATUS_SUCCESS) {
+            throw new BatchUpdateException(res.errorMessage(), IgniteQueryErrorCode.codeToSqlState(res.errorCode()),
+                res.errorCode(), res.updateCounts());
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public Statement createStatement() throws SQLException {
         return createStatement(TYPE_FORWARD_ONLY, CONCUR_READ_ONLY, HOLD_CURSORS_OVER_COMMIT);
@@ -277,6 +380,15 @@ public class JdbcThinConnection implements Connection {
         if (isClosed())
             return;
 
+        if (!F.isEmpty(streamBatch)) {
+            try {
+                executeBatch(true);
+            }
+            catch (SQLException e) {
+                LOG.log(Level.WARNING, "Exception during batch send on streamed connection close", e);
+            }
+        }
+
         closed = true;
 
         cliIo.close();

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
index 23d3bbe..d18788c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
@@ -39,8 +39,8 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Calendar;
-import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
+import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcMetaParamsRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcMetaParamsResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
@@ -262,13 +262,23 @@ public class JdbcThinPreparedStatement extends JdbcThinStatement implements Prep
     @Override public void addBatch() throws SQLException {
         ensureNotClosed();
 
-        if (batch == null) {
-            batch = new ArrayList<>();
+        checkStatementEligibleForBatching(sql);
+
+        checkStatementBatchEmpty();
+
+        batchSize++;
+
+        if (conn.isStream())
+            conn.addBatch(sql, args);
+        else {
+            if (batch == null) {
+                batch = new ArrayList<>();
 
-            batch.add(new JdbcQuery(sql, args.toArray(new Object[args.size()])));
+                batch.add(new JdbcQuery(sql, args.toArray(new Object[args.size()])));
+            }
+            else
+                batch.add(new JdbcQuery(null, args.toArray(new Object[args.size()])));
         }
-        else
-            batch.add(new JdbcQuery(null, args.toArray(new Object[args.size()])));
 
         args = null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
index 9c41804..480edfa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
@@ -38,14 +38,21 @@ import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadAckResult;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteMultipleStatementsResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResultInfo;
-import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.sql.SqlKeyword;
+import org.apache.ignite.internal.sql.SqlParseException;
+import org.apache.ignite.internal.sql.SqlParser;
+import org.apache.ignite.internal.sql.command.SqlCommand;
+import org.apache.ignite.internal.sql.command.SqlSetStreamingCommand;
+import org.apache.ignite.internal.util.typedef.F;
 
 import static java.sql.ResultSet.CONCUR_READ_ONLY;
 import static java.sql.ResultSet.FETCH_FORWARD;
@@ -79,6 +86,9 @@ public class JdbcThinStatement implements Statement {
     /** Result set holdability. */
     private final int resHoldability;
 
+    /** Number of items added to the batch; used to return fake update counters from executeBatch(). */
+    protected int batchSize;
+
     /** Batch. */
     protected List<JdbcQuery> batch;
 
@@ -119,6 +129,40 @@ public class JdbcThinStatement implements Statement {
     }
 
     /**
+     * @param sql Query.
+     * @return Native {@link SqlCommand}, or {@code null} if parsing was not successful.
+     */
+    private SqlCommand tryParseNative(String sql) {
+        try {
+            return new SqlParser(schema, sql).nextCommand();
+        }
+        catch (SqlParseException e) {
+            return null;
+        }
+    }
+
+    /**
+     * @param sql Query.
+     * @return Whether it's worth trying to parse this statement on the client.
+     */
+    private static boolean isEligibleForNativeParsing(String sql) {
+        if (F.isEmpty(sql))
+            return false;
+
+        sql = sql.toUpperCase();
+
+        int setPos = sql.indexOf(SqlKeyword.SET);
+
+        if (setPos == -1)
+            return false;
+
+        int streamingPos = sql.indexOf(SqlKeyword.STREAMING);
+
+        // There must be at least one character between SET and STREAMING.
+        return streamingPos - setPos - SqlKeyword.SET.length() >= 1;
+    }
+
+    /**
      * @param stmtType Expected statement type.
      * @param sql Sql query.
      * @param args Query parameters.
@@ -133,6 +177,36 @@ public class JdbcThinStatement implements Statement {
         if (sql == null || sql.isEmpty())
             throw new SQLException("SQL query is empty.");
 
+        checkStatementBatchEmpty();
+
+        SqlCommand nativeCmd = null;
+
+        if (stmtType != JdbcStatementType.SELECT_STATEMENT_TYPE && isEligibleForNativeParsing(sql))
+            nativeCmd = tryParseNative(sql);
+
+        if (nativeCmd != null) {
+            conn.executeNative(sql, nativeCmd);
+
+            resultSets = Collections.singletonList(resultSetForUpdate(0));
+
+            // If this command should be executed as a native one, we do not treat it
+            // as an ordinary batch citizen.
+            return;
+        }
+
+        if (conn.isStream()) {
+            if (stmtType == JdbcStatementType.SELECT_STATEMENT_TYPE)
+                throw new SQLException("executeQuery() method is not allowed in streaming mode.",
+                    SqlStateCode.INTERNAL_ERROR,
+                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+            conn.addBatch(sql, args);
+
+            resultSets = Collections.singletonList(resultSetForUpdate(0));
+
+            return;
+        }
+
         JdbcResult res0 = conn.sendRequest(new JdbcQueryExecuteRequest(stmtType, schema, pageSize,
             maxRows, sql, args == null ? null : args.toArray(new Object[args.size()])));
 
@@ -158,11 +232,8 @@ public class JdbcThinStatement implements Statement {
             boolean firstRes = true;
 
             for(JdbcResultInfo rsInfo : resInfos) {
-                if (!rsInfo.isQuery()) {
-                    resultSets.add(new JdbcThinResultSet(this, -1, pageSize,
-                        true, Collections.<List<Object>>emptyList(), false,
-                        conn.autoCloseServerCursor(), rsInfo.updateCount(), closeOnCompletion));
-                }
+                if (!rsInfo.isQuery())
+                    resultSets.add(resultSetForUpdate(rsInfo.updateCount()));
                 else {
                     if (firstRes) {
                         firstRes = false;
@@ -186,6 +257,26 @@ public class JdbcThinStatement implements Statement {
     }
 
     /**
+     * Check that the user has not added anything to this statement's batch prior to turning streaming on.
+     * @throws SQLException if failed.
+     */
+    void checkStatementBatchEmpty() throws SQLException {
+        if (conn.isStream() && !F.isEmpty(batch))
+            throw new IgniteSQLException("Statement has non-empty batch (call executeBatch() or clearBatch() " +
+                "before enabling streaming).", IgniteQueryErrorCode.UNSUPPORTED_OPERATION).toJdbcException();
+    }
+
+    /**
+     * @param cnt Update counter.
+     * @return Result set for given update counter.
+     */
+    private JdbcThinResultSet resultSetForUpdate(long cnt) {
+        return new JdbcThinResultSet(this, -1, pageSize,
+            true, Collections.<List<Object>>emptyList(), false,
+            conn.autoCloseServerCursor(), cnt, closeOnCompletion);
+    }
+
+    /**
      * Sends a file to server in batches via multiple {@link JdbcBulkLoadBatchRequest}s.
      *
      * @param cmdRes Result of invoking COPY command: contains server-parsed
@@ -469,16 +560,50 @@ public class JdbcThinStatement implements Statement {
     @Override public void addBatch(String sql) throws SQLException {
         ensureNotClosed();
 
+        checkStatementEligibleForBatching(sql);
+
+        checkStatementBatchEmpty();
+
+        batchSize++;
+
+        if (conn.isStream()) {
+            conn.addBatch(sql, null);
+
+            return;
+        }
+
         if (batch == null)
             batch = new ArrayList<>();
 
         batch.add(new JdbcQuery(sql, null));
     }
 
+    /**
+     * Check that a native command is not being added to the connection's batch (it must be executed explicitly).
+     * @param sql SQL command.
+     * @throws SQLException if there is an attempt to add a native command to the JDBC batch.
+     */
+    void checkStatementEligibleForBatching(String sql) throws SQLException {
+        SqlCommand nativeCmd = null;
+
+        if (isEligibleForNativeParsing(sql))
+            nativeCmd = tryParseNative(sql);
+
+        if (nativeCmd != null) {
+            assert nativeCmd instanceof SqlSetStreamingCommand;
+
+            throw new SQLException("Streaming control commands must be executed explicitly - " +
+                "either via Statement.execute(String), or via using prepared statements.",
+                SqlStateCode.UNSUPPORTED_OPERATION);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void clearBatch() throws SQLException {
         ensureNotClosed();
 
+        batchSize = 0;
+
         batch = null;
     }
 
@@ -488,11 +613,21 @@ public class JdbcThinStatement implements Statement {
 
         closeResults();
 
-        if (batch == null || batch.isEmpty())
+        checkStatementBatchEmpty();
+
+        if (conn.isStream()) {
+            int[] res = new int[batchSize];
+
+            batchSize = 0;
+
+            return res;
+        }
+
+        if (F.isEmpty(batch))
             throw new SQLException("Batch is empty.");
 
         try {
-            JdbcBatchExecuteResult res = conn.sendRequest(new JdbcBatchExecuteRequest(conn.getSchema(), batch));
+            JdbcBatchExecuteResult res = conn.sendRequest(new JdbcBatchExecuteRequest(conn.getSchema(), batch, false));
 
             if (res.errorCode() != ClientListenerResponse.STATUS_SUCCESS) {
                 throw new BatchUpdateException(res.errorMessage(), IgniteQueryErrorCode.codeToSqlState(res.errorCode()),
@@ -502,6 +637,8 @@ public class JdbcThinStatement implements Statement {
             return res.updateCounts();
         }
         finally {
+            batchSize = 0;
+
             batch = null;
         }
     }

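Taken together, the statement changes above let streaming be toggled with a SQL command while ordinary batch semantics are emulated on the client: addBatch() forwards rows to the connection, and executeBatch() returns batchSize fake zero counters instead of making a synchronous round trip. A minimal usage sketch, assuming the SET STREAMING ON/OFF syntax handled by SqlSetStreamingCommand (the URL, table, and column names are illustrative, not taken from this patch):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class ThinStreamingSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1");
                 Statement stmt = conn.createStatement()) {
                // Parsed on the client by tryParseNative() and routed to
                // conn.executeNative() instead of being sent as a query.
                stmt.execute("SET STREAMING ON");

                for (int i = 0; i < 10_000; i++)
                    stmt.addBatch("INSERT INTO person(id, name) VALUES(" + i + ", 'p" + i + "')");

                // Returns fake zero counters; rows were already handed to the
                // connection's streaming batch as they were added.
                int[] fakeCnts = stmt.executeBatch();

                // Flushes and closes open streamers on the server.
                stmt.execute("SET STREAMING OFF");
            }
        }
    }

Note that executeQuery() is rejected while streaming is on, per the UNSUPPORTED_OPERATION error above.
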
http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index 7aa6c33..ba7258a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -37,12 +37,14 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryCloseRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryFetchRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryMetadataRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResponse;
 import org.apache.ignite.internal.util.ipc.loopback.IpcClientTcpEndpoint;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteProductVersion;
 
@@ -298,7 +300,8 @@ public class JdbcThinTcpIo {
 
             ClientListenerProtocolVersion srvProtocolVer = ClientListenerProtocolVersion.create(maj, min, maintenance);
 
-            if (VER_2_3_0.equals(srvProtocolVer) || VER_2_1_5.equals(srvProtocolVer))
+            if (VER_2_4_0.equals(srvProtocolVer) || VER_2_3_0.equals(srvProtocolVer) ||
+                VER_2_1_5.equals(srvProtocolVer))
                 handshake(srvProtocolVer);
             else if (VER_2_1_0.equals(srvProtocolVer))
                 handshake_2_1_0();
@@ -391,9 +394,12 @@ public class JdbcThinTcpIo {
         int cap;
 
         if (req instanceof JdbcBatchExecuteRequest) {
-            int cnt = Math.min(MAX_BATCH_QRY_CNT, ((JdbcBatchExecuteRequest)req).queries().size());
+            List<JdbcQuery> qrys = ((JdbcBatchExecuteRequest)req).queries();
 
-            cap = cnt * DYNAMIC_SIZE_MSG_CAP;
+            int cnt = !F.isEmpty(qrys) ? Math.min(MAX_BATCH_QRY_CNT, qrys.size()) : 0;
+
+            // One additional byte for the last stream batch flag.
+            cap = cnt * DYNAMIC_SIZE_MSG_CAP + 1;
         }
         else if (req instanceof JdbcQueryCloseRequest)
             cap = QUERY_CLOSE_MSG_SIZE;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index b51e0b9..f61ccf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.query.GridQueryIndexing;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -612,10 +613,11 @@ public class JdbcConnection implements Connection {
 
             PreparedStatement nativeStmt = prepareNativeStatement(sql);
 
-            if (!idx.isInsertStatement(nativeStmt)) {
-                throw new SQLException("Only INSERT operations are supported in streaming mode",
-                    SqlStateCode.INTERNAL_ERROR,
-                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+            try {
+                idx.checkStatementStreamable(nativeStmt);
+            }
+            catch (IgniteSQLException e) {
+                throw e.toJdbcException();
             }
 
             IgniteDataStreamer streamer = ignite().dataStreamer(cacheName);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
index 408f089..25f55f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
@@ -55,7 +55,7 @@ class JdbcStreamedPreparedStatement extends JdbcPreparedStatement {
 
     /** {@inheritDoc} */
     @Override protected void execute0(String sql, Boolean isQuery) throws SQLException {
-        assert isQuery != null && !isQuery;
+        assert isQuery == null || !isQuery;
 
         long updCnt = conn.ignite().context().query().streamUpdateQuery(conn.cacheName(), conn.schemaName(),
             streamer, sql, getArgs());

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index a834022..c5d68b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -629,7 +629,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
             boolean keepBinary = opCtxCall != null && opCtxCall.isKeepBinary();
 
-            return ctx.kernalContext().query().querySqlFields(ctx, qry, keepBinary, false);
+            return ctx.kernalContext().query().querySqlFields(ctx, qry, null, keepBinary, false);
         }
         catch (Exception e) {
             if (e instanceof CacheException)
@@ -662,7 +662,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
             if (qry instanceof SqlFieldsQuery)
                 return (FieldsQueryCursor<R>)ctx.kernalContext().query().querySqlFields(ctx, (SqlFieldsQuery)qry,
-                    keepBinary, true).get(0);
+                    null, keepBinary, true).get(0);
 
             if (qry instanceof ScanQuery)
                 return query((ScanQuery)qry, null, projection(qry.isLocal()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java
index 25e1049..73fd04f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcBatchExecuteRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.odbc.jdbc;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.ignite.binary.BinaryObjectException;
@@ -39,6 +40,12 @@ public class JdbcBatchExecuteRequest extends JdbcRequest {
     private List<JdbcQuery> queries;
 
     /**
+     * Last stream batch flag: whether open streamers on the current connection
+     * must be flushed and closed after this batch.
+     */
+    private boolean lastStreamBatch;
+
+    /**
      * Default constructor.
      */
     public JdbcBatchExecuteRequest() {
@@ -49,13 +56,14 @@ public class JdbcBatchExecuteRequest extends JdbcRequest {
      * @param schemaName Schema name.
      * @param queries Queries.
      */
-    public JdbcBatchExecuteRequest(String schemaName, List<JdbcQuery> queries) {
+    public JdbcBatchExecuteRequest(String schemaName, List<JdbcQuery> queries, boolean lastStreamBatch) {
         super(BATCH_EXEC);
 
-        assert !F.isEmpty(queries);
+        assert lastStreamBatch || !F.isEmpty(queries);
 
         this.schemaName = schemaName;
         this.queries = queries;
+        this.lastStreamBatch = lastStreamBatch;
     }
 
     /**
@@ -72,15 +80,29 @@ public class JdbcBatchExecuteRequest extends JdbcRequest {
         return queries;
     }
 
+    /**
+     * @return Last stream batch flag.
+     */
+    public boolean isLastStreamBatch() {
+        return lastStreamBatch;
+    }
+
     /** {@inheritDoc} */
     @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
         super.writeBinary(writer);
 
         writer.writeString(schemaName);
-        writer.writeInt(queries.size());
 
-        for (JdbcQuery q : queries)
-            q.writeBinary(writer);
+        if (!F.isEmpty(queries)) {
+            writer.writeInt(queries.size());
+
+            for (JdbcQuery q : queries)
+                q.writeBinary(writer);
+        }
+        else
+            writer.writeInt(0);
+
+        writer.writeBoolean(lastStreamBatch);
     }
 
     /** {@inheritDoc} */
@@ -100,6 +122,14 @@ public class JdbcBatchExecuteRequest extends JdbcRequest {
 
             queries.add(qry);
         }
+
+        try {
+            if (reader.available() > 0)
+                lastStreamBatch = reader.readBoolean();
+        }
+        catch (IOException e) {
+            throw new BinaryObjectException(e);
+        }
     }
 
     /** {@inheritDoc} */

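Two request shapes follow from the relaxed constructor assert; a hedged sketch (the schema name is illustrative, and whether JdbcThinConnection builds exactly these is not visible in this part of the diff). Backward compatibility is kept because readBinary() consumes the trailing flag only when unread bytes remain, so peers that never write it still decode correctly:

    import java.util.List;
    import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
    import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;

    class BatchRequestShapes {
        /** Regular batch: queries must be non-empty when the flag is false. */
        static JdbcBatchExecuteRequest ordinary(String schema, List<JdbcQuery> queries) {
            return new JdbcBatchExecuteRequest(schema, queries, false);
        }

        /** Final stream batch: queries may be empty; on receipt the server
         * flushes and closes the connection's open streamers. */
        static JdbcBatchExecuteRequest streamingShutdown(String schema) {
            return new JdbcBatchExecuteRequest(schema, null, true);
        }
    }
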
http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index 5841a4d..d8f82f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -28,7 +28,7 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 
 /**
- * ODBC Connection Context.
+ * JDBC Connection Context.
  */
 public class JdbcConnectionContext implements ClientListenerConnectionContext {
     /** Version 2.1.0. */
@@ -38,13 +38,16 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext {
     private static final ClientListenerProtocolVersion VER_2_1_5 = ClientListenerProtocolVersion.create(2, 1, 5);
 
     /** Version 2.3.1: added "multiple statements query" feature. */
-    public static final ClientListenerProtocolVersion VER_2_3_0 = ClientListenerProtocolVersion.create(2, 3, 0);
+    static final ClientListenerProtocolVersion VER_2_3_0 = ClientListenerProtocolVersion.create(2, 3, 0);
 
     /** Version 2.4.0: adds default values for columns feature. */
-    public static final ClientListenerProtocolVersion VER_2_4_0 = ClientListenerProtocolVersion.create(2, 4, 0);
+    static final ClientListenerProtocolVersion VER_2_4_0 = ClientListenerProtocolVersion.create(2, 4, 0);
+
+    /** Version 2.5.0. */
+    private static final ClientListenerProtocolVersion VER_2_5_0 = ClientListenerProtocolVersion.create(2, 5, 0);
 
     /** Current version. */
-    private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_4_0;
+    private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_5_0;
 
     /** Supported versions. */
     private static final Set<ClientListenerProtocolVersion> SUPPORTED_VERS = new HashSet<>();
@@ -66,6 +69,8 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext {
 
     static {
         SUPPORTED_VERS.add(CURRENT_VER);
+        SUPPORTED_VERS.add(VER_2_5_0);
+        SUPPORTED_VERS.add(VER_2_4_0);
         SUPPORTED_VERS.add(VER_2_3_0);
         SUPPORTED_VERS.add(VER_2_1_5);
         SUPPORTED_VERS.add(VER_2_1_0);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 59fc06b..baf4ec5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.SqlClientContext;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -88,6 +89,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
     /** Kernel context. */
     private final GridKernalContext ctx;
 
+    /** Client context. */
+    private final SqlClientContext cliCtx;
+
     /** Logger. */
     private final IgniteLogger log;
 
@@ -103,24 +107,6 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
     /** Current bulk load processors. */
     private final ConcurrentHashMap<Long, JdbcBulkLoadProcessor> bulkLoadRequests = new ConcurrentHashMap<>();
 
-    /** Distributed joins flag. */
-    private final boolean distributedJoins;
-
-    /** Enforce join order flag. */
-    private final boolean enforceJoinOrder;
-
-    /** Collocated flag. */
-    private final boolean collocated;
-
-    /** Replicated only flag. */
-    private final boolean replicatedOnly;
-
-    /** Lazy query execution flag. */
-    private final boolean lazy;
-
-    /** Skip reducer on update flag. */
-    private final boolean skipReducerOnUpdate;
-
     /** Automatic close of cursors. */
     private final boolean autoCloseCursors;
 
@@ -129,7 +115,6 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
     /**
      * Constructor.
-     *
      * @param ctx Context.
      * @param busyLock Shutdown latch.
      * @param maxCursors Maximum allowed cursors.
@@ -147,15 +132,20 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
         boolean autoCloseCursors, boolean lazy, boolean skipReducerOnUpdate,
         ClientListenerProtocolVersion protocolVer) {
         this.ctx = ctx;
+
+        this.cliCtx = new SqlClientContext(
+            ctx,
+            distributedJoins,
+            enforceJoinOrder,
+            collocated,
+            replicatedOnly,
+            lazy,
+            skipReducerOnUpdate
+        );
+
         this.busyLock = busyLock;
         this.maxCursors = maxCursors;
-        this.distributedJoins = distributedJoins;
-        this.enforceJoinOrder = enforceJoinOrder;
-        this.collocated = collocated;
-        this.replicatedOnly = replicatedOnly;
         this.autoCloseCursors = autoCloseCursors;
-        this.lazy = lazy;
-        this.skipReducerOnUpdate = skipReducerOnUpdate;
         this.protocolVer = protocolVer;
 
         log = ctx.log(getClass());
@@ -301,6 +291,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
                 }
 
                 bulkLoadRequests.clear();
+
+                U.close(cliCtx, log);
             }
             finally {
                 busyLock.leaveBusy();
@@ -326,6 +318,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
         long qryId = QRY_ID_GEN.getAndIncrement();
 
+        assert !cliCtx.isStream();
+
         try {
             String sql = req.sqlQuery();
 
@@ -347,17 +341,17 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
                     qry = new SqlFieldsQueryEx(sql, false);
 
-                    if (skipReducerOnUpdate)
+                    if (cliCtx.isSkipReducerOnUpdate())
                         ((SqlFieldsQueryEx)qry).setSkipReducerOnUpdate(true);
             }
 
             qry.setArgs(req.arguments());
 
-            qry.setDistributedJoins(distributedJoins);
-            qry.setEnforceJoinOrder(enforceJoinOrder);
-            qry.setCollocated(collocated);
-            qry.setReplicatedOnly(replicatedOnly);
-            qry.setLazy(lazy);
+            qry.setDistributedJoins(cliCtx.isDistributedJoins());
+            qry.setEnforceJoinOrder(cliCtx.isEnforceJoinOrder());
+            qry.setCollocated(cliCtx.isCollocated());
+            qry.setReplicatedOnly(cliCtx.isReplicatedOnly());
+            qry.setLazy(cliCtx.isLazy());
 
             if (req.pageSize() <= 0)
                 return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Invalid fetch size: " + req.pageSize());
@@ -371,7 +365,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
             qry.setSchema(schemaName);
 
-            List<FieldsQueryCursor<List<?>>> results = ctx.query().querySqlFields(qry, true,
+            List<FieldsQueryCursor<List<?>>> results = ctx.query().querySqlFields(null, qry, cliCtx, true,
                 protocolVer.compareTo(VER_2_3_0) < 0);
 
             FieldsQueryCursor<List<?>> fieldsCur = results.get(0);
@@ -569,11 +563,11 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
                 qry = new SqlFieldsQueryEx(q.sql(), false);
 
-                qry.setDistributedJoins(distributedJoins);
-                qry.setEnforceJoinOrder(enforceJoinOrder);
-                qry.setCollocated(collocated);
-                qry.setReplicatedOnly(replicatedOnly);
-                qry.setLazy(lazy);
+                qry.setDistributedJoins(cliCtx.isDistributedJoins());
+                qry.setEnforceJoinOrder(cliCtx.isEnforceJoinOrder());
+                qry.setCollocated(cliCtx.isCollocated());
+                qry.setReplicatedOnly(cliCtx.isReplicatedOnly());
+                qry.setLazy(cliCtx.isLazy());
 
                 qry.setSchema(schemaName);
             }
@@ -586,6 +580,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
         if (qry != null)
             executeBatchedQuery(qry, updCntsAcc, firstErr);
 
+        if (req.isLastStreamBatch())
+            cliCtx.disableStreaming();
+
         int updCnts[] = U.toIntArray(updCntsAcc);
 
         if (firstErr.isEmpty())
@@ -601,10 +598,21 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
      * @param updCntsAcc Per query rows updates counter.
      * @param firstErr First error data - code and message.
      */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
     private void executeBatchedQuery(SqlFieldsQueryEx qry, List<Integer> updCntsAcc,
         IgniteBiTuple<Integer, String> firstErr) {
         try {
-            List<FieldsQueryCursor<List<?>>> qryRes = ctx.query().querySqlFields(qry, true, true);
+            if (cliCtx.isStream()) {
+                List<Long> cnt = ctx.query().streamBatchedUpdateQuery(qry.getSchema(), cliCtx, qry.getSql(),
+                    qry.batchedArguments());
+
+                for (int i = 0; i < cnt.size(); i++)
+                    updCntsAcc.add(cnt.get(i).intValue());
+
+                return;
+            }
+
+            List<FieldsQueryCursor<List<?>>> qryRes = ctx.query().querySqlFields(null, qry, cliCtx, true, true);
 
             for (FieldsQueryCursor<List<?>> cur : qryRes) {
                 if (cur instanceof BulkLoadContextCursor)

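The connection flags formerly stored as individual fields now live in SqlClientContext, and the batch path consults it to decide between regular execution and streaming. A hedged sketch condensing the lifecycle visible in the hunks above (only calls shown in this diff are used; the surrounding control flow is simplified):

    import java.util.List;
    import org.apache.ignite.internal.GridKernalContext;
    import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
    import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
    import org.apache.ignite.internal.processors.query.SqlClientContext;

    class StreamingDispatchSketch {
        /** Condensed view of the handler's batch path while streaming is on. */
        static void onBatch(GridKernalContext ctx, SqlClientContext cliCtx,
            JdbcBatchExecuteRequest req, SqlFieldsQueryEx qry) throws Exception {
            if (cliCtx.isStream()) {
                // Rows bypass regular query execution and are fed to streamers.
                List<Long> cnts = ctx.query().streamBatchedUpdateQuery(
                    qry.getSchema(), cliCtx, qry.getSql(), qry.batchedArguments());
            }

            if (req.isLastStreamBatch())
                cliCtx.disableStreaming(); // Flush and close open streamers.
        }
    }
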
http://git-wip-us.apache.org/repos/asf/ignite/blob/7366809e/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 6b425a1..dedd075 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -76,33 +76,46 @@ public interface GridQueryIndexing {
      * Detect whether SQL query should be executed in distributed or local manner and execute it.
      * @param schemaName Schema name.
      * @param qry Query.
+     * @param cliCtx Client context.
      * @param keepBinary Keep binary flag.
      * @param failOnMultipleStmts Whether an exception should be thrown for multiple statements query.
-     * @param cancel Query cancel state handler.
-     * @return Cursor.
+     * @param cancel Query cancel state handler.
+     * @return Cursor.
      */
-    public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry, boolean keepBinary,
-        boolean failOnMultipleStmts, GridQueryCancel cancel);
+    public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
+        SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel);
 
     /**
-     * Perform a MERGE statement using data streamer as receiver.
+     * Execute an INSERT statement using a data streamer as the receiver.
      *
      * @param schemaName Schema name.
      * @param qry Query.
      * @param params Query parameters.
      * @param streamer Data streamer to feed data to.
-     * @return Query result.
+     * @return Update counter.
      * @throws IgniteCheckedException If failed.
      */
     public long streamUpdateQuery(String schemaName, String qry, @Nullable Object[] params,
         IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException;
 
     /**
+     * Execute a batched INSERT statement using a data streamer as the receiver.
+     *
+     * @param schemaName Schema name.
+     * @param qry Query.
+     * @param params Query parameters.
+     * @param cliCtx Client connection context.
+     * @return Update counters.
+     * @throws IgniteCheckedException If failed.
+     */
+    public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, List<Object[]> params,
+        SqlClientContext cliCtx) throws IgniteCheckedException;
+
+    /**
      * Executes regular query.
      *
      * @param schemaName Schema name.
      * @param cacheName Cache name.
-     *@param qry Query.
+     * @param qry Query.
      * @param filter Cache name and key filter.
      * @param keepBinary Keep binary flag.
      * @return Cursor.
      */
@@ -313,12 +326,11 @@ public interface GridQueryIndexing {
     public String schema(String cacheName);
 
     /**
-     * Check if passed statement is insert statemtn.
+     * Check that the passed statement is an INSERT statement eligible for streaming; throw an {@link IgniteSQLException} if not.
      *
      * @param nativeStmt Native statement.
-     * @return {@code True} if insert.
      */
-    public boolean isInsertStatement(PreparedStatement nativeStmt);
+    public void checkStatementStreamable(PreparedStatement nativeStmt);
 
     /**
      * Return row cache cleaner.


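For reference, a hedged sketch of invoking the new batched streaming entry point on the indexing interface (schema, SQL, and values are illustrative; the H2 implementation lives in IgniteH2Indexing, changed elsewhere in this commit):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.internal.processors.query.GridQueryIndexing;
    import org.apache.ignite.internal.processors.query.SqlClientContext;

    class StreamBatchedUpdateSketch {
        /** Feeds a batch of INSERT parameter rows to the streamers held by the client context. */
        static List<Long> insertBatch(GridQueryIndexing idx, SqlClientContext cliCtx)
            throws IgniteCheckedException {
            List<Object[]> rows = Arrays.asList(
                new Object[] {1, "first"},
                new Object[] {2, "second"});

            // Per the javadoc above this returns update counters; for a
            // streamed batch that is naturally one per parameter row.
            return idx.streamBatchedUpdateQuery("PUBLIC", "INSERT INTO person VALUES (?, ?)", rows, cliCtx);
        }
    }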