ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [10/43] ignite git commit: IGNITE-6022: JDBC: optimized batching handling. This closes #3298.
Date Wed, 17 Jan 2018 12:23:08 GMT
IGNITE-6022: JDBC: optimized batching handling. This closes #3298.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0a4f22ed
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0a4f22ed
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0a4f22ed

Branch: refs/heads/ignite-7274
Commit: 0a4f22edbfb7ad36b630e0a20a474d4c4c9b3826
Parents: 536009b
Author: rkondakov <rkondakov@gridgain.com>
Authored: Wed Jan 17 10:44:55 2018 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Wed Jan 17 10:44:55 2018 +0300

----------------------------------------------------------------------
 .../ignite/jdbc/thin/JdbcThinBatchSelfTest.java | 509 ++++++++++++++++++-
 .../jdbc/thin/JdbcThinErrorsSelfTest.java       |   7 +-
 .../cache/query/SqlFieldsQueryEx.java           |  45 ++
 .../odbc/jdbc/JdbcRequestHandler.java           | 102 +++-
 .../processors/query/IgniteSQLException.java    |   9 +
 .../query/h2/DmlStatementsProcessor.java        | 329 ++++++++++--
 .../processors/query/h2/IgniteH2Indexing.java   |   7 +-
 .../processors/query/h2/dml/DmlBatchSender.java | 247 +++++++--
 .../processors/query/h2/dml/DmlUtils.java       |  12 +
 .../processors/query/h2/dml/UpdatePlan.java     |  61 ++-
 .../query/h2/dml/UpdatePlanBuilder.java         |   2 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |   2 +-
 12 files changed, 1219 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0a4f22ed/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java
index 8609615..fe7c170 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBatchSelfTest.java
@@ -21,9 +21,14 @@ import java.sql.BatchUpdateException;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
 import java.util.concurrent.Callable;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.testframework.GridTestUtils;
 
+import static org.junit.Assert.assertArrayEquals;
+
 /**
  * Statement test.
  */
@@ -151,7 +156,9 @@ public class JdbcThinBatchSelfTest extends JdbcThinAbstractDmlStatementSelfTest
     public void testBatchException() throws SQLException {
         final int BATCH_SIZE = 7;
 
-        for (int idx = 0, i = 0; i < BATCH_SIZE; ++i, idx += i) {
+        final int FAILED_IDX = 5;
+
+        for (int idx = 0, i = 0; i < FAILED_IDX; ++i, idx += i) {
             stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values "
                 + generateValues(idx, i + 1));
         }
@@ -159,7 +166,7 @@ public class JdbcThinBatchSelfTest extends JdbcThinAbstractDmlStatementSelfTest
         stmt.addBatch("select * from Person");
 
         stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values "
-            + generateValues(100, 1));
+            + generateValues(100, 7));
 
         try {
             stmt.executeBatch();
@@ -171,13 +178,212 @@ public class JdbcThinBatchSelfTest extends JdbcThinAbstractDmlStatementSelfTest
             assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length);
 
             for (int i = 0; i < BATCH_SIZE; ++i)
-                assertEquals("Invalid update count",i + 1, updCnts[i]);
+                assertEquals("Invalid update count", i != FAILED_IDX ? i + 1 : Statement.EXECUTE_FAILED,
+                    updCnts[i]);
 
             if (!e.getMessage().contains("Given statement type does not match that declared by JDBC driver")) {
                 log.error("Invalid exception: ", e);
 
                 fail();
             }
+
+            assertEquals("Invalid SQL state.", SqlStateCode.PARSING_EXCEPTION, e.getSQLState());
+            assertEquals("Invalid error code.", IgniteQueryErrorCode.STMT_TYPE_MISMATCH, e.getErrorCode());
+        }
+    }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testBatchParseException() throws SQLException {
+        final int BATCH_SIZE = 7;
+
+        final int FAILED_IDX = 5;
+
+        for (int idx = 0, i = 0; i < FAILED_IDX; ++i, idx += i) {
+            stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values "
+                + generateValues(idx, i + 1));
+        }
+
+        stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values (4444, 'fail', 1, 1, 1)");
+
+        stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values "
+            + generateValues(100, 7));
+
+        try {
+            stmt.executeBatch();
+
+            fail("BatchUpdateException must be thrown");
+        } catch(BatchUpdateException e) {
+            int [] updCnts = e.getUpdateCounts();
+
+            assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length);
+
+            for (int i = 0; i < BATCH_SIZE; ++i)
+                assertEquals("Invalid update count: " + i, i != FAILED_IDX ? i + 1 : Statement.EXECUTE_FAILED,
+                    updCnts[i]);
+
+            if (!e.getMessage().contains("Value conversion failed")) {
+                log.error("Invalid exception: ", e);
+
+                fail();
+            }
+
+            assertEquals("Invalid SQL state.", SqlStateCode.CONVERSION_FAILED, e.getSQLState());
+            assertEquals("Invalid error code.", IgniteQueryErrorCode.CONVERSION_FAILED, e.getErrorCode());
+        }
+    }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testBatchMerge() throws SQLException {
+        final int BATCH_SIZE = 7;
+
+        for (int idx = 0, i = 0; i < BATCH_SIZE; ++i, idx += i) {
+            stmt.addBatch("merge into Person (_key, id, firstName, lastName, age) values "
+                + generateValues(idx, i + 1));
+        }
+
+        int [] updCnts = stmt.executeBatch();
+
+        assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length);
+
+        for (int i = 0; i < BATCH_SIZE; ++i)
+            assertEquals("Invalid update count",i + 1, updCnts[i]);
+    }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testBatchMergeParseException() throws SQLException {
+        final int BATCH_SIZE = 7;
+
+        final int FAILED_IDX = 5;
+
+        for (int idx = 0, i = 0; i < FAILED_IDX; ++i, idx += i) {
+            stmt.addBatch("merge into Person (_key, id, firstName, lastName, age) values "
+                + generateValues(idx, i + 1));
+        }
+
+        stmt.addBatch("merge into Person (_key, id, firstName, lastName, age) values (4444, 'FAIL', 1, 1, 1)");
+
+        stmt.addBatch("merge into Person (_key, id, firstName, lastName, age) values "
+            + generateValues(100, 7));
+
+        try {
+            stmt.executeBatch();
+
+            fail("BatchUpdateException must be thrown");
+        } catch(BatchUpdateException e) {
+            int [] updCnts = e.getUpdateCounts();
+
+            assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length);
+
+            for (int i = 0; i < BATCH_SIZE; ++i)
+                assertEquals("Invalid update count: " + i, i != FAILED_IDX ? i + 1 : Statement.EXECUTE_FAILED,
+                    updCnts[i]);
+
+            if (!e.getMessage().contains("Value conversion failed")) {
+                log.error("Invalid exception: ", e);
+
+                fail();
+            }
+
+            assertEquals("Invalid SQL state.", SqlStateCode.CONVERSION_FAILED, e.getSQLState());
+            assertEquals("Invalid error code.", IgniteQueryErrorCode.CONVERSION_FAILED, e.getErrorCode());
+        }
+    }
+
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testBatchKeyDuplicatesException() throws SQLException {
+        final int BATCH_SIZE = 7;
+
+        final int FAILED_IDX = 5;
+
+        int idx = 0;
+
+        for (int i = 0; i < FAILED_IDX; ++i, idx += i) {
+            stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values "
+                + generateValues(idx, i + 1));
+        }
+
+        stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values ('p0', 0, 'Name0', 'Lastname0', 20)");
+
+        stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values "
+            + generateValues(++idx, 7));
+
+        try {
+            stmt.executeBatch();
+
+            fail("BatchUpdateException must be thrown");
+        } catch(BatchUpdateException e) {
+            int [] updCnts = e.getUpdateCounts();
+
+            assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length);
+
+            for (int i = 0; i < BATCH_SIZE; ++i)
+                assertEquals("Invalid update count: " + i, i != FAILED_IDX ? i + 1 : Statement.EXECUTE_FAILED,
+                    updCnts[i]);
+
+            if (!e.getMessage().contains("Failed to INSERT some keys because they are already in cache [keys=[p0]")) {
+                log.error("Invalid exception: ", e);
+
+                fail();
+            }
+
+            assertEquals("Invalid SQL state.", SqlStateCode.CONSTRAINT_VIOLATION, e.getSQLState());
+            assertEquals("Invalid error code.", IgniteQueryErrorCode.DUPLICATE_KEY, e.getErrorCode());
+        }
+    }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testHeterogeneousBatch() throws SQLException {
+        stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values ('p0', 0, 'Name0', 'Lastname0', 10)");
+        stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values ('p1', 1, 'Name1', 'Lastname1', 20), ('p2', 2, 'Name2', 'Lastname2', 30)");
+        stmt.addBatch("merge into Person (_key, id, firstName, lastName, age) values ('p3', 3, 'Name3', 'Lastname3', 40)");
+        stmt.addBatch("update Person set id = 5 where age >= 30");
+        stmt.addBatch("merge into Person (_key, id, firstName, lastName, age) values ('p0', 2, 'Name2', 'Lastname2', 50)");
+        stmt.addBatch("delete from Person where age <= 40");
+
+        int [] updCnts = stmt.executeBatch();
+
+        assertEquals("Invalid update counts size", 6, updCnts.length);
+        assertArrayEquals("Invalid update count", new int[] {1, 2, 1, 2, 1, 3}, updCnts);
+    }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testHeterogeneousBatchException() throws SQLException {
+        stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values ('p0', 0, 'Name0', 'Lastname0', 10)");
+        stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values ('p1', 1, 'Name1', 'Lastname1', 20), ('p2', 2, 'Name2', 'Lastname2', 30)");
+        stmt.addBatch("merge into Person (_key, id, firstName, lastName, age) values ('p3', 3, 'Name3', 'Lastname3', 40)");
+        stmt.addBatch("update Person set id = 'FAIL' where age >= 30"); // Fail.
+        stmt.addBatch("merge into Person (_key, id, firstName, lastName, age) values ('p0', 2, 'Name2', 'Lastname2', 50)");
+        stmt.addBatch("delete from Person where FAIL <= 40"); // Fail.
+
+        try {
+            stmt.executeBatch();
+
+            fail("BatchUpdateException must be thrown");
+        } catch(BatchUpdateException e) {
+            int[] updCnts = e.getUpdateCounts();
+
+            if (!e.getMessage().contains("Value conversion failed")) {
+                log.error("Invalid exception: ", e);
+
+                fail();
+            }
+
+            assertEquals("Invalid update counts size", 6, updCnts.length);
+            assertArrayEquals("Invalid update count",
+                new int[] {1, 2, 1, Statement.EXECUTE_FAILED, 1, Statement.EXECUTE_FAILED}, updCnts);
         }
     }
 
@@ -235,7 +441,11 @@ public class JdbcThinBatchSelfTest extends JdbcThinAbstractDmlStatementSelfTest
     public void testBatchExceptionPrepared() throws SQLException {
         final int BATCH_SIZE = 7;
 
-        for (int i = 0; i < BATCH_SIZE; ++i) {
+        final int FAILED_IDX = 5;
+
+        assert FAILED_IDX + 2 == BATCH_SIZE;
+
+        for (int i = 0; i < FAILED_IDX; ++i) {
             int paramCnt = 1;
 
             pstmt.setString(paramCnt++, "p" + i);
@@ -248,11 +458,20 @@ public class JdbcThinBatchSelfTest extends JdbcThinAbstractDmlStatementSelfTest
         }
 
         int paramCnt = 1;
-        pstmt.setString(paramCnt++, "p" + 100);
-        pstmt.setString(paramCnt++, "x");
-        pstmt.setString(paramCnt++, "Name" + 100);
-        pstmt.setString(paramCnt++, "Lastname" + 100);
-        pstmt.setInt(paramCnt++, 20 + 100);
+        pstmt.setString(paramCnt++, "p" + FAILED_IDX);
+        pstmt.setString(paramCnt++, "FAIL");
+        pstmt.setString(paramCnt++, "Name" + FAILED_IDX);
+        pstmt.setString(paramCnt++, "Lastname" + FAILED_IDX);
+        pstmt.setInt(paramCnt++, 20 + FAILED_IDX);
+
+        pstmt.addBatch();
+
+        paramCnt = 1;
+        pstmt.setString(paramCnt++, "p" + FAILED_IDX + 1);
+        pstmt.setInt(paramCnt++, FAILED_IDX + 1);
+        pstmt.setString(paramCnt++, "Name" + FAILED_IDX + 1);
+        pstmt.setString(paramCnt++, "Lastname" + FAILED_IDX + 1);
+        pstmt.setInt(paramCnt++, 20 + FAILED_IDX + 1);
 
         pstmt.addBatch();
 
@@ -267,13 +486,283 @@ public class JdbcThinBatchSelfTest extends JdbcThinAbstractDmlStatementSelfTest
             assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length);
 
             for (int i = 0; i < BATCH_SIZE; ++i)
-                assertEquals("Invalid update count",1, updCnts[i]);
+                assertEquals("Invalid update count", i != FAILED_IDX ? 1 : Statement.EXECUTE_FAILED, updCnts[i]);
 
             if (!e.getMessage().contains("Value conversion failed")) {
                 log.error("Invalid exception: ", e);
 
                 fail();
             }
+
+            assertEquals("Invalid SQL state.", SqlStateCode.CONVERSION_FAILED, e.getSQLState());
+            assertEquals("Invalid error code.", IgniteQueryErrorCode.CONVERSION_FAILED, e.getErrorCode());
+        }
+    }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testBatchMergePrepared() throws SQLException {
+        final int BATCH_SIZE = 10;
+
+        pstmt = conn.prepareStatement("merge into Person(_key, id, firstName, lastName, age) values " +
+            "(?, ?, ?, ?, ?)");
+
+        for (int i = 0; i < BATCH_SIZE; ++i) {
+            int paramCnt = 1;
+
+            pstmt.setString(paramCnt++, "p" + i);
+            pstmt.setInt(paramCnt++, i);
+            pstmt.setString(paramCnt++, "Name" + i);
+            pstmt.setString(paramCnt++, "Lastname" + i);
+            pstmt.setInt(paramCnt++, 20 + i);
+
+            pstmt.addBatch();
+        }
+
+        int [] updCnts = pstmt.executeBatch();
+
+        assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length);
+
+        for (int i = 0; i < BATCH_SIZE; ++i)
+            assertEquals("Invalid update count",1, updCnts[i]);
+    }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testBatchMergeExceptionPrepared() throws SQLException {
+        final int BATCH_SIZE = 7;
+
+        final int FAILED_IDX = 5;
+
+        pstmt = conn.prepareStatement("merge into Person(_key, id, firstName, lastName, age) values " +
+            "(?, ?, ?, ?, ?)");
+
+        assert FAILED_IDX + 2 == BATCH_SIZE;
+
+        for (int i = 0; i < FAILED_IDX; ++i) {
+            int paramCnt = 1;
+
+            pstmt.setString(paramCnt++, "p" + i);
+            pstmt.setInt(paramCnt++, i);
+            pstmt.setString(paramCnt++, "Name" + i);
+            pstmt.setString(paramCnt++, "Lastname" + i);
+            pstmt.setInt(paramCnt++, 20 + i);
+
+            pstmt.addBatch();
+        }
+
+        int paramCnt = 1;
+        pstmt.setString(paramCnt++, "p" + FAILED_IDX);
+        pstmt.setString(paramCnt++, "FAIL");
+        pstmt.setString(paramCnt++, "Name" + FAILED_IDX);
+        pstmt.setString(paramCnt++, "Lastname" + FAILED_IDX);
+        pstmt.setInt(paramCnt++, 20 + FAILED_IDX);
+
+        pstmt.addBatch();
+
+        paramCnt = 1;
+        pstmt.setString(paramCnt++, "p" + FAILED_IDX + 1);
+        pstmt.setInt(paramCnt++, FAILED_IDX + 1);
+        pstmt.setString(paramCnt++, "Name" + FAILED_IDX + 1);
+        pstmt.setString(paramCnt++, "Lastname" + FAILED_IDX + 1);
+        pstmt.setInt(paramCnt++, 20 + FAILED_IDX + 1);
+
+        pstmt.addBatch();
+
+        try {
+            int[] res = pstmt.executeBatch();
+
+            fail("BatchUpdateException must be thrown res=" + Arrays.toString(res));
+        }
+        catch(BatchUpdateException e) {
+            int [] updCnts = e.getUpdateCounts();
+
+            assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length);
+
+            for (int i = 0; i < BATCH_SIZE; ++i)
+                assertEquals("Invalid update count", i != FAILED_IDX ? 1 : Statement.EXECUTE_FAILED, updCnts[i]);
+
+            if (!e.getMessage().contains("Value conversion failed")) {
+                log.error("Invalid exception: ", e);
+
+                fail();
+            }
+
+            assertEquals("Invalid SQL state.", SqlStateCode.CONVERSION_FAILED, e.getSQLState());
+            assertEquals("Invalid error code.", IgniteQueryErrorCode.CONVERSION_FAILED, e.getErrorCode());
+        }
+    }
+
+    /**
+     * Populates table 'Person' with entities.
+     *
+     * @param size Number of entities.
+     * @throws SQLException If failed.
+     */
+    private void populateTable(int size) throws SQLException {
+        stmt.addBatch("insert into Person (_key, id, firstName, lastName, age) values "
+            + generateValues(0, size));
+
+        stmt.executeBatch();
+    }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testBatchUpdatePrepared() throws SQLException {
+        final int BATCH_SIZE = 10;
+
+        populateTable(BATCH_SIZE);
+
+        pstmt = conn.prepareStatement("update Person set age = 100 where id = ?;");
+
+        for (int i = 0; i < BATCH_SIZE; ++i) {
+            pstmt.setInt(1, i);
+
+            pstmt.addBatch();
+        }
+
+        int [] updCnts = pstmt.executeBatch();
+
+        assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length);
+
+        for (int i = 0; i < BATCH_SIZE; ++i)
+            assertEquals("Invalid update count",1, updCnts[i]);
+    }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testBatchUpdateExceptionPrepared() throws SQLException {
+        final int BATCH_SIZE = 7;
+
+        final int FAILED_IDX = 5;
+
+        populateTable(BATCH_SIZE);
+
+        pstmt = conn.prepareStatement("update Person set age = 100 where id = ?;");
+
+        assert FAILED_IDX + 2 == BATCH_SIZE;
+
+        for (int i = 0; i < FAILED_IDX; ++i) {
+            pstmt.setInt(1, i);
+
+            pstmt.addBatch();
+        }
+
+        pstmt.setString(1, "FAIL");
+
+        pstmt.addBatch();
+
+        pstmt.setInt(1, FAILED_IDX + 1);
+
+        pstmt.addBatch();
+
+        try {
+            int[] res = pstmt.executeBatch();
+
+            fail("BatchUpdateException must be thrown res=" + Arrays.toString(res));
+        }
+        catch(BatchUpdateException e) {
+            int [] updCnts = e.getUpdateCounts();
+
+            assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length);
+
+            for (int i = 0; i < BATCH_SIZE; ++i)
+                assertEquals("Invalid update count", i != FAILED_IDX ? 1 : Statement.EXECUTE_FAILED, updCnts[i]);
+
+            if (!e.getMessage().contains("Data conversion error converting \"FAIL\"")) {
+                log.error("Invalid exception: ", e);
+
+                fail();
+            }
+
+            assertEquals("Invalid SQL state.", SqlStateCode.INTERNAL_ERROR, e.getSQLState());
+            assertEquals("Invalid error code.", IgniteQueryErrorCode.UNKNOWN, e.getErrorCode());
+
+            //assertEquals("Invalid SQL state.", SqlStateCode.CONVERSION_FAILED, e.getSQLState());
+            //assertEquals("Invalid error code.", IgniteQueryErrorCode.CONVERSION_FAILED, e.getErrorCode());
+        }
+    }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testBatchDeletePrepared() throws SQLException {
+        final int BATCH_SIZE = 10;
+
+        populateTable(BATCH_SIZE);
+
+        pstmt = conn.prepareStatement("delete from Person where id = ?;");
+
+        for (int i = 0; i < BATCH_SIZE; ++i) {
+            pstmt.setInt(1, i);
+
+            pstmt.addBatch();
+        }
+
+        int [] updCnts = pstmt.executeBatch();
+
+        assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length);
+
+        for (int i = 0; i < BATCH_SIZE; ++i)
+            assertEquals("Invalid update count",1, updCnts[i]);
+    }
+
+    /**
+     * @throws SQLException If failed.
+     */
+    public void testBatchDeleteExceptionPrepared() throws SQLException {
+        final int BATCH_SIZE = 7;
+
+        final int FAILED_IDX = 5;
+
+        populateTable(BATCH_SIZE);
+
+        pstmt = conn.prepareStatement("delete from Person where id = ?;");
+
+        assert FAILED_IDX + 2 == BATCH_SIZE;
+
+        for (int i = 0; i < FAILED_IDX; ++i) {
+            pstmt.setInt(1, i);
+
+            pstmt.addBatch();
+        }
+
+        pstmt.setString(1, "FAIL");
+
+        pstmt.addBatch();
+
+        pstmt.setInt(1, FAILED_IDX + 1);
+
+        pstmt.addBatch();
+
+        try {
+            int[] res = pstmt.executeBatch();
+
+            fail("BatchUpdateException must be thrown res=" + Arrays.toString(res));
+        }
+        catch(BatchUpdateException e) {
+            int [] updCnts = e.getUpdateCounts();
+
+            assertEquals("Invalid update counts size", BATCH_SIZE, updCnts.length);
+
+            for (int i = 0; i < BATCH_SIZE; ++i)
+                assertEquals("Invalid update count", i != FAILED_IDX ? 1 : Statement.EXECUTE_FAILED, updCnts[i]);
+
+            if (!e.getMessage().contains("Data conversion error converting \"FAIL\"")) {
+                log.error("Invalid exception: ", e);
+
+                fail();
+            }
+
+            assertEquals("Invalid SQL state.", SqlStateCode.INTERNAL_ERROR, e.getSQLState());
+            assertEquals("Invalid error code.", IgniteQueryErrorCode.UNKNOWN, e.getErrorCode());
+
+            //assertEquals("Invalid SQL state.", SqlStateCode.CONVERSION_FAILED, e.getSQLState());
+            //assertEquals("Invalid error code.", IgniteQueryErrorCode.CONVERSION_FAILED, e.getErrorCode());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a4f22ed/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinErrorsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinErrorsSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinErrorsSelfTest.java
index db70f3be..a286fc1 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinErrorsSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinErrorsSelfTest.java
@@ -25,6 +25,8 @@ import java.sql.Statement;
 import org.apache.ignite.jdbc.JdbcErrorsAbstractSelfTest;
 import org.apache.ignite.lang.IgniteCallable;
 
+import static org.junit.Assert.assertArrayEquals;
+
 /**
  * Test SQLSTATE codes propagation with thin client driver.
  */
@@ -96,10 +98,9 @@ public class JdbcThinErrorsSelfTest extends JdbcErrorsAbstractSelfTest {
                 fail("BatchUpdateException is expected");
             }
             catch (BatchUpdateException e) {
-                assertEquals(2, e.getUpdateCounts().length);
+                assertEquals(3, e.getUpdateCounts().length);
 
-                for (int updCnt : e.getUpdateCounts())
-                    assertEquals(1, updCnt);
+                assertArrayEquals("", new int[] {1, 1, Statement.EXECUTE_FAILED}, e.getUpdateCounts());
 
                 assertEquals("42000", e.getSQLState());
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a4f22ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
index c5f786e..ff10e3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
@@ -17,8 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.query;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.util.typedef.F;
 
 /**
  * {@link SqlFieldsQuery} with experimental and internal features.
@@ -33,12 +36,16 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery {
     /** Whether server side DML should be enabled. */
     private boolean skipReducerOnUpdate;
 
+    /** Batched arguments list. */
+    private List<Object[]> batchedArgs;
+
     /**
      * @param sql SQL query.
      * @param isQry Flag indicating whether this object denotes a query or an update operation.
      */
     public SqlFieldsQueryEx(String sql, Boolean isQry) {
         super(sql);
+
         this.isQry = isQry;
     }
 
@@ -50,6 +57,7 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery {
 
         this.isQry = qry.isQry;
         this.skipReducerOnUpdate = qry.skipReducerOnUpdate;
+        this.batchedArgs = qry.batchedArgs;
     }
 
     /**
@@ -155,4 +163,41 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery {
     @Override public SqlFieldsQuery copy() {
         return new SqlFieldsQueryEx(this);
     }
+
+    /**
+     * Adds batched arguments.
+     *
+     * @param args Batched arguments.
+     */
+    public void addBatchedArgs(Object[] args) {
+        if (this.batchedArgs == null)
+            this.batchedArgs = new ArrayList<>();
+
+        this.batchedArgs.add(args);
+    }
+
+    /**
+     * Clears batched arguments.
+     */
+    public void clearBatchedArgs() {
+        this.batchedArgs = null;
+    }
+
+    /**
+     * Returns batched arguments.
+     *
+     * @return Batched arguments.
+     */
+    public List<Object[]> batchedArguments() {
+        return this.batchedArgs;
+    }
+
+    /**
+     * Checks if query is batched.
+     *
+     * @return {@code True} if batched.
+     */
+    public boolean isBatched() {
+        return !F.isEmpty(batchedArgs);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a4f22ed/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 458c99c..d0f0073 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -17,12 +17,14 @@
 
 package org.apache.ignite.internal.processors.odbc.jdbc;
 
+import java.sql.BatchUpdateException;
 import java.sql.ParameterMetaData;
+import java.sql.Statement;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -35,9 +37,9 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteVersionUtils;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
-import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
@@ -50,8 +52,10 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
 
 import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_3_0;
 import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext.VER_2_4_0;
@@ -476,19 +480,21 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
         if (F.isEmpty(schemaName))
             schemaName = QueryUtils.DFLT_SCHEMA;
 
-        int successQueries = 0;
-        int updCnts[] = new int[req.queries().size()];
+        int qryCnt = req.queries().size();
 
-        try {
-            String sql = null;
+        List<Integer> updCntsAcc = new ArrayList<>(qryCnt);
 
-            for (JdbcQuery q : req.queries()) {
-                if (q.sql() != null)
-                    sql = q.sql();
+        // Send back only the first error. Others will be written to the log.
+        IgniteBiTuple<Integer, String> firstErr = new IgniteBiTuple<>();
 
-                SqlFieldsQuery qry = new SqlFieldsQueryEx(sql, false);
+        SqlFieldsQueryEx qry = null;
 
-                qry.setArgs(q.args());
+        for (JdbcQuery q : req.queries()) {
+            if (q.sql() != null) { // If we have a new query string in the batch,
+                if (qry != null) // then execute the previous sub-batch and create a new SqlFieldsQueryEx.
+                    executeBatchedQuery(qry, updCntsAcc, firstErr);
+
+                qry = new SqlFieldsQueryEx(q.sql(), false);
 
                 qry.setDistributedJoins(distributedJoins);
                 qry.setEnforceJoinOrder(enforceJoinOrder);
@@ -497,38 +503,88 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
                 qry.setLazy(lazy);
 
                 qry.setSchema(schemaName);
+            }
 
-                QueryCursorImpl<List<?>> qryCur = (QueryCursorImpl<List<?>>)ctx.query()
-                    .querySqlFieldsNoCache(qry, true, true).get(0);
+            assert qry != null;
 
-                assert !qryCur.isQuery();
+            qry.addBatchedArgs(q.args());
+        }
 
-                List<List<?>> items = qryCur.getAll();
+        if (qry != null)
+            executeBatchedQuery(qry, updCntsAcc, firstErr);
 
-                updCnts[successQueries++] = ((Long)items.get(0).get(0)).intValue();
-            }
+        int updCnts[] = U.toIntArray(updCntsAcc);
 
+        if (firstErr.isEmpty())
             return new JdbcResponse(new JdbcBatchExecuteResult(updCnts, ClientListenerResponse.STATUS_SUCCESS, null));
+        else
+            return new JdbcResponse(new JdbcBatchExecuteResult(updCnts, firstErr.getKey(), firstErr.getValue()));
+    }
+
+    /**
+     * Executes query and updates result counters.
+     *
+     * @param qry Query.
+     * @param updCntsAcc Accumulator of per-query update counts.
+     * @param firstErr First error data - code and message.
+     */
+    private void executeBatchedQuery(SqlFieldsQueryEx qry, List<Integer> updCntsAcc,
+        IgniteBiTuple<Integer, String> firstErr) {
+        try {
+            List<FieldsQueryCursor<List<?>>> qryRes = ctx.query().querySqlFieldsNoCache(qry, true, true);
+
+            for (FieldsQueryCursor<List<?>> cur : qryRes) {
+                assert !((QueryCursorImpl)cur).isQuery();
+
+                Iterator<List<?>> it = cur.iterator();
+
+                if (it.hasNext()) {
+                    int val = ((Long)it.next().get(0)).intValue();
+
+                    updCntsAcc.add(val);
+                }
+            }
         }
         catch (Exception e) {
-            U.error(log, "Failed to execute batch query [reqId=" + req.requestId() + ", req=" + req + ']', e);
-
             int code;
 
             String msg;
 
             if (e instanceof IgniteSQLException) {
-                code = ((IgniteSQLException) e).statusCode();
+                BatchUpdateException batchCause = X.cause(e, BatchUpdateException.class);
 
-                msg = e.getMessage();
+                if (batchCause != null) {
+                    int[] updCntsOnErr = batchCause.getUpdateCounts();
+
+                    for (int i = 0; i < updCntsOnErr.length; i++)
+                        updCntsAcc.add(updCntsOnErr[i]);
+
+                    msg = batchCause.getMessage();
+
+                    code = batchCause.getErrorCode();
+                }
+                else {
+                    for (int i = 0; i < qry.batchedArguments().size(); i++)
+                        updCntsAcc.add(Statement.EXECUTE_FAILED);
+
+                    msg = e.getMessage();
+
+                    code = ((IgniteSQLException)e).statusCode();
+                }
             }
             else {
-                code = IgniteQueryErrorCode.UNKNOWN;
+                for (int i = 0; i < qry.batchedArguments().size(); i++)
+                    updCntsAcc.add(Statement.EXECUTE_FAILED);
 
                 msg = e.getMessage();
+
+                code = IgniteQueryErrorCode.UNKNOWN;
             }
 
-            return new JdbcResponse(new JdbcBatchExecuteResult(Arrays.copyOf(updCnts, successQueries), code, msg));
+            if (firstErr.isEmpty())
+                firstErr.set(code, msg);
+            else
+                U.error(log, "Failed to execute batch query [qry=" + qry +']', e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a4f22ed/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java
index 2bacc23..2f74601c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IgniteSQLException.java
@@ -128,6 +128,15 @@ public class IgniteSQLException extends IgniteException {
     }
 
     /**
+     * SQL state getter (see {@link SQLException#getSQLState()}).
+     *
+     * @return SQL state code of this exception.
+     */
+    public String sqlState() {
+        return sqlState;
+    }
+
+    /**
      * @return JDBC exception containing details from this instance.
      */
     public SQLException toJdbcException() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a4f22ed/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 9a6b0af..c06fad7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -17,11 +17,14 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
+import java.sql.BatchUpdateException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -42,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -50,6 +54,7 @@ import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlBatchSender;
 import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
@@ -70,6 +75,7 @@ import org.h2.command.dml.Merge;
 import org.h2.command.dml.Update;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.DUPLICATE_KEY;
 import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
 import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.UPDATE_RESULT_META;
 
@@ -146,21 +152,7 @@ public class DmlStatementsProcessor {
         GridCacheContext<?, ?> cctx = plan.cacheContext();
 
         for (int i = 0; i < DFLT_DML_RERUN_ATTEMPTS; i++) {
-            CacheOperationContext opCtx = cctx.operationContextPerCall();
-
-            // Force keepBinary for operation context to avoid binary deserialization inside entry processor
-            if (cctx.binaryMarshaller()) {
-                CacheOperationContext newOpCtx = null;
-
-                if (opCtx == null)
-                    // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
-                    newOpCtx = new CacheOperationContext(false, null, true, null, false, null, false);
-                else if (!opCtx.isKeepBinary())
-                    newOpCtx = opCtx.keepBinary();
-
-                if (newOpCtx != null)
-                    cctx.operationContextPerCall(newOpCtx);
-            }
+            CacheOperationContext opCtx = setKeepBinaryContext(cctx);
 
             UpdateResult r;
 
@@ -189,6 +181,125 @@ public class DmlStatementsProcessor {
     }
 
     /**
+     * Execute DML statement, possibly with a few re-attempts in case of concurrent data modifications.
+     *
+     * @param schemaName Schema.
+     * @param conn Connection.
+     * @param prepared Prepared statement.
+     * @param fieldsQry Original query.
+     * @param loc Query locality flag.
+     * @param filters Cache name and key filter.
+     * @param cancel Cancel.
+     * @return Update results (modified items count and failed keys), one per batch element.
+     * @throws IgniteCheckedException if failed.
+     */
+    private Collection<UpdateResult> updateSqlFieldsBatched(String schemaName, Connection conn, Prepared prepared,
+        SqlFieldsQueryEx fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel)
+        throws IgniteCheckedException {
+        List<Object[]> argss = fieldsQry.batchedArguments();
+
+        UpdatePlan plan = getPlanForStatement(schemaName, conn, prepared, fieldsQry, loc, null);
+
+        if (plan.hasRows() && plan.mode() == UpdateMode.INSERT) {
+            GridCacheContext<?, ?> cctx = plan.cacheContext();
+
+            CacheOperationContext opCtx = setKeepBinaryContext(cctx);
+
+            try {
+                List<List<List<?>>> cur = plan.createRows(argss);
+
+                List<UpdateResult> res = processDmlSelectResultBatched(plan, cur, fieldsQry.getPageSize());
+
+                return res;
+            }
+            finally {
+                cctx.operationContextPerCall(opCtx);
+            }
+        }
+        else {
+            // Fallback to previous mode.
+            Collection<UpdateResult> ress = new ArrayList<>(argss.size());
+
+            SQLException batchException = null;
+
+            int[] cntPerRow = new int[argss.size()];
+
+            int cntr = 0;
+
+            for (Object[] args : argss) {
+                SqlFieldsQueryEx qry0 = (SqlFieldsQueryEx)fieldsQry.copy();
+
+                qry0.clearBatchedArgs();
+                qry0.setArgs(args);
+
+                UpdateResult res;
+
+                try {
+                    res = updateSqlFields(schemaName, conn, prepared, qry0, loc, filters, cancel);
+
+                    cntPerRow[cntr++] = (int)res.counter();
+
+                    ress.add(res);
+                }
+                catch (Exception e ) {
+                    String sqlState;
+
+                    int code;
+
+                    if (e instanceof IgniteSQLException) {
+                        sqlState = ((IgniteSQLException)e).sqlState();
+
+                        code = ((IgniteSQLException)e).statusCode();
+                    } else {
+                        sqlState = SqlStateCode.INTERNAL_ERROR;
+
+                        code = IgniteQueryErrorCode.UNKNOWN;
+                    }
+
+                    batchException = chainException(batchException, new SQLException(e.getMessage(), sqlState, code, e));
+
+                    cntPerRow[cntr++] = Statement.EXECUTE_FAILED;
+                }
+            }
+
+            if (batchException != null) {
+                BatchUpdateException e = new BatchUpdateException(batchException.getMessage(),
+                    batchException.getSQLState(), batchException.getErrorCode(), cntPerRow, batchException);
+
+                throw new IgniteCheckedException(e);
+            }
+
+            return ress;
+        }
+    }
+
+    /**
+     * Forces keepBinary mode on the current cache operation context.
+     *
+     * @param cctx Cache context.
+     * @return Old operation context.
+     */
+    private CacheOperationContext setKeepBinaryContext(GridCacheContext<?, ?> cctx) {
+        CacheOperationContext opCtx = cctx.operationContextPerCall();
+
+        // Force keepBinary for operation context to avoid binary deserialization inside entry processor
+        if (cctx.binaryMarshaller()) {
+            CacheOperationContext newOpCtx = null;
+
+            if (opCtx == null)
+                // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
+                newOpCtx = new CacheOperationContext(false, null, true, null, false, null, false);
+            else if (!opCtx.isKeepBinary())
+                newOpCtx = opCtx.keepBinary();
+
+            if (newOpCtx != null)
+                cctx.operationContextPerCall(newOpCtx);
+        }
+
+        return opCtx;
+    }
+
+    /**
      * @param schemaName Schema.
      * @param c Connection.
      * @param p Prepared statement.
@@ -198,18 +309,43 @@ public class DmlStatementsProcessor {
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings("unchecked")
-    QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schemaName, Connection c, Prepared p,
+    List<QueryCursorImpl<List<?>>> updateSqlFieldsDistributed(String schemaName, Connection c, Prepared p,
         SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException {
-        UpdateResult res = updateSqlFields(schemaName, c, p, fieldsQry, false, null, cancel);
+        if (DmlUtils.isBatched(fieldsQry)) {
+            Collection<UpdateResult> ress = updateSqlFieldsBatched(schemaName, c, p, (SqlFieldsQueryEx)fieldsQry,
+                false, null, cancel);
+
+            ArrayList<QueryCursorImpl<List<?>>> resCurs = new ArrayList<>(ress.size());
+
+            for (UpdateResult res : ress) {
+                checkUpdateResult(res);
+
+                QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
+                    (Collections.singletonList(res.counter())), cancel, false);
+
+                resCur.fieldsMeta(UPDATE_RESULT_META);
+
+                resCurs.add(resCur);
+            }
+
+            return resCurs;
+        }
+        else {
+            UpdateResult res = updateSqlFields(schemaName, c, p, fieldsQry, false, null, cancel);
 
-        checkUpdateResult(res);
+            ArrayList<QueryCursorImpl<List<?>>> resCurs = new ArrayList<>(1);
 
-        QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
-            (Collections.singletonList(res.counter())), cancel, false);
+            checkUpdateResult(res);
 
-        resCur.fieldsMeta(UPDATE_RESULT_META);
+            QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
+                (Collections.singletonList(res.counter())), cancel, false);
 
-        return resCur;
+            resCur.fieldsMeta(UPDATE_RESULT_META);
+
+            resCurs.add(resCur);
+
+            return resCurs;
+        }
     }
 
     /**
@@ -400,6 +536,30 @@ public class DmlStatementsProcessor {
     }
 
     /**
+     * Performs the planned update.
+     * @param plan Update plan.
+     * @param rows Rows to update.
+     * @param pageSize Page size.
+     * @return {@link List} of update results.
+     * @throws IgniteCheckedException If failed.
+     */
+    private List<UpdateResult> processDmlSelectResultBatched(UpdatePlan plan, List<List<List<?>>> rows, int pageSize)
+        throws IgniteCheckedException {
+        switch (plan.mode()) {
+            case MERGE:
+                // TODO
+                throw new IgniteCheckedException("Unsupported, fix");
+
+            case INSERT:
+                return doInsertBatched(plan, rows, pageSize);
+
+            default:
+                throw new IgniteSQLException("Unexpected batched DML operation [mode=" + plan.mode() + ']',
+                    IgniteQueryErrorCode.UNEXPECTED_OPERATION);
+        }
+    }
+
+    /**
      * @param cctx Cache context.
      * @param plan Update plan.
      * @param cursor Cursor over select results.
@@ -489,7 +649,7 @@ public class DmlStatementsProcessor {
     @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
     private UpdateResult doDelete(GridCacheContext cctx, Iterable<List<?>> cursor, int pageSize)
         throws IgniteCheckedException {
-        DmlBatchSender sender = new DmlBatchSender(cctx, pageSize);
+        DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);
 
         for (List<?> row : cursor) {
             if (row.size() != 2) {
@@ -498,7 +658,7 @@ public class DmlStatementsProcessor {
                 continue;
             }
 
-            sender.add(row.get(0), new ModifyingEntryProcessor(row.get(1), RMV));
+            sender.add(row.get(0), new ModifyingEntryProcessor(row.get(1), RMV),  0);
         }
 
         sender.flush();
@@ -537,7 +697,7 @@ public class DmlStatementsProcessor {
         throws IgniteCheckedException {
         GridCacheContext cctx = plan.cacheContext();
 
-        DmlBatchSender sender = new DmlBatchSender(cctx, pageSize);
+        DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);
 
         for (List<?> row : cursor) {
             T3<Object, Object, Object> row0 = plan.processRowForUpdate(row);
@@ -546,7 +706,7 @@ public class DmlStatementsProcessor {
             Object oldVal = row0.get2();
             Object newVal = row0.get3();
 
-            sender.add(key, new ModifyingEntryProcessor(oldVal, new EntryValueUpdater(newVal)));
+            sender.add(key, new ModifyingEntryProcessor(oldVal, new EntryValueUpdater(newVal)), 0);
         }
 
         sender.flush();
@@ -637,16 +797,16 @@ public class DmlStatementsProcessor {
                 return 1;
             else
                 throw new IgniteSQLException("Duplicate key during INSERT [key=" + t.getKey() + ']',
-                    IgniteQueryErrorCode.DUPLICATE_KEY);
+                    DUPLICATE_KEY);
         }
         else {
             // Keys that failed to INSERT due to duplication.
-            DmlBatchSender sender = new DmlBatchSender(cctx, pageSize);
+            DmlBatchSender sender = new DmlBatchSender(cctx, pageSize, 1);
 
             for (List<?> row : cursor) {
                 final IgniteBiTuple keyValPair = plan.processRow(row);
 
-                sender.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue()));
+                sender.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue()),  0);
             }
 
             sender.flush();
@@ -673,6 +833,117 @@ public class DmlStatementsProcessor {
     }
 
     /**
+     * Execute INSERT statement plan.
+     *
+     * @param plan Plan to execute.
+     * @param cursor Cursor to take inserted data from. I.e. list of batch arguments for each query.
+     * @param pageSize Batch size for streaming, anything {@code <= 0} for single page operations.
+     * @return Update results (number of items affected per batch row).
+     * @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
+     */
+    private List<UpdateResult> doInsertBatched(UpdatePlan plan, List<List<List<?>>> cursor, int pageSize)
+        throws IgniteCheckedException {
+        GridCacheContext cctx = plan.cacheContext();
+
+        DmlBatchSender snd = new DmlBatchSender(cctx, pageSize, cursor.size());
+
+        int rowNum = 0;
+
+        SQLException resEx = null;
+
+        for (List<List<?>> qryRow : cursor) {
+            for (List<?> row : qryRow) {
+                try {
+                    final IgniteBiTuple keyValPair = plan.processRow(row);
+
+                    snd.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue()), rowNum);
+                }
+                catch (Exception e) {
+                    String sqlState;
+
+                    int code;
+
+                    if (e instanceof IgniteSQLException) {
+                        sqlState = ((IgniteSQLException)e).sqlState();
+
+                        code = ((IgniteSQLException)e).statusCode();
+                    } else {
+                        sqlState = SqlStateCode.INTERNAL_ERROR;
+
+                        code = IgniteQueryErrorCode.UNKNOWN;
+                    }
+
+                    resEx = chainException(resEx, new SQLException(e.getMessage(), sqlState, code, e));
+
+                    snd.setFailed(rowNum);
+                }
+            }
+
+            rowNum++;
+        }
+
+        try {
+            snd.flush();
+        }
+        catch (Exception e) {
+            resEx = chainException(resEx, new SQLException(e.getMessage(), SqlStateCode.INTERNAL_ERROR,
+                IgniteQueryErrorCode.UNKNOWN, e));
+        }
+
+        resEx = chainException(resEx, snd.error());
+
+        if (!F.isEmpty(snd.failedKeys())) {
+            SQLException e = new SQLException("Failed to INSERT some keys because they are already in cache [keys=" +
+                snd.failedKeys() + ']', SqlStateCode.CONSTRAINT_VIOLATION, DUPLICATE_KEY);
+
+            resEx = chainException(resEx, e);
+        }
+
+        if (resEx != null) {
+            BatchUpdateException e = new BatchUpdateException(resEx.getMessage(), resEx.getSQLState(),
+                resEx.getErrorCode(), snd.perRowCounterAsArray(), resEx);
+
+            throw new IgniteCheckedException(e);
+        }
+
+        int[] cntPerRow = snd.perRowCounterAsArray();
+
+        List<UpdateResult> res = new ArrayList<>(cntPerRow.length);
+
+        for (int i = 0; i < cntPerRow.length; i++ ) {
+            int cnt = cntPerRow[i];
+
+            res.add(new UpdateResult(cnt , X.EMPTY_OBJECT_ARRAY));
+        }
+
+        return res;
+    }
+
+    /**
+     * Adds exception to the chain.
+     *
+     * @param main Exception to add another exception to.
+     * @param add Exception which should be added to chain.
+     * @return Chained exception.
+     */
+    private SQLException chainException(SQLException main, SQLException add) {
+        if (main == null) {
+            if (add != null) {
+                main = add;
+
+                return main;
+            }
+            else
+                return null;
+        }
+        else {
+            main.setNextException(add);
+
+            return main;
+        }
+    }
+
+    /**
      *
      * @param schemaName Schema name.
      * @param stmt Prepared statement.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a4f22ed/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 4393946..349ee92 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -98,6 +98,7 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasLeafIO
 import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO;
 import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
 import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
@@ -1028,7 +1029,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (e.getErrorCode() == ErrorCode.STATEMENT_WAS_CANCELED)
                 throw new QueryCancelledException();
 
-            throw new IgniteCheckedException("Failed to execute SQL query.", e);
+            throw new IgniteCheckedException("Failed to execute SQL query. " + e.getMessage(), e);
         }
         finally {
             if (timeoutMillis > 0)
@@ -1534,7 +1535,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                     int paramsCnt = prepared.getParameters().size();
 
-                    if (paramsCnt > 0) {
+                    if (!DmlUtils.isBatched(qry) && paramsCnt > 0) {
                         if (argsOrig == null || argsOrig.length < firstArg + paramsCnt) {
                             throw new IgniteException("Invalid number of query parameters. " +
                                 "Cannot find " + (argsOrig.length + 1 - firstArg) + " parameter.");
@@ -1583,7 +1584,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 if (twoStepQry == null) {
                     if (DmlStatementsProcessor.isDmlStatement(prepared)) {
                         try {
-                            res.add(dmlProc.updateSqlFieldsDistributed(schemaName, c, prepared,
+                            res.addAll(dmlProc.updateSqlFieldsDistributed(schemaName, c, prepared,
                                 qry.copy().setSql(sqlQry).setArgs(args), cancel));
 
                             continue;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a4f22ed/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java
index a4a60c3..34b50b1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java
@@ -17,19 +17,10 @@
 
 package org.apache.ignite.internal.processors.query.h2.dml;
 
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.util.typedef.F;
-
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.EntryProcessorResult;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
@@ -37,6 +28,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.odbc.SqlStateCode;
+import org.apache.ignite.internal.util.typedef.F;
 
 import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
 
@@ -51,7 +53,7 @@ public class DmlBatchSender {
     private final int size;
 
     /** Batches. */
-    private final Map<UUID, Map<Object, EntryProcessor<Object, Object, Boolean>>> batches = new HashMap<>();
+    private final Map<UUID, Batch> batches = new HashMap<>();
 
     /** Result count. */
     private long updateCnt;
@@ -62,15 +64,20 @@ public class DmlBatchSender {
     /** Exception. */
     private SQLException err;
 
+    /** Per-row update counters. */
+    private int[] cntPerRow;
+
     /**
      * Constructor.
      *
      * @param cctx Cache context.
      * @param size Batch.
+     * @param qryNum Number of queries.
      */
-    public DmlBatchSender(GridCacheContext cctx, int size) {
+    public DmlBatchSender(GridCacheContext cctx, int size, int qryNum) {
         this.cctx = cctx;
         this.size = size;
+        cntPerRow = new int[qryNum];
     }
 
     /**
@@ -78,39 +85,46 @@ public class DmlBatchSender {
      *
      * @param key Key.
      * @param proc Processor.
+     * @param rowNum Row number.
+     * @throws IgniteCheckedException If failed.
      */
-    public void add(Object key, EntryProcessor<Object, Object, Boolean> proc) throws IgniteCheckedException {
+    public void add(Object key, EntryProcessor<Object, Object, Boolean> proc, int rowNum)
+        throws IgniteCheckedException {
+        assert key != null;
+        assert proc != null;
+
         ClusterNode node = cctx.affinity().primaryByKey(key, AffinityTopologyVersion.NONE);
 
         if (node == null)
             throw new IgniteCheckedException("Failed to map key to node.");
 
+        assert rowNum < cntPerRow.length;
+
         UUID nodeId = node.id();
 
-        Map<Object, EntryProcessor<Object, Object, Boolean>> batch = batches.get(nodeId);
+        Batch batch = batches.get(nodeId);
 
         if (batch == null) {
-            batch = new HashMap<>();
+            batch = new Batch();
 
             batches.put(nodeId, batch);
         }
 
-        batch.put(key, proc);
-
-        if (batch.size() >= size) {
+        if (batch.containsKey(key)) { // Force cache update if duplicates found.
             sendBatch(batch);
-
-            batch.clear();
         }
+
+        batch.put(key, rowNum, proc);
+
+        if (batch.size() >= size)
+            sendBatch(batch);
     }
 
     /**
      * Flush any remaining entries.
-     *
-     * @throws IgniteCheckedException If failed.
      */
-    public void flush() throws IgniteCheckedException {
-        for (Map<Object, EntryProcessor<Object, Object, Boolean>> batch : batches.values()) {
+    public void flush() {
+        for (Batch batch : batches.values()) {
             if (!batch.isEmpty())
                 sendBatch(batch);
         }
@@ -138,15 +152,33 @@ public class DmlBatchSender {
     }
 
     /**
+     * Returns per row updates counter as array.
+     *
+     * @return Per row updates counter as array.
+     */
+    public int[] perRowCounterAsArray() {
+        return cntPerRow;
+    }
+
+    /**
+     * Sets row as failed.
+     *
+     * @param rowNum Row number.
+     */
+    public void setFailed(int rowNum) {
+        cntPerRow[rowNum] = Statement.EXECUTE_FAILED;
+    }
+
+    /**
      * Send the batch.
      *
      * @param batch Batch.
-     * @throws IgniteCheckedException If failed.
      */
-    private void sendBatch(Map<Object, EntryProcessor<Object, Object, Boolean>> batch)
-        throws IgniteCheckedException {
+    private void sendBatch(Batch batch) {
         DmlPageProcessingResult pageRes = processPage(cctx, batch);
 
+        batch.clear();
+
         updateCnt += pageRes.count();
 
         if (failedKeys == null)
@@ -156,33 +188,48 @@ public class DmlBatchSender {
 
         if (pageRes.error() != null) {
             if (err == null)
-                err = error();
+                err = pageRes.error();
             else
-                err.setNextException(error());
+                err.setNextException(pageRes.error());
         }
     }
 
     /**
      * Execute given entry processors and collect errors, if any.
      * @param cctx Cache context.
-     * @param rows Rows to process.
+     * @param batch Rows to process.
      * @return Triple [number of rows actually changed; keys that failed to update (duplicates or concurrently
      *     updated ones); chain of exceptions for all keys whose processing resulted in error, or null for no errors].
-     * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    private static DmlPageProcessingResult processPage(GridCacheContext cctx,
-        Map<Object, EntryProcessor<Object, Object, Boolean>> rows) throws IgniteCheckedException {
-        Map<Object, EntryProcessorResult<Boolean>> res = cctx.cache().invokeAll(rows);
+    private DmlPageProcessingResult processPage(GridCacheContext cctx, Batch batch) {
+        Map<Object, EntryProcessorResult<Boolean>> res;
 
-        if (F.isEmpty(res))
-            return new DmlPageProcessingResult(rows.size(), null, null);
+        try {
+            res = cctx.cache().invokeAll(batch.rowProcessors());
+        }
+        catch (IgniteCheckedException e) {
+            for (Integer rowNum : batch.rowNumbers().values()) {
+                assert rowNum != null;
+
+                cntPerRow[rowNum] = Statement.EXECUTE_FAILED;
+            }
+
+            return new DmlPageProcessingResult(0, null,
+                new SQLException(e.getMessage(), SqlStateCode.INTERNAL_ERROR, IgniteQueryErrorCode.UNKNOWN, e));
+        }
+
+        if (F.isEmpty(res)) {
+            countAllRows(batch.rowNumbers().values());
 
-        DmlPageProcessingErrorResult splitRes = splitErrors(res);
+            return new DmlPageProcessingResult(batch.size(), null, null);
+        }
+
+        DmlPageProcessingErrorResult splitRes = splitErrors(res, batch);
 
         int keysCnt = splitRes.errorKeys().length;
 
-        return new DmlPageProcessingResult(rows.size() - keysCnt - splitRes.errorCount(), splitRes.errorKeys(),
+        return new DmlPageProcessingResult(batch.size() - keysCnt - splitRes.errorCount(), splitRes.errorKeys(),
             splitRes.error());
     }
 
@@ -191,12 +238,15 @@ public class DmlBatchSender {
      * processing yielded an exception.
      *
     * @param res Result of {@link GridCacheAdapter#invokeAll}
+     * @param batch Batch.
      * @return pair [array of duplicated/concurrently modified keys, SQL exception for erroneous keys] (exception is
      * null if all keys are duplicates/concurrently modified ones).
      */
-    private static DmlPageProcessingErrorResult splitErrors(Map<Object, EntryProcessorResult<Boolean>> res) {
+    private DmlPageProcessingErrorResult splitErrors(Map<Object, EntryProcessorResult<Boolean>> res, Batch batch) {
         Set<Object> errKeys = new LinkedHashSet<>(res.keySet());
 
+        countAllRows(batch.rowNumbers().values());
+
         SQLException currSqlEx = null;
 
         SQLException firstSqlEx = null;
@@ -225,8 +275,125 @@ public class DmlBatchSender {
 
                 errors++;
             }
+            finally {
+                Object key = e.getKey();
+
+                Integer rowNum = batch.rowNumbers().get(key);
+
+                assert rowNum != null;
+
+                cntPerRow[rowNum] = Statement.EXECUTE_FAILED;
+            }
         }
 
         return new DmlPageProcessingErrorResult(errKeys.toArray(), firstSqlEx, errors);
     }
+
+    /**
+     * Updates counters as if all rowNums were successfully processed.
+     *
+     * @param rowNums Rows.
+     */
+    private void countAllRows(Collection<Integer> rowNums) {
+        for (Integer rowNum : rowNums) {
+            assert rowNum != null;
+
+            if (cntPerRow[rowNum] > -1)
+                cntPerRow[rowNum]++;
+        }
+    }
+
+    /**
+     * Batch for update.
+     */
+    private static class Batch  {
+        /** Map from keys to row numbers. */
+        private Map<Object, Integer> rowNums = new HashMap<>();
+
+        /** Map from keys to entry processors. */
+        private Map<Object, EntryProcessor<Object, Object, Boolean>> rowProcs = new HashMap<>();
+
+        /**
+         * Checks if batch contains key.
+         *
+         * @param key Key.
+         * @return {@code True} if contains.
+         */
+        public boolean containsKey(Object key) {
+            boolean res = rowNums.containsKey(key);
+
+            assert res == rowProcs.containsKey(key);
+
+            return res;
+        }
+
+        /**
+         * Returns batch size.
+         *
+         * @return Batch size.
+         */
+        public int size() {
+            int res = rowNums.size();
+
+            assert res == rowProcs.size();
+
+            return res;
+        }
+
+        /**
+         * Adds row to batch.
+         *
+         * @param key Key.
+         * @param rowNum Row number.
+         * @param proc Entry processor.
+         * @return {@code True} if there was an entry associated with the given key.
+         */
+        public boolean put(Object key, Integer rowNum, EntryProcessor<Object, Object, Boolean> proc) {
+            Integer prevNum = rowNums.put(key, rowNum);
+            EntryProcessor prevProc = rowProcs.put(key, proc);
+
+            assert (prevNum == null) == (prevProc == null);
+
+            return prevNum != null;
+        }
+
+        /**
+         * Clears batch.
+         */
+        public void clear() {
+            assert rowNums.size() == rowProcs.size();
+
+            rowNums.clear();
+            rowProcs.clear();
+        }
+
+        /**
+         * Checks if batch is empty.
+         *
+         * @return {@code True} if empty.
+         */
+        public boolean isEmpty() {
+            assert rowNums.size() == rowProcs.size();
+
+            return rowNums.isEmpty();
+        }
+
+        /**
+         * Row numbers map getter.
+         *
+         * @return Row numbers map.
+         */
+        public Map<Object, Integer> rowNumbers() {
+            return rowNums;
+        }
+
+        /**
+         * Row processors map getter.
+         *
+         * @return Row processors map.
+         */
+        public Map<Object, EntryProcessor<Object, Object, Boolean>> rowProcessors() {
+            return rowProcs;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a4f22ed/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
index 8d4861e..03b03d8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
@@ -22,7 +22,9 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Date;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
@@ -117,6 +119,16 @@ public class DmlUtils {
     }
 
     /**
+     * Check whether query is batched.
+     *
+     * @param qry Query.
+     * @return {@code True} if batched.
+     */
+    public static boolean isBatched(SqlFieldsQuery qry) {
+        return (qry instanceof SqlFieldsQueryEx) && ((SqlFieldsQueryEx)qry).isBatched();
+    }
+
+    /**
      * Private constructor.
      */
     private DmlUtils() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a4f22ed/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
index 5625e37..17dc9d1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
@@ -372,8 +372,12 @@ public final class UpdatePlan {
 
     /**
      * Extract rows from plan without performing any query.
+     *
      * @param args Original query arguments.
-     * @return Rows from plan.
+     * @return {@link List} of rows from the plan for a single query.
+     * For example, if we have multiple args in a query: <br/>
+     * {@code INSERT INTO person VALUES (k1, v1), (k2, v2), (k3, v3);} <br/>
+     * we will get a {@link List} of {@link List} with items {@code {[k1, v1], [k2, v2], [k3, v3]}}.
      * @throws IgniteCheckedException if failed.
      */
     public List<List<?>> createRows(Object[] args) throws IgniteCheckedException {
@@ -383,6 +387,59 @@ public final class UpdatePlan {
 
         GridH2RowDescriptor desc = tbl.rowDescriptor();
 
+        extractArgsValues(args, res, desc);
+
+        return res;
+    }
+
+    /**
+     * Extract rows from plan without performing any query.
+     *
+     * @param argss Batch of arguments.
+     * @return {@link List} of rows from the plan for each query.
+     * For example, if we have a batch of queries with multiple args: <br/>
+     * <code>
+     * INSERT INTO person VALUES (k1, v1), (k2, v2), (k3, v3); <br/>
+     * INSERT INTO person VALUES (k4, v4), (k5, v5), (k6, v6);<br/>
+     * </code>
+     * we will get a {@link List} of {@link List} of {@link List} with items: <br/>
+     * <code>
+     * {[k1, v1], [k2, v2], [k3, v3]},<br/>
+     * {[k4, v4], [k5, v5], [k6, v6]}<br/>
+     * </code>
+     * @throws IgniteCheckedException If failed.
+     */
+    public List<List<List<?>>> createRows(List<Object[]> argss) throws IgniteCheckedException {
+        assert rowsNum > 0 && !F.isEmpty(colNames);
+        assert argss != null;
+
+        List<List<List<?>>> resPerQry = new ArrayList<>(argss.size());
+
+        GridH2RowDescriptor desc = tbl.rowDescriptor();
+
+        for (Object[] args : argss) {
+            List<List<?>> res = new ArrayList<>();
+
+            resPerQry.add(res);
+
+            extractArgsValues(args, res, desc);
+        }
+
+        return resPerQry;
+    }
+
+    /**
+     * Extracts values from arguments.
+     *
+     * @param args Arguments.
+     * @param res Result list where to put values to.
+     * @param desc Row descriptor.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void extractArgsValues(Object[] args, List<List<?>> res, GridH2RowDescriptor desc)
+        throws IgniteCheckedException {
+        assert res != null;
+
         for (List<DmlArgument> row : rows) {
             List<Object> resRow = new ArrayList<>();
 
@@ -400,8 +457,6 @@ public final class UpdatePlan {
 
             res.add(resRow);
         }
-
-        return res;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a4f22ed/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index 5ffd264..3305b00 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -619,7 +619,7 @@ public final class UpdatePlanBuilder {
         Connection conn, SqlFieldsQuery fieldsQry, boolean loc, String selectQry, String cacheName)
         throws IgniteCheckedException {
 
-        if (loc || !isSkipReducerOnUpdateQuery(fieldsQry))
+        if (loc || !isSkipReducerOnUpdateQuery(fieldsQry) || DmlUtils.isBatched(fieldsQry))
             return null;
 
         assert conn != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0a4f22ed/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 29e9b27..3b72b8e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -744,7 +744,7 @@ public class GridReduceQueryExecutor {
                             if (wasCancelled(err))
                                 throw new QueryCancelledException(); // Throw correct exception.
 
-                            throw new CacheException("Failed to run map query remotely.", err);
+                            throw new CacheException("Failed to run map query remotely." + err.getMessage(), err);
                         }
 
                         if (state instanceof AffinityTopologyVersion) {


Mime
View raw message