ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [04/50] [abbrv] ignite git commit: IGNITE-4169: SQL: implemented streaming support for INSERT operations. This closes #1350. This closes #1553.
Date Wed, 01 Mar 2017 14:32:47 GMT
IGNITE-4169: SQL: implemented streaming support for INSERT operations. This closes #1350. This closes #1553.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0130b097
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0130b097
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0130b097

Branch: refs/heads/master
Commit: 0130b097b0a24950f369cc5e009a9394c3fd4e1f
Parents: 1a08ef7
Author: Alexander Paschenko <alexander.a.paschenko@gmail.com>
Authored: Mon Feb 20 13:32:19 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Mon Feb 20 13:32:19 2017 +0300

----------------------------------------------------------------------
 .../jdbc2/JdbcAbstractDmlStatementSelfTest.java |  49 +----
 .../jdbc2/JdbcInsertStatementSelfTest.java      |  51 +++++
 .../jdbc2/JdbcMergeStatementSelfTest.java       |  51 +++++
 .../internal/jdbc2/JdbcStreamingSelfTest.java   | 189 ++++++++++++++++
 .../jdbc2/JdbcUpdateStatementSelfTest.java      |  50 +++++
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java   |   1 +
 .../org/apache/ignite/IgniteJdbcDriver.java     |  30 +++
 .../ignite/internal/jdbc2/JdbcConnection.java   |  72 +++++-
 .../internal/jdbc2/JdbcPreparedStatement.java   |  34 ++-
 .../ignite/internal/jdbc2/JdbcStatement.java    |  20 +-
 .../jdbc2/JdbcStreamedPreparedStatement.java    |  59 +++++
 .../processors/query/GridQueryIndexing.java     |  35 +++
 .../processors/query/GridQueryProcessor.java    |  63 +++++-
 .../query/h2/DmlStatementsProcessor.java        | 219 +++++++++++++------
 .../processors/query/h2/IgniteH2Indexing.java   |  57 ++++-
 15 files changed, 812 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java
index 4a97aef..332bbba 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java
@@ -20,8 +20,6 @@ package org.apache.ignite.internal.jdbc2;
 import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
 import java.util.Collections;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.QueryEntity;
@@ -54,7 +52,7 @@ public abstract class JdbcAbstractDmlStatementSelfTest extends GridCommonAbstrac
     static final String BASE_URL_BIN = CFG_URL_PREFIX + "modules/clients/src/test/config/jdbc-bin-config.xml";
 
     /** SQL SELECT query for verification. */
-    private static final String SQL_SELECT = "select _key, id, firstName, lastName, age from Person";
+    static final String SQL_SELECT = "select _key, id, firstName, lastName, age from Person";
 
     /** Connection. */
     protected Connection conn;
@@ -149,51 +147,6 @@ public abstract class JdbcAbstractDmlStatementSelfTest extends GridCommonAbstrac
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
-        try (Statement selStmt = conn.createStatement()) {
-            assert selStmt.execute(SQL_SELECT);
-
-            ResultSet rs = selStmt.getResultSet();
-
-            assert rs != null;
-
-            while (rs.next()) {
-                int id = rs.getInt("id");
-
-                switch (id) {
-                    case 1:
-                        assertEquals("p1", rs.getString("_key"));
-                        assertEquals("John", rs.getString("firstName"));
-                        assertEquals("White", rs.getString("lastName"));
-                        assertEquals(25, rs.getInt("age"));
-                        break;
-
-                    case 2:
-                        assertEquals("p2", rs.getString("_key"));
-                        assertEquals("Joe", rs.getString("firstName"));
-                        assertEquals("Black", rs.getString("lastName"));
-                        assertEquals(35, rs.getInt("age"));
-                        break;
-
-                    case 3:
-                        assertEquals("p3", rs.getString("_key"));
-                        assertEquals("Mike", rs.getString("firstName"));
-                        assertEquals("Green", rs.getString("lastName"));
-                        assertEquals(40, rs.getInt("age"));
-                        break;
-
-                    case 4:
-                        assertEquals("p4", rs.getString("_key"));
-                        assertEquals("Leah", rs.getString("firstName"));
-                        assertEquals("Grey", rs.getString("lastName"));
-                        assertEquals(22, rs.getInt("age"));
-                        break;
-
-                    default:
-                        assert false : "Invalid ID: " + id;
-                }
-            }
-        }
-
         grid(0).cache(null).clear();
 
         assertEquals(0, grid(0).cache(null).size(CachePeekMode.ALL));

http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
index 7fc92de..1bd6d34 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java
@@ -18,12 +18,14 @@
 package org.apache.ignite.internal.jdbc2;
 
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.testframework.GridTestUtils;
 
 /**
@@ -61,6 +63,55 @@ public class JdbcInsertStatementSelfTest extends JdbcAbstractDmlStatementSelfTes
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
+        try (Statement selStmt = conn.createStatement()) {
+            assertTrue(selStmt.execute(SQL_SELECT));
+
+            ResultSet rs = selStmt.getResultSet();
+
+            assert rs != null;
+
+            while (rs.next()) {
+                int id = rs.getInt("id");
+
+                switch (id) {
+                    case 1:
+                        assertEquals("p1", rs.getString("_key"));
+                        assertEquals("John", rs.getString("firstName"));
+                        assertEquals("White", rs.getString("lastName"));
+                        assertEquals(25, rs.getInt("age"));
+                        break;
+
+                    case 2:
+                        assertEquals("p2", rs.getString("_key"));
+                        assertEquals("Joe", rs.getString("firstName"));
+                        assertEquals("Black", rs.getString("lastName"));
+                        assertEquals(35, rs.getInt("age"));
+                        break;
+
+                    case 3:
+                        assertEquals("p3", rs.getString("_key"));
+                        assertEquals("Mike", rs.getString("firstName"));
+                        assertEquals("Green", rs.getString("lastName"));
+                        assertEquals(40, rs.getInt("age"));
+                        break;
+
+                    case 4:
+                        assertEquals("p4", rs.getString("_key"));
+                        assertEquals("Leah", rs.getString("firstName"));
+                        assertEquals("Grey", rs.getString("lastName"));
+                        assertEquals(22, rs.getInt("age"));
+                        break;
+
+                    default:
+                        assert false : "Invalid ID: " + id;
+                }
+            }
+        }
+
+        grid(0).cache(null).clear();
+
+        assertEquals(0, grid(0).cache(null).size(CachePeekMode.ALL));
+
         super.afterTest();
 
         if (stmt != null && !stmt.isClosed())

http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
index ecf6032..3c56c92 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java
@@ -18,8 +18,10 @@
 package org.apache.ignite.internal.jdbc2;
 
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import org.apache.ignite.cache.CachePeekMode;
 
 /**
  * MERGE statement test.
@@ -56,6 +58,55 @@ public class JdbcMergeStatementSelfTest extends JdbcAbstractDmlStatementSelfTest
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
+        try (Statement selStmt = conn.createStatement()) {
+            assertTrue(selStmt.execute(SQL_SELECT));
+
+            ResultSet rs = selStmt.getResultSet();
+
+            assert rs != null;
+
+            while (rs.next()) {
+                int id = rs.getInt("id");
+
+                switch (id) {
+                    case 1:
+                        assertEquals("p1", rs.getString("_key"));
+                        assertEquals("John", rs.getString("firstName"));
+                        assertEquals("White", rs.getString("lastName"));
+                        assertEquals(25, rs.getInt("age"));
+                        break;
+
+                    case 2:
+                        assertEquals("p2", rs.getString("_key"));
+                        assertEquals("Joe", rs.getString("firstName"));
+                        assertEquals("Black", rs.getString("lastName"));
+                        assertEquals(35, rs.getInt("age"));
+                        break;
+
+                    case 3:
+                        assertEquals("p3", rs.getString("_key"));
+                        assertEquals("Mike", rs.getString("firstName"));
+                        assertEquals("Green", rs.getString("lastName"));
+                        assertEquals(40, rs.getInt("age"));
+                        break;
+
+                    case 4:
+                        assertEquals("p4", rs.getString("_key"));
+                        assertEquals("Leah", rs.getString("firstName"));
+                        assertEquals("Grey", rs.getString("lastName"));
+                        assertEquals(22, rs.getInt("age"));
+                        break;
+
+                    default:
+                        assert false : "Invalid ID: " + id;
+                }
+            }
+        }
+
+        grid(0).cache(null).clear();
+
+        assertEquals(0, grid(0).cache(null).size(CachePeekMode.ALL));
+
         super.afterTest();
 
         if (stmt != null && !stmt.isClosed())

http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
new file mode 100644
index 0000000..5e206ee
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.Properties;
+import org.apache.ignite.IgniteJdbcDriver;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests streamed {@code INSERT} through the JDBC driver, with and without overwriting of existing keys.
+ */
+public class JdbcStreamingSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** JDBC URL. */
+    private static final String BASE_URL = CFG_URL_PREFIX + "modules/clients/src/test/config/jdbc-config.xml";
+
+    /** Connection. */
+    protected Connection conn;
+
+    /** Logger. NOTE(review): never referenced in this class; presumably hides an inherited logger - confirm. */
+    protected transient IgniteLogger log;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        return getConfiguration0(gridName);
+    }
+
+    /**
+     * @param gridName Grid name.
+     * @return Grid configuration (partitioned cache, one backup, Integer-to-Integer indexed types).
+     * @throws Exception If failed.
+     */
+    private IgniteConfiguration getConfiguration0(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+        cache.setCacheMode(PARTITIONED);
+        cache.setBackups(1);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+        cache.setIndexedTypes(
+            Integer.class, Integer.class
+        );
+
+        cfg.setCacheConfiguration(cache);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setConnectorConfiguration(new ConnectorConfiguration());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(3);
+
+        Class.forName("org.apache.ignite.IgniteJdbcDriver");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @param allowOverwrite Allow overwriting of existing keys.
+     * @return Streaming connection (flush frequency property set to 500) to use for the test.
+     * @throws Exception if failed.
+     */
+    private Connection createConnection(boolean allowOverwrite) throws Exception {
+        Properties props = new Properties();
+
+        props.setProperty(IgniteJdbcDriver.PROP_STREAMING, "true");
+        props.setProperty(IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ, "500");
+
+        if (allowOverwrite)
+            props.setProperty(IgniteJdbcDriver.PROP_STREAMING_ALLOW_OVERWRITE, "true");
+
+        return DriverManager.getConnection(BASE_URL, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        U.closeQuiet(conn);
+
+        ignite(0).cache(null).clear();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStreamedInsert() throws Exception {
+        conn = createConnection(false);
+
+        ignite(0).cache(null).put(5, 500);
+
+        PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)");
+
+        for (int i = 1; i <= 100000; i++) {
+            stmt.setInt(1, i);
+            stmt.setInt(2, i);
+
+            stmt.executeUpdate();
+        }
+
+        // Data is not there yet.
+        assertNull(grid(0).cache(null).get(100000));
+
+        // Let the stream flush (flush frequency is 500, sleep is 1500 - presumably both in milliseconds).
+        U.sleep(1500);
+
+        // Now let's check it's all there.
+        assertEquals(1, grid(0).cache(null).get(1));
+        assertEquals(100000, grid(0).cache(null).get(100000));
+
+        // 5 should still point to 500 as overwriting was not enabled for this connection.
+        assertEquals(500, grid(0).cache(null).get(5));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStreamedInsertWithOverwritesAllowed() throws Exception {
+        conn = createConnection(true);
+
+        ignite(0).cache(null).put(5, 500);
+
+        PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)");
+
+        for (int i = 1; i <= 100000; i++) {
+            stmt.setInt(1, i);
+            stmt.setInt(2, i);
+
+            stmt.executeUpdate();
+        }
+
+        // Data is not there yet.
+        assertNull(grid(0).cache(null).get(100000));
+
+        // Let the stream flush (flush frequency is 500, sleep is 1500 - presumably both in milliseconds).
+        U.sleep(1500);
+
+        // Now let's check it's all there.
+        assertEquals(1, grid(0).cache(null).get(1));
+        assertEquals(100000, grid(0).cache(null).get(100000));
+
+        // 5 should now point to 5 as we've turned overwriting on.
+        assertEquals(5, grid(0).cache(null).get(5));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java
new file mode 100644
index 0000000..8ae0e90
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * UPDATE statement test.
+ */
+public class JdbcUpdateStatementSelfTest extends JdbcAbstractUpdateStatementSelfTest {
+    /**
+     * Runs the UPDATE via {@code Statement.execute}; expects first names John, Jack, Mike when ordered by key.
+     */
+    public void testExecute() throws SQLException {
+        conn.createStatement().execute("update Person set firstName = 'Jack' where " +
+            "cast(substring(_key, 2, 1) as int) % 2 = 0");
+
+        assertEquals(Arrays.asList(F.asList("John"), F.asList("Jack"), F.asList("Mike")),
+            jcache(0).query(new SqlFieldsQuery("select firstName from Person order by _key")).getAll());
+    }
+
+    /**
+     * Runs the UPDATE via {@code Statement.executeUpdate}; expects first names John, Jack, Mike when ordered by key.
+     */
+    public void testExecuteUpdate() throws SQLException {
+        conn.createStatement().executeUpdate("update Person set firstName = 'Jack' where " +
+                "cast(substring(_key, 2, 1) as int) % 2 = 0");
+
+        assertEquals(Arrays.asList(F.asList("John"), F.asList("Jack"), F.asList("Mike")),
+                jcache(0).query(new SqlFieldsQuery("select firstName from Person order by _key")).getAll());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index c41b754..7395fcb 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -66,6 +66,7 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcInsertStatementSelfTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcBinaryMarshallerInsertStatementSelfTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDeleteStatementSelfTest.class));
+        suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest.class));
 
         return suite;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
index d432c1e4..9790b8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcDriver.java
@@ -292,6 +292,21 @@ public class IgniteJdbcDriver implements Driver {
     /** Distributed joins parameter name. */
     private static final String PARAM_DISTRIBUTED_JOINS = "distributedJoins";
 
+    /** DML streaming parameter name. */
+    private static final String PARAM_STREAMING = "streaming";
+
+    /** DML streaming auto flush frequency. */
+    private static final String PARAM_STREAMING_FLUSH_FREQ = "streamingFlushFrequency";
+
+    /** DML streaming node buffer size. */
+    private static final String PARAM_STREAMING_PER_NODE_BUF_SIZE = "streamingPerNodeBufferSize";
+
+    /** DML streaming parallel operations per node. */
+    private static final String PARAM_STREAMING_PER_NODE_PAR_OPS = "streamingPerNodeParallelOperations";
+
+    /** Whether DML streaming will overwrite existing cache entries. */
+     private static final String PARAM_STREAMING_ALLOW_OVERWRITE = "streamingAllowOverwrite";
+
     /** Hostname property name. */
     public static final String PROP_HOST = PROP_PREFIX + "host";
 
@@ -313,6 +328,21 @@ public class IgniteJdbcDriver implements Driver {
     /** Distributed joins property name. */
     public static final String PROP_DISTRIBUTED_JOINS = PROP_PREFIX + PARAM_DISTRIBUTED_JOINS;
 
+    /** DML streaming property name. */
+    public static final String PROP_STREAMING = PROP_PREFIX + PARAM_STREAMING;
+
+    /** DML stream auto flush frequency property name. */
+    public static final String PROP_STREAMING_FLUSH_FREQ = PROP_PREFIX + PARAM_STREAMING_FLUSH_FREQ;
+
+    /** DML stream node buffer size property name. */
+    public static final String PROP_STREAMING_PER_NODE_BUF_SIZE = PROP_PREFIX + PARAM_STREAMING_PER_NODE_BUF_SIZE;
+
+    /** DML stream parallel operations per node property name. */
+    public static final String PROP_STREAMING_PER_NODE_PAR_OPS = PROP_PREFIX + PARAM_STREAMING_PER_NODE_PAR_OPS;
+
+    /** Whether DML streaming will overwrite existing cache entries. */
+    public static final String PROP_STREAMING_ALLOW_OVERWRITE = PROP_PREFIX + PARAM_STREAMING_ALLOW_OVERWRITE;
+
     /** Cache name property name. */
     public static final String PROP_CFG = PROP_PREFIX + "cfg";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index 5c4a147..4244602 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -48,17 +48,21 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteJdbcDriver;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.compute.ComputeTaskTimeoutException;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.resources.IgniteInstanceResource;
@@ -73,6 +77,11 @@ import static org.apache.ignite.IgniteJdbcDriver.PROP_COLLOCATED;
 import static org.apache.ignite.IgniteJdbcDriver.PROP_DISTRIBUTED_JOINS;
 import static org.apache.ignite.IgniteJdbcDriver.PROP_LOCAL;
 import static org.apache.ignite.IgniteJdbcDriver.PROP_NODE_ID;
+import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING;
+import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_ALLOW_OVERWRITE;
+import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ;
+import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_PER_NODE_BUF_SIZE;
+import static org.apache.ignite.IgniteJdbcDriver.PROP_STREAMING_PER_NODE_PAR_OPS;
 
 /**
  * JDBC connection implementation.
@@ -118,6 +127,21 @@ public class JdbcConnection implements Connection {
     /** Distributed joins flag. */
     private boolean distributedJoins;
 
+    /** Whether this connection is streaming oriented, i.e. its prepared statements are backed by a data streamer. */
+    private final boolean stream;
+
+    /** Auto flush frequency for streaming. */
+    private final long streamFlushTimeout;
+
+    /** Node buffer size for data streamer. */
+    private final int streamNodeBufSize;
+
+    /** Parallel ops count per node for data streamer. */
+    private final int streamNodeParOps;
+
+    /** Allow overwrites for duplicate keys on streamed {@code INSERT}s. */
+    private final boolean streamAllowOverwrite;
+
     /** Statements. */
     final Set<JdbcStatement> statements = new HashSet<>();
 
@@ -139,6 +163,14 @@ public class JdbcConnection implements Connection {
         this.collocatedQry = Boolean.parseBoolean(props.getProperty(PROP_COLLOCATED));
         this.distributedJoins = Boolean.parseBoolean(props.getProperty(PROP_DISTRIBUTED_JOINS));
 
+        stream = Boolean.parseBoolean(props.getProperty(PROP_STREAMING));
+        streamAllowOverwrite = Boolean.parseBoolean(props.getProperty(PROP_STREAMING_ALLOW_OVERWRITE));
+        streamFlushTimeout = Long.parseLong(props.getProperty(PROP_STREAMING_FLUSH_FREQ, "0"));
+        streamNodeBufSize = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_BUF_SIZE,
+            String.valueOf(IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE)));
+        streamNodeParOps = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_PAR_OPS,
+            String.valueOf(IgniteDataStreamer.DFLT_MAX_PARALLEL_OPS)));
+
         String nodeIdProp = props.getProperty(PROP_NODE_ID);
 
         if (nodeIdProp != null)
@@ -291,6 +323,14 @@ public class JdbcConnection implements Connection {
 
         closed = true;
 
+        for (Iterator<JdbcStatement> it = statements.iterator(); it.hasNext();) {
+            JdbcStatement stmt = it.next();
+
+            stmt.closeInternal();
+
+            it.remove();
+        }
+
         IgniteNodeFuture fut = NODES.get(cfg);
 
         if (fut != null && fut.release()) {
@@ -299,14 +339,6 @@ public class JdbcConnection implements Connection {
             if (ignite != null)
                 ignite.close();
         }
-
-        for (Iterator<JdbcStatement> it = statements.iterator(); it.hasNext();) {
-            JdbcStatement stmt = it.next();
-
-            stmt.closeInternal();
-
-            it.remove();
-        }
     }
 
     /** {@inheritDoc} */
@@ -487,7 +519,18 @@ public class JdbcConnection implements Connection {
         if (resSetHoldability != HOLD_CURSORS_OVER_COMMIT)
             throw new SQLFeatureNotSupportedException("Invalid holdability (transactions are not supported).");
 
-        JdbcPreparedStatement stmt = new JdbcPreparedStatement(this, sql);
+        JdbcPreparedStatement stmt;
+
+        if (!stream)
+            stmt = new JdbcPreparedStatement(this, sql);
+        else {
+            PreparedStatement nativeStmt = prepareNativeStatement(sql);
+
+            IgniteDataStreamer<?, ?> streamer = ((IgniteEx) ignite).context().query().createStreamer(cacheName,
+                nativeStmt, streamFlushTimeout, streamNodeBufSize, streamNodeParOps, streamAllowOverwrite);
+
+            stmt = new JdbcStreamedPreparedStatement(this, sql, streamer, nativeStmt);
+        }
 
         statements.add(stmt);
 
@@ -646,12 +689,17 @@ public class JdbcConnection implements Connection {
 
     /** {@inheritDoc} */
     @Override public void setSchema(String schema) throws SQLException {
-        cacheName = schema;
+        assert ignite instanceof IgniteEx;
+
+        cacheName = ((IgniteEx)ignite).context().query().space(schema);
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public String getSchema() throws SQLException {
-        return cacheName;
+        String sqlSchema = ignite.cache(cacheName).getConfiguration(CacheConfiguration.class).getSqlSchema();
+
+        return U.firstNotNull(sqlSchema, cacheName, "");
     }
 
     /** {@inheritDoc} */
@@ -749,7 +797,7 @@ public class JdbcConnection implements Connection {
      */
     PreparedStatement prepareNativeStatement(String sql) throws SQLException {
         return ((IgniteCacheProxy) ignite().cache(cacheName())).context()
-            .kernalContext().query().prepareNativeStatement(cacheName(), sql);
+            .kernalContext().query().prepareNativeStatement(getSchema(), sql);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
index 57badd2..54e58e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
@@ -17,12 +17,28 @@
 
 package org.apache.ignite.internal.jdbc2;
 
-import java.io.*;
-import java.math.*;
-import java.net.*;
-import java.sql.*;
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
 import java.sql.Date;
-import java.util.*;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Calendar;
 
 /**
  * JDBC prepared statement implementation.
@@ -31,10 +47,8 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
     /** SQL query. */
     private final String sql;
 
-    /**
-     * H2's parsed statement to retrieve metadata from.
-     */
-    private PreparedStatement nativeStatement;
+    /** H2's parsed statement to retrieve metadata from. */
+    PreparedStatement nativeStatement;
 
     /**
      * Creates new prepared statement.
@@ -55,8 +69,6 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
         throw new SQLFeatureNotSupportedException("Adding new SQL command to batch not supported for prepared statement.");
     }
 
-
-
     /** {@inheritDoc} */
     @Override public ResultSet executeQuery() throws SQLException {
         ensureNotClosed();

http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
index d7e387f..44db375 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
@@ -140,7 +140,7 @@ public class JdbcStatement implements Statement {
 
         updateCnt = -1;
 
-        return doUpdate(sql, getArgs());
+        return Long.valueOf(doUpdate(sql, getArgs())).intValue();
     }
 
     /**
@@ -148,9 +148,9 @@ public class JdbcStatement implements Statement {
      * @param sql SQL query.
      * @param args Update arguments.
      * @return Number of affected items.
-     * @throws SQLException
+     * @throws SQLException If failed.
      */
-    int doUpdate(String sql, Object[] args) throws SQLException {
+    long doUpdate(String sql, Object[] args) throws SQLException {
         if (F.isEmpty(sql))
             throw new SQLException("SQL query is empty");
 
@@ -172,11 +172,7 @@ public class JdbcStatement implements Statement {
             JdbcQueryTaskV2.QueryResult qryRes =
                 loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask);
 
-            Long res = updateCounterFromQueryResult(qryRes.getRows());
-
-            updateCnt = res;
-
-            return res.intValue();
+            return updateCnt = updateCounterFromQueryResult(qryRes.getRows());
         }
         catch (IgniteSQLException e) {
             throw e.toJdbcException();
@@ -194,12 +190,12 @@ public class JdbcStatement implements Statement {
      * @return update counter, if found
      * @throws SQLException if getting an update counter from result proved to be impossible.
      */
-    private static Long updateCounterFromQueryResult(List<List<?>> rows) throws SQLException {
+    private static long updateCounterFromQueryResult(List<List<?>> rows) throws SQLException {
          if (F.isEmpty(rows))
-            return 0L;
+            return -1;
 
         if (rows.size() != 1)
-            throw new SQLException("Expected number of rows of 1 for update operation");
+            throw new SQLException("Expected fetch size of 1 for update operation");
 
         List<?> row = rows.get(0);
 
@@ -211,7 +207,7 @@ public class JdbcStatement implements Statement {
         if (!(objRes instanceof Long))
             throw new SQLException("Unexpected update result type");
 
-        return (Long) objRes;
+        return (Long)objRes;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
new file mode 100644
index 0000000..019923f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.internal.IgniteEx;
+
+/**
+ * Prepared statement associated with a data streamer.
+ */
+class JdbcStreamedPreparedStatement extends JdbcPreparedStatement {
+    /** */
+    private final IgniteDataStreamer<?, ?> streamer;
+
+    /**
+     * Creates new prepared statement.
+     *
+     * @param conn Connection.
+     * @param sql  SQL query.
+     * @param streamer Data streamer to use with this statement. Will be closed on statement close.
+     */
+    JdbcStreamedPreparedStatement(JdbcConnection conn, String sql, IgniteDataStreamer<?, ?> streamer,
+        PreparedStatement nativeStmt) {
+        super(conn, sql);
+
+        this.streamer = streamer;
+
+        nativeStatement = nativeStmt;
+    }
+
+    /** {@inheritDoc} */
+    @Override void closeInternal() throws SQLException {
+        streamer.close(false);
+
+        super.closeInternal();
+    }
+
+    /** {@inheritDoc} */
+    @Override long doUpdate(String sql, Object[] args) throws SQLException {
+        return ((IgniteEx)conn.ignite()).context().query().streamUpdateQuery(conn.cacheName(), streamer, sql, args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index ca04724..2abb3a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.List;
 import javax.cache.Cache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
@@ -99,6 +100,19 @@ public interface GridQueryIndexing {
         GridQueryCancel cancel) throws IgniteCheckedException;
 
     /**
+     * Performs an update (DML) statement using a data streamer as the receiver of the resulting rows.
+     *
+     * @param spaceName Space name.
+     * @param qry Query.
+     * @param params Query parameters.
+     * @param streamer Data streamer to feed data to.
+     * @return Query result.
+     * @throws IgniteCheckedException If failed.
+     */
+    public long streamUpdateQuery(@Nullable final String spaceName, final String qry,
+         @Nullable final Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException;
+
+    /**
      * Executes regular query.
      *
      * @param spaceName Space name.
@@ -241,6 +255,14 @@ public interface GridQueryIndexing {
     public PreparedStatement prepareNativeStatement(String schema, String sql) throws SQLException;
 
     /**
+     * Gets space name from database schema.
+     *
+     * @param schemaName Schema name. Could not be null. Could be empty.
+     * @return Space name. Could be null.
+     */
+    public String space(String schemaName);
+
+    /**
      * Collect queries that already running more than specified duration.
      *
      * @param duration Duration to check.
@@ -259,4 +281,17 @@ public interface GridQueryIndexing {
      * Cancels all executing queries.
      */
     public void cancelAllQueries();
+
+    /**
+     * @param spaceName Space name.
+     * @param nativeStmt Native statement.
+     * @param autoFlushFreq Automatic data flushing frequency, disabled if {@code 0}.
+     * @param nodeBufSize Per node buffer size - see {@link IgniteDataStreamer#perNodeBufferSize(int)}
+     * @param nodeParOps Per node parallel ops count - see {@link IgniteDataStreamer#perNodeParallelOperations(int)}
+     * @param allowOverwrite Overwrite existing cache values on key duplication.
+     * @return {@link IgniteDataStreamer} tailored to specific needs of given native statement based on its metadata;
+     * {@code null} if given statement is a query.
+     */
+    public IgniteDataStreamer<?,?> createStreamer(String spaceName, PreparedStatement nativeStmt, long autoFlushFreq,
+        int nodeBufSize, int nodeParOps, boolean allowOverwrite);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index ee9224b..c6d8270 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -17,12 +17,11 @@
 
 package org.apache.ignite.internal.processors.query;
 
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.concurrent.TimeUnit;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
@@ -40,9 +39,11 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import javax.cache.Cache;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.binary.BinaryField;
 import org.apache.ignite.binary.BinaryObject;
@@ -818,6 +819,36 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param spaceName Cache name.
+     * @param streamer Data streamer.
+     * @param qry Query.
+     * @return Update counter returned by the indexing implementation for the streamed statement.
+     */
+    public long streamUpdateQuery(@Nullable final String spaceName,
+        final IgniteDataStreamer<?, ?> streamer, final String qry, final Object[] args) {
+        assert streamer != null;
+
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Failed to execute query (grid is stopping).");
+
+        try {
+            GridCacheContext cctx = ctx.cache().cache(spaceName).context();
+
+            return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, cctx, new IgniteOutClosureX<Long>() {
+                @Override public Long applyx() throws IgniteCheckedException {
+                    return idx.streamUpdateQuery(spaceName, qry, args, streamer);
+                }
+            }, true);
+        }
+        catch (IgniteCheckedException e) {
+            throw new CacheException(e);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
      * @param cctx Cache context.
      * @param qry Query.
      * @return Cursor.
@@ -964,7 +995,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     *
      * @param schema Schema.
      * @param sql Query.
      * @return {@link PreparedStatement} from underlying engine to supply metadata to Prepared - most likely H2.
@@ -976,6 +1006,31 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param schema Schema name.
+     * @return space (cache) name from schema name.
+     */
+    public String space(String schema) throws SQLException {
+        checkxEnabled();
+
+        return idx.space(schema);
+    }
+
+    /**
+     * @param spaceName Space name.
+     * @param nativeStmt Native statement.
+     * @param autoFlushFreq Automatic data flushing frequency, disabled if {@code 0}.
+     * @param nodeBufSize Per node buffer size - see {@link IgniteDataStreamer#perNodeBufferSize(int)}
+     * @param nodeParOps Per node parallel ops count - see {@link IgniteDataStreamer#perNodeParallelOperations(int)}
+     * @param allowOverwrite Overwrite existing cache values on key duplication.
+     * @see IgniteDataStreamer#allowOverwrite
+     * @return {@link IgniteDataStreamer} tailored to specific needs of given native statement based on its metadata.
+     */
+    public IgniteDataStreamer<?, ?> createStreamer(String spaceName, PreparedStatement nativeStmt, long autoFlushFreq,
+        int nodeBufSize, int nodeParOps, boolean allowOverwrite) {
+        return idx.createStreamer(spaceName, nativeStmt, autoFlushFreq, nodeBufSize, nodeParOps, allowOverwrite);
+    }
+
+    /**
      * @param timeout Timeout.
      * @param timeUnit Time unit.
      * @return Converted time.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 4030758..78c5bbc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -21,7 +21,6 @@ import java.lang.reflect.Array;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -40,6 +39,7 @@ import javax.cache.processor.EntryProcessorResult;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.binary.BinaryArrayIdentityResolver;
 import org.apache.ignite.binary.BinaryObject;
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArguments;
 import org.apache.ignite.internal.processors.query.h2.dml.KeyValueSupplier;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
@@ -126,7 +127,7 @@ public class DmlStatementsProcessor {
      * @return Update result (modified items count and failed keys).
      * @throws IgniteCheckedException if failed.
      */
-    private long updateSqlFields(String spaceName, PreparedStatement stmt, SqlFieldsQuery fieldsQry,
+    private UpdateResult updateSqlFields(String spaceName, PreparedStatement stmt, SqlFieldsQuery fieldsQry,
         boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
         Object[] errKeys = null;
 
@@ -156,23 +157,27 @@ public class DmlStatementsProcessor {
             UpdateResult r;
 
             try {
-                r = executeUpdateStatement(cctx, stmt, fieldsQry, loc, filters,
-                    cancel, errKeys);
+                r = executeUpdateStatement(cctx, stmt, fieldsQry, loc, filters, cancel, errKeys);
             }
             finally {
                 cctx.operationContextPerCall(opCtx);
             }
 
-            if (F.isEmpty(r.errKeys))
-                return r.cnt + items;
-            else {
-                items += r.cnt;
-                errKeys = r.errKeys;
-            }
+            items += r.cnt;
+            errKeys = r.errKeys;
+
+            if (F.isEmpty(errKeys))
+                break;
         }
 
-        throw new IgniteSQLException("Failed to update or delete some keys: " + Arrays.deepToString(errKeys),
-            IgniteQueryErrorCode.CONCURRENT_UPDATE);
+        if (F.isEmpty(errKeys)) {
+            if (items == 1L)
+                return UpdateResult.ONE;
+            else if (items == 0L)
+                return UpdateResult.ZERO;
+        }
+
+        return new UpdateResult(items, errKeys);
     }
 
     /**
@@ -186,9 +191,14 @@ public class DmlStatementsProcessor {
     @SuppressWarnings("unchecked")
     QueryCursorImpl<List<?>> updateSqlFieldsTwoStep(String spaceName, PreparedStatement stmt,
         SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException {
-        long res = updateSqlFields(spaceName, stmt, fieldsQry, false, null, cancel);
+        UpdateResult res = updateSqlFields(spaceName, stmt, fieldsQry, false, null, cancel);
+
+        QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
+            (Collections.singletonList(res.cnt)), null, false);
 
-        return cursorForUpdateResult(res);
+        resCur.fieldsMeta(UPDATE_RESULT_META);
+
+        return resCur;
     }
 
     /**
@@ -203,10 +213,95 @@ public class DmlStatementsProcessor {
     @SuppressWarnings("unchecked")
     GridQueryFieldsResult updateLocalSqlFields(String spaceName, PreparedStatement stmt,
         SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
-        long res = updateSqlFields(spaceName, stmt, fieldsQry, true, filters, cancel);
+        UpdateResult res = updateSqlFields(spaceName, stmt, fieldsQry, true, filters, cancel);
 
         return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META,
-            new IgniteSingletonIterator(Collections.singletonList(res)));
+            new IgniteSingletonIterator(Collections.singletonList(res.cnt)));
+    }
+
+    /**
+     * Perform given statement against given data streamer. Only tuple based INSERT statements are supported;
+     * any other statement is rejected with an {@link IgniteSQLException} (unsupported operation).
+     *
+     * @param streamer Streamer to feed data to.
+     * @param stmt Statement.
+     * @param args Statement arguments.
+     * @return Number of key-value pairs passed to the streamer ({@code 1} for a single row INSERT).
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings({"unchecked", "ConstantConditions"})
+    long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, Object[] args)
+        throws IgniteCheckedException {
+        args = U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY);
+
+        Prepared p = GridSqlQueryParser.prepared((JdbcPreparedStatement)stmt);
+
+        assert p != null;
+
+        UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, null);
+
+        if (!F.eq(streamer.cacheName(), plan.tbl.rowDescriptor().context().namex()))
+            throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly" +
+                " in connection options", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+        if (plan.mode == UpdateMode.INSERT && plan.rowsNum > 0) {
+            assert plan.isLocSubqry;
+
+            final GridCacheContext cctx = plan.tbl.rowDescriptor().context();
+
+            QueryCursorImpl<List<?>> cur;
+
+            final ArrayList<List<?>> data = new ArrayList<>(plan.rowsNum);
+
+            final GridQueryFieldsResult res = indexing.queryLocalSqlFields(cctx.name(), plan.selectQry,
+                F.asList(args), null, false, 0, null);
+
+            QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                @Override public Iterator<List<?>> iterator() {
+                    try {
+                        return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepBinary());
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+            }, null);
+
+            data.addAll(stepCur.getAll());
+
+            cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                @Override public Iterator<List<?>> iterator() {
+                    return data.iterator();
+                }
+            }, null);
+
+            GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
+
+            if (plan.rowsNum == 1) {
+                IgniteBiTuple t = rowToKeyValue(cctx, cur.iterator().next().toArray(), plan.colNames, plan.colTypes,
+                    plan.keySupplier, plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc);
+
+                streamer.addData(t.getKey(), t.getValue());
+
+                return 1;
+            }
+
+            Map<Object, Object> rows = new LinkedHashMap<>(plan.rowsNum);
+
+            for (List<?> row : cur) {
+                final IgniteBiTuple t = rowToKeyValue(cctx, row.toArray(), plan.colNames, plan.colTypes,
+                    plan.keySupplier, plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc);
+
+                rows.put(t.getKey(), t.getValue());
+            }
+
+            streamer.addData(rows);
+
+            return rows.size();
+        }
+        else
+            throw new IgniteSQLException("Only tuple based INSERT statements are supported in streaming mode",
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
     }
 
     /**
@@ -214,31 +309,21 @@ public class DmlStatementsProcessor {
      * @param cctx Cache context.
      * @param prepStmt Prepared statement for DML query.
      * @param filters Space name and key filter.
-     * @param failedKeys Keys to restrict UPDATE and DELETE operations with. Null or empty array means no restriction.
-     * @return Pair [number of successfully processed items; keys that have failed to be processed]
+     * @param failedKeys Keys to restrict UPDATE and DELETE operations with. Null or empty array means no restriction.   @return Pair [number of successfully processed items; keys that have failed to be processed]
      * @throws IgniteCheckedException if failed.
      */
-    @SuppressWarnings("ConstantConditions")
+    @SuppressWarnings({"ConstantConditions", "unchecked"})
     private UpdateResult executeUpdateStatement(final GridCacheContext cctx, PreparedStatement prepStmt,
-        SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel, Object[] failedKeys)
-        throws IgniteCheckedException {
+        SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel,
+        Object[] failedKeys) throws IgniteCheckedException {
         Integer errKeysPos = null;
 
-        Object[] params = fieldsQry.getArgs();
-
-        if (!F.isEmpty(failedKeys)) {
-            int paramsCnt = F.isEmpty(params) ? 0 : params.length;
-            params = Arrays.copyOf(U.firstNotNull(params, X.EMPTY_OBJECT_ARRAY), paramsCnt + 1);
-            params[paramsCnt] = failedKeys;
-            errKeysPos = paramsCnt; // Last position
-        }
-
         UpdatePlan plan = getPlanForStatement(cctx.name(), prepStmt, errKeysPos);
 
         if (plan.fastUpdateArgs != null) {
             assert F.isEmpty(failedKeys) && errKeysPos == null;
 
-            return new UpdateResult(doSingleUpdate(plan, params), X.EMPTY_OBJECT_ARRAY);
+            return doFastUpdate(plan, fieldsQry.getArgs());
         }
 
         assert !F.isEmpty(plan.selectQry);
@@ -249,7 +334,7 @@ public class DmlStatementsProcessor {
         // subquery and not some dummy stuff like "select 1, 2, 3;"
         if (!loc && !plan.isLocSubqry) {
             SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQry, fieldsQry.isCollocated())
-                .setArgs(params)
+                .setArgs(fieldsQry.getArgs())
                 .setDistributedJoins(fieldsQry.isDistributedJoins())
                 .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
                 .setLocal(fieldsQry.isLocal())
@@ -259,8 +344,8 @@ public class DmlStatementsProcessor {
             cur = (QueryCursorImpl<List<?>>) indexing.queryTwoStep(cctx, newFieldsQry, cancel);
         }
         else {
-            final GridQueryFieldsResult res = indexing.queryLocalSqlFields(cctx.name(), plan.selectQry, F.asList(params),
-                filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel);
+            final GridQueryFieldsResult res = indexing.queryLocalSqlFields(cctx.name(), plan.selectQry,
+                F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel);
 
             cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
                 @Override public Iterator<List<?>> iterator() {
@@ -272,8 +357,6 @@ public class DmlStatementsProcessor {
                     }
                 }
             }, cancel);
-
-            cur.fieldsMeta(res.metaData());
         }
 
         int pageSize = loc ? 0 : fieldsQry.getPageSize();
@@ -337,38 +420,41 @@ public class DmlStatementsProcessor {
 
     /**
      * Perform single cache operation based on given args.
-     * @param params Query parameters.
+     * @param args Query parameters.
      * @return 1 if an item was affected, 0 otherwise.
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    private static long doSingleUpdate(UpdatePlan plan, Object[] params) throws IgniteCheckedException {
+    private static UpdateResult doFastUpdate(UpdatePlan plan, Object[] args) throws IgniteCheckedException {
         GridCacheContext cctx = plan.tbl.rowDescriptor().context();
 
         FastUpdateArguments singleUpdate = plan.fastUpdateArgs;
 
         assert singleUpdate != null;
 
-        int res;
+        boolean valBounded = (singleUpdate.val != FastUpdateArguments.NULL_ARGUMENT);
 
-        Object key = singleUpdate.key.apply(params);
-        Object val = singleUpdate.val.apply(params);
-        Object newVal = singleUpdate.newVal.apply(params);
+        if (singleUpdate.newVal != FastUpdateArguments.NULL_ARGUMENT) { // Single item UPDATE
+            Object key = singleUpdate.key.apply(args);
+            Object newVal = singleUpdate.newVal.apply(args);
 
-        if (newVal != null) { // Single item UPDATE
-            if (val == null) // No _val bound in source query
-                res = cctx.cache().replace(key, newVal) ? 1 : 0;
+            if (valBounded) {
+                Object val = singleUpdate.val.apply(args);
+
+                return (cctx.cache().replace(key, val, newVal) ? UpdateResult.ONE : UpdateResult.ZERO);
+            }
             else
-                res = cctx.cache().replace(key, val, newVal) ? 1 : 0;
+                return (cctx.cache().replace(key, newVal) ? UpdateResult.ONE : UpdateResult.ZERO);
         }
         else { // Single item DELETE
-            if (val == null) // No _val bound in source query
-                res = cctx.cache().remove(key) ? 1 : 0;
+            Object key = singleUpdate.key.apply(args);
+            Object val = singleUpdate.val.apply(args);
+
+            if (singleUpdate.val == FastUpdateArguments.NULL_ARGUMENT) // No _val bound in source query
+                return cctx.cache().remove(key) ? UpdateResult.ONE : UpdateResult.ZERO;
             else
-                res = cctx.cache().remove(key, val) ? 1 : 0;
+                return cctx.cache().remove(key, val) ? UpdateResult.ONE : UpdateResult.ZERO;
         }
-
-        return res;
     }
 
     /**
@@ -379,7 +465,7 @@ public class DmlStatementsProcessor {
      * @return Results of DELETE (number of items affected AND keys that failed to be updated).
      */
     @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
-    private UpdateResult doDelete(GridCacheContext cctx, QueryCursorImpl<List<?>> cursor, int pageSize)
+    private UpdateResult doDelete(GridCacheContext cctx, Iterable<List<?>> cursor, int pageSize)
         throws IgniteCheckedException {
         // With DELETE, we have only two columns - key and value.
         long res = 0;
@@ -449,7 +535,7 @@ public class DmlStatementsProcessor {
      *     had been modified concurrently (arguments for a re-run)].
      */
     @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
-    private UpdateResult doUpdate(UpdatePlan plan, QueryCursorImpl<List<?>> cursor, int pageSize)
+    private UpdateResult doUpdate(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize)
         throws IgniteCheckedException {
         GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
 
@@ -575,7 +661,6 @@ public class DmlStatementsProcessor {
             throw new IgniteSQLException(resEx);
         }
 
-
         return new UpdateResult(res, failedKeys.toArray());
     }
 
@@ -689,7 +774,7 @@ public class DmlStatementsProcessor {
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings("unchecked")
-    private long doMerge(UpdatePlan plan, QueryCursorImpl<List<?>> cursor, int pageSize) throws IgniteCheckedException {
+    private long doMerge(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
         GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
 
         GridCacheContext cctx = desc.context();
@@ -735,7 +820,7 @@ public class DmlStatementsProcessor {
      * @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    private long doInsert(UpdatePlan plan, QueryCursorImpl<List<?>> cursor, int pageSize) throws IgniteCheckedException {
+    private long doInsert(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
         GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
 
         GridCacheContext cctx = desc.context();
@@ -999,24 +1084,14 @@ public class DmlStatementsProcessor {
         }
     }
 
-    /**
-     * Wrap result of DML operation (number of items affected) to Iterable suitable to be wrapped by cursor.
-     *
-     * @param itemsCnt Update result to wrap.
-     * @return Resulting Iterable.
-     */
-    @SuppressWarnings("unchecked")
-    private static QueryCursorImpl<List<?>> cursorForUpdateResult(long itemsCnt) {
-        QueryCursorImpl<List<?>> res =
-            new QueryCursorImpl(Collections.singletonList(Collections.singletonList(itemsCnt)), null, false);
-
-        res.fieldsMeta(UPDATE_RESULT_META);
-
-        return res;
-    }
-
     /** Update result - modifications count and keys to re-run query with, if needed. */
     private final static class UpdateResult {
+        /** Result to return for operations that affected 1 item - mostly to be used for fast updates and deletes. */
+        final static UpdateResult ONE = new UpdateResult(1, X.EMPTY_OBJECT_ARRAY);
+
+        /** Result to return for operations that affected 0 items - mostly to be used for fast updates and deletes. */
+        final static UpdateResult ZERO = new UpdateResult(0, X.EMPTY_OBJECT_ARRAY);
+
         /** Number of processed items. */
         final long cnt;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0130b097/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index e4b0c1f..8088f80 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.cache.Cache;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheMemoryMode;
@@ -82,7 +83,6 @@ import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -91,6 +91,7 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
 import org.apache.ignite.internal.processors.query.GridQueryIndexing;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap;
@@ -136,6 +137,7 @@ import org.h2.api.ErrorCode;
 import org.h2.api.JavaObjectSerializer;
 import org.h2.command.CommandInterface;
 import org.h2.command.Prepared;
+import org.h2.command.dml.Insert;
 import org.h2.engine.Session;
 import org.h2.engine.SysProperties;
 import org.h2.index.Index;
@@ -440,7 +442,32 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /** {@inheritDoc} */
     @Override public PreparedStatement prepareNativeStatement(String schema, String sql) throws SQLException {
-        return prepareStatement(connectionForSpace(schema), sql, false);
+        return prepareStatement(connectionForSpace(space(schema)), sql, true); // Schema name -> space name first.
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public IgniteDataStreamer<?, ?> createStreamer(String spaceName, PreparedStatement nativeStmt,
+        long autoFlushFreq, int nodeBufSize, int nodeParOps, boolean allowOverwrite) {
+        Prepared prep = GridSqlQueryParser.prepared((JdbcPreparedStatement) nativeStmt); // H2 command for stmt.
+
+        if (!(prep instanceof Insert)) // Anything that is not an H2 INSERT command is rejected.
+            throw new IgniteSQLException("Only INSERT operations are supported in streaming mode",
+                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+        IgniteDataStreamer streamer = ctx.grid().dataStreamer(spaceName); // Streamer requested by space name.
+
+        streamer.autoFlushFrequency(autoFlushFreq); // Always applied, whatever the value.
+
+        streamer.allowOverwrite(allowOverwrite); // Always applied, whatever the value.
+
+        if (nodeBufSize > 0) // Non-positive value: the setter is skipped.
+            streamer.perNodeBufferSize(nodeBufSize);
+
+        if (nodeParOps > 0) // Non-positive value: the setter is skipped.
+            streamer.perNodeParallelOperations(nodeParOps);
+
+        return streamer;
     }
 
     /**
@@ -873,6 +900,23 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         };
     }
 
+    /** {@inheritDoc} */
+    @Override public long streamUpdateQuery(@Nullable String spaceName, String qry,
+        @Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException {
+        final Connection conn = connectionForSpace(spaceName); // Connection is looked up by space name.
+
+        final PreparedStatement stmt;
+
+        try {
+            stmt = prepareStatement(conn, qry, true);
+        }
+        catch (SQLException e) {
+            throw new IgniteSQLException(e); // Statement preparation failure is rethrown wrapped.
+        }
+
+        return dmlProc.streamUpdateQuery(streamer, stmt, params); // The update itself is delegated to dmlProc.
+    }
+
     /**
      * @param rsMeta Metadata.
      * @return List of fields metadata.
@@ -1680,13 +1724,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
     }
 
-    /**
-     * Gets space name from database schema.
-     *
-     * @param schemaName Schema name. Could not be null. Could be empty.
-     * @return Space name. Could be null.
-     */
-    public String space(String schemaName) {
+    /** {@inheritDoc} */
+    @Override public String space(String schemaName) {
         assert schemaName != null;
 
         Schema schema = schemas.get(schemaName);


Mime
View raw message