Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6636F200D0F for ; Fri, 29 Sep 2017 13:22:30 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 648EC1609C5; Fri, 29 Sep 2017 11:22:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 110BC1609EE for ; Fri, 29 Sep 2017 13:22:27 +0200 (CEST) Received: (qmail 20791 invoked by uid 500); 29 Sep 2017 11:22:27 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 20723 invoked by uid 99); 29 Sep 2017 11:22:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Sep 2017 11:22:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 01E82F5BB7; Fri, 29 Sep 2017 11:22:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Fri, 29 Sep 2017 11:22:29 -0000 Message-Id: <64e0834c036241f9a392670a7201dfea@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [04/10] ignite git commit: IGNITE-6046: JDBC thin driver: allowed multiple statements execution (e.g. "SELECT something; SELECT something_else". This closes #2618. archived-at: Fri, 29 Sep 2017 11:22:30 -0000 IGNITE-6046: JDBC thin driver: allowed multiple statements execution (e.g. "SELECT something; SELECT something_else". This closes #2618. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/405749a7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/405749a7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/405749a7 Branch: refs/heads/ignite-3478 Commit: 405749a736c85c054e2c4ac1dc197ea903e8d75f Parents: 507ac67 Author: tledkov-gridgain Authored: Thu Sep 28 15:59:12 2017 +0300 Committer: devozerov Committed: Thu Sep 28 15:59:12 2017 +0300 ---------------------------------------------------------------------- .../jdbc/thin/JdbcThinStatementSelfTest.java | 138 +++++++++++-- .../ignite/cache/query/SqlFieldsQuery.java | 18 ++ .../jdbc/thin/JdbcThinDatabaseMetadata.java | 2 +- .../jdbc/thin/JdbcThinPreparedStatement.java | 2 +- .../internal/jdbc/thin/JdbcThinResultSet.java | 8 +- .../internal/jdbc/thin/JdbcThinStatement.java | 196 +++++++++++++------ .../internal/jdbc/thin/JdbcThinTcpIo.java | 9 +- .../odbc/jdbc/JdbcConnectionContext.java | 10 +- ...dbcQueryExecuteMultipleStatementsResult.java | 134 +++++++++++++ .../odbc/jdbc/JdbcRequestHandler.java | 82 ++++++-- .../processors/odbc/jdbc/JdbcResult.java | 8 + .../processors/odbc/jdbc/JdbcResultInfo.java | 95 +++++++++ .../processors/query/GridQueryIndexing.java | 7 +- .../processors/query/GridQueryProcessor.java | 28 ++- ...IgniteClientCacheInitializationFailTest.java | 6 +- .../query/h2/DmlStatementsProcessor.java | 35 ++-- .../processors/query/h2/IgniteH2Indexing.java | 181 +++++++++++++---- .../query/h2/ddl/DdlStatementsProcessor.java | 11 +- .../query/h2/sql/GridSqlQueryParser.java | 81 ++++++++ .../query/h2/sql/GridSqlQuerySplitter.java | 17 +- .../MultipleStatementsSqlQuerySelfTest.java | 154 +++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 2 + 22 files changed, 1037 insertions(+), 187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java index 16f118c..5309465 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStatementSelfTest.java @@ -92,7 +92,6 @@ public class JdbcThinStatementSelfTest extends JdbcThinAbstractSelfTest { startGridsMultiThreaded(3); - fillCache(); } @@ -414,22 +413,120 @@ public class JdbcThinStatementSelfTest extends JdbcThinAbstractSelfTest { /** * @throws Exception If failed. */ - public void testExecuteQueryMultipleResultSets() throws Exception { - assert !conn.getMetaData().supportsMultipleResultSets(); + public void testExecuteQueryMultipleOnlyResultSets() throws Exception { + assert conn.getMetaData().supportsMultipleResultSets(); - fail("https://issues.apache.org/jira/browse/IGNITE-6046"); + int stmtCnt = 10; - final String sqlText = "select 1; select 1"; + StringBuilder sql = new StringBuilder(); - GridTestUtils.assertThrows(log, - new Callable() { - @Override public Object call() throws Exception { - return stmt.executeQuery(sqlText); - } - }, - SQLException.class, - "Multiple result sets" - ); + for (int i = 0; i < stmtCnt; ++i) + sql.append("select ").append(i).append("; "); + + assert stmt.execute(sql.toString()); + + for (int i = 0; i < stmtCnt; ++i) { + assert stmt.getMoreResults(); + + ResultSet rs = stmt.getResultSet(); + + assert rs.next(); + assert rs.getInt(1) == i; + assert !rs.next(); + } + + assert !stmt.getMoreResults(); + } + + /** + * @throws Exception If failed. + */ + public void testExecuteQueryMultipleOnlyDml() throws Exception { + conn.setSchema(null); + + int stmtCnt = 10; + + StringBuilder sql = new StringBuilder("drop table if exists test; create table test(ID int primary key, NAME varchar(20)); "); + + for (int i = 0; i < stmtCnt; ++i) + sql.append("insert into test (ID, NAME) values (" + i + ", 'name_" + i +"'); "); + + assert !stmt.execute(sql.toString()); + + // DROP TABLE statement + assert stmt.getResultSet() == null; + assert stmt.getUpdateCount() == 0; + + // CREATE TABLE statement + assert stmt.getResultSet() == null; + assert stmt.getUpdateCount() == 0; + + for (int i = 0; i < stmtCnt; ++i) { + assert stmt.getMoreResults(); + + assert stmt.getResultSet() == null; + assert stmt.getUpdateCount() == 1; + } + + assert !stmt.getMoreResults(); + } + + /** + * @throws Exception If failed. + */ + public void testExecuteQueryMultipleMixed() throws Exception { + conn.setSchema(null); + + int stmtCnt = 10; + + StringBuilder sql = new StringBuilder("drop table if exists test; create table test(ID int primary key, NAME varchar(20)); "); + + for (int i = 0; i < stmtCnt; ++i) { + if (i % 2 == 0) + sql.append(" insert into test (ID, NAME) values (" + i + ", 'name_" + i + "'); "); + else + sql.append(" select * from test where id < " + i + "; "); + } + + assert !stmt.execute(sql.toString()); + + // DROP TABLE statement + assert stmt.getResultSet() == null; + assert stmt.getUpdateCount() == 0; + + // CREATE TABLE statement + assert stmt.getResultSet() == null; + assert stmt.getUpdateCount() == 0; + + boolean notEmptyResult = false; + + for (int i = 0; i < stmtCnt; ++i) { + assert stmt.getMoreResults(); + + if (i % 2 == 0) { + assert stmt.getResultSet() == null; + assert stmt.getUpdateCount() == 1; + } + else { + assert stmt.getUpdateCount() == -1; + + ResultSet rs = stmt.getResultSet(); + + int rowsCnt = 0; + + while(rs.next()) + rowsCnt++; + + assert rowsCnt <= (i + 1) / 2; + + if (rowsCnt == (i + 1) / 2) + notEmptyResult = true; + } + } + + assert notEmptyResult; + + assert !stmt.getMoreResults(); } /** @@ -462,7 +559,7 @@ public class JdbcThinStatementSelfTest extends JdbcThinAbstractSelfTest { } }, SQLException.class, - "The query is not DML" + "Given statement type does not match that declared by JDBC driver" ); } @@ -776,12 +873,14 @@ public class JdbcThinStatementSelfTest extends JdbcThinAbstractSelfTest { public void testGetMoreResults() throws Exception { assert !stmt.getMoreResults(); - stmt.execute("select 1"); + stmt.execute("select 1; "); ResultSet rs = stmt.getResultSet(); assert !stmt.getMoreResults(); + assert stmt.getResultSet() == null; + assert rs.isClosed(); stmt.close(); @@ -801,7 +900,7 @@ public class JdbcThinStatementSelfTest extends JdbcThinAbstractSelfTest { assert !stmt.getMoreResults(Statement.KEEP_CURRENT_RESULT); assert !stmt.getMoreResults(Statement.CLOSE_ALL_RESULTS); - stmt.execute("select 1"); + stmt.execute("select 1; "); ResultSet rs = stmt.getResultSet(); @@ -997,7 +1096,10 @@ public class JdbcThinStatementSelfTest extends JdbcThinAbstractSelfTest { /** * @throws Exception If failed. */ - public void testStatementTypeMismatchSelect() throws Exception { + public void testStatementTypeMismatchSelectForCachedQuery() throws Exception { + // Put query to cache. + stmt.executeQuery("select 1;"); + GridTestUtils.assertThrows(log, new Callable() { @Override public Object call() throws Exception { http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java index 54f8396..2d128d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java @@ -81,6 +81,24 @@ public class SqlFieldsQuery extends Query> { private String schema; /** + * Copy constructs SQL fields query. + * + * @param qry SQL query. + */ + public SqlFieldsQuery(SqlFieldsQuery qry) { + sql = qry.sql; + args = qry.args; + collocated = qry.collocated; + timeout = qry.timeout; + enforceJoinOrder = qry.enforceJoinOrder; + distributedJoins = qry.distributedJoins; + replicatedOnly = qry.replicatedOnly; + lazy = qry.lazy; + parts = qry.parts; + schema = qry.schema; + } + + /** * Constructs SQL fields query. * * @param sql SQL query. http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java index d13ef68..2ce7983 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinDatabaseMetadata.java @@ -311,7 +311,7 @@ public class JdbcThinDatabaseMetadata implements DatabaseMetaData { /** {@inheritDoc} */ @Override public boolean supportsMultipleResultSets() throws SQLException { - return false; + return true; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java index ce1b65c..fb2810d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java @@ -238,7 +238,7 @@ public class JdbcThinPreparedStatement extends JdbcThinStatement implements Prep @Override public boolean execute() throws SQLException { executeWithArguments(JdbcStatementType.ANY_STATEMENT_TYPE); - return rs.isQuery(); + return resultSets.get(0).isQuery(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java index 189175b..ff93274 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java @@ -187,7 +187,7 @@ public class JdbcThinResultSet implements ResultSet { this.fetchSize = fetchSize; this.rows = rows; - rowsIter = rows.iterator(); + rowsIter = rows != null ? rows.iterator() : null; } else this.updCnt = updCnt; @@ -228,10 +228,10 @@ public class JdbcThinResultSet implements ResultSet { /** {@inheritDoc} */ @Override public void close() throws SQLException { - if (closeStmt) - stmt.close(); - close0(); + + if (closeStmt) + stmt.closeIfAllResultsClosed(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java index 8e096c8..603545b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java @@ -25,6 +25,7 @@ import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLWarning; import java.sql.Statement; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; @@ -33,9 +34,12 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteMultipleStatementsResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResultInfo; import static java.sql.ResultSet.CONCUR_READ_ONLY; import static java.sql.ResultSet.FETCH_FORWARD; @@ -60,15 +64,9 @@ public class JdbcThinStatement implements Statement { /** Query timeout. */ private int timeout; - /** Current result set. */ - protected JdbcThinResultSet rs; - /** Fetch size. */ private int pageSize = DFLT_PAGE_SIZE; - /** Result set or update count has been already read. */ - private boolean alreadyRead; - /** Result set holdability*/ private final int resHoldability; @@ -78,6 +76,12 @@ public class JdbcThinStatement implements Statement { /** Close this statement on result set close. */ private boolean closeOnCompletion; + /** Result sets. */ + protected List resultSets; + + /** Current result index. */ + protected int curRes; + /** * Creates new statement. * @@ -113,24 +117,58 @@ public class JdbcThinStatement implements Statement { protected void execute0(JdbcStatementType stmtType, String sql, List args) throws SQLException { ensureNotClosed(); - if (rs != null) { - rs.close0(); - - rs = null; - } - - alreadyRead = false; + closeResults(); if (sql == null || sql.isEmpty()) throw new SQLException("SQL query is empty."); - JdbcQueryExecuteResult res = conn.sendRequest(new JdbcQueryExecuteRequest(stmtType, conn.getSchema(), pageSize, + JdbcResult res0 = conn.sendRequest(new JdbcQueryExecuteRequest(stmtType, conn.getSchema(), pageSize, maxRows, sql, args == null ? null : args.toArray(new Object[args.size()]))); - assert res != null; + assert res0 != null; - rs = new JdbcThinResultSet(this, res.getQueryId(), pageSize, res.last(), res.items(), - res.isQuery(), conn.autoCloseServerCursor(), res.updateCount(), closeOnCompletion); + if (res0 instanceof JdbcQueryExecuteResult) { + JdbcQueryExecuteResult res = (JdbcQueryExecuteResult)res0; + + resultSets = Collections.singletonList(new JdbcThinResultSet(this, res.getQueryId(), pageSize, + res.last(), res.items(), res.isQuery(), conn.autoCloseServerCursor(), res.updateCount(), + closeOnCompletion)); + } + else if (res0 instanceof JdbcQueryExecuteMultipleStatementsResult) { + JdbcQueryExecuteMultipleStatementsResult res = (JdbcQueryExecuteMultipleStatementsResult)res0; + + List resInfos = res.results(); + + resultSets = new ArrayList<>(resInfos.size()); + + boolean firstRes = true; + + for(JdbcResultInfo rsInfo : resInfos) { + if (!rsInfo.isQuery()) { + resultSets.add(new JdbcThinResultSet(this, -1, pageSize, + true, Collections.>emptyList(), false, + conn.autoCloseServerCursor(), rsInfo.updateCount(), closeOnCompletion)); + } + else { + if (firstRes) { + firstRes = false; + + resultSets.add(new JdbcThinResultSet(this, rsInfo.queryId(), pageSize, + res.isLast(), res.items(), true, + conn.autoCloseServerCursor(), -1, closeOnCompletion)); + } + else { + resultSets.add(new JdbcThinResultSet(this, rsInfo.queryId(), pageSize, + false, null, true, + conn.autoCloseServerCursor(), -1, closeOnCompletion)); + } + } + } + } + else + throw new SQLException("Unexpected result [res=" + res0 + ']'); + + assert resultSets.size() > 0 : "At least one results set is expected"; } /** {@inheritDoc} */ @@ -140,7 +178,7 @@ public class JdbcThinStatement implements Statement { int res = getUpdateCount(); if (res == -1) - throw new SQLException("The query is not DML statement: " + sql, SqlStateCode.PARSING_EXCEPTION); + throw new SQLException("The query is not DML statememt: " + sql); return res; } @@ -150,12 +188,25 @@ public class JdbcThinStatement implements Statement { if (isClosed()) return; - if (rs != null) - rs.close0(); + closeResults(); closed = true; } + /** + * Close results. + * @throws SQLException On error. + */ + private void closeResults() throws SQLException { + if (resultSets != null) { + for (JdbcThinResultSet rs : resultSets) + rs.close0(); + + resultSets = null; + curRes = 0; + } + } + /** {@inheritDoc} */ @Override public int getMaxFieldSize() throws SQLException { ensureNotClosed(); @@ -242,31 +293,39 @@ public class JdbcThinStatement implements Statement { execute0(JdbcStatementType.ANY_STATEMENT_TYPE, sql, null); - return rs.isQuery(); + return resultSets.get(0).isQuery(); } /** {@inheritDoc} */ @Override public ResultSet getResultSet() throws SQLException { - JdbcThinResultSet rs = lastResultSet(); + JdbcThinResultSet rs = nextResultSet(); - ResultSet res = rs == null || !rs.isQuery() ? null : rs; + if (rs == null) + return null; - if (res != null) - alreadyRead = true; + if (!rs.isQuery()) { + curRes--; - return res; + return null; + } + + return rs; } /** {@inheritDoc} */ @Override public int getUpdateCount() throws SQLException { - JdbcThinResultSet rs = lastResultSet(); + JdbcThinResultSet rs = nextResultSet(); + + if (rs == null) + return -1; - int res = rs == null || rs.isQuery() ? -1 : (int)rs.updatedCount(); + if (rs.isQuery()) { + curRes--; - if (res != -1) - alreadyRead = true; + return -1; + } - return res; + return (int)rs.updatedCount(); } /** @@ -275,13 +334,13 @@ public class JdbcThinStatement implements Statement { * @return Result set or null. * @throws SQLException If failed. */ - private JdbcThinResultSet lastResultSet() throws SQLException { + private JdbcThinResultSet nextResultSet() throws SQLException { ensureNotClosed(); - if (rs == null || alreadyRead) + if (resultSets == null || curRes >= resultSets.size()) return null; - - return rs; + else + return resultSets.get(curRes++); } /** {@inheritDoc} */ @@ -358,13 +417,7 @@ public class JdbcThinStatement implements Statement { @Override public int[] executeBatch() throws SQLException { ensureNotClosed(); - if (rs != null) { - rs.close0(); - - rs = null; - } - - alreadyRead = false; + closeResults(); if (batch == null || batch.isEmpty()) throw new SQLException("Batch is empty."); @@ -395,22 +448,32 @@ public class JdbcThinStatement implements Statement { @Override public boolean getMoreResults(int curr) throws SQLException { ensureNotClosed(); - switch (curr) { - case CLOSE_CURRENT_RESULT: - case CLOSE_ALL_RESULTS: - if (rs != null) - rs.close(); + if (resultSets != null) { + assert curRes <= resultSets.size() : "Invalid results state: [resultsCount=" + resultSets.size() + + ", curRes=" + curRes + ']'; - break; + switch (curr) { + case CLOSE_CURRENT_RESULT: + if (curRes > 0) + resultSets.get(curRes - 1).close0(); - case KEEP_CURRENT_RESULT: - break; + break; - default: - throw new SQLException("Invalid 'current' parameter."); + case CLOSE_ALL_RESULTS: + for (int i = 0; i < curRes; ++i) + resultSets.get(i).close0(); + + break; + + case KEEP_CURRENT_RESULT: + break; + + default: + throw new SQLException("Invalid 'current' parameter."); + } } - return false; + return (resultSets != null && curRes < resultSets.size()); } /** {@inheritDoc} */ @@ -532,8 +595,10 @@ public class JdbcThinStatement implements Statement { closeOnCompletion = true; - if (rs != null) - rs.closeStatement(true); + if (resultSets != null) { + for (JdbcThinResultSet rs : resultSets) + rs.closeStatement(true); + } } /** {@inheritDoc} */ @@ -568,4 +633,25 @@ public class JdbcThinStatement implements Statement { if (isClosed()) throw new SQLException("Statement is closed."); } + + /** + * Used by statement on closeOnCompletion mode. + * @throws SQLException On error. + */ + void closeIfAllResultsClosed() throws SQLException { + if (isClosed()) + return; + + boolean allRsClosed = true; + + if (resultSets != null) { + for (JdbcThinResultSet rs : resultSets) { + if (!rs.isClosed()) + allRsClosed = false; + } + } + + if (allRsClosed) + close(); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java index d7fa9d0..7ac9c2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java @@ -46,12 +46,15 @@ import org.apache.ignite.lang.IgniteProductVersion; * JDBC IO layer implementation based on blocking IPC streams. */ public class JdbcThinTcpIo { - /** Current version. */ - private static final ClientListenerProtocolVersion CURRENT_VER = ClientListenerProtocolVersion.create(2, 1, 5); - /** Version 2.1.0. */ private static final ClientListenerProtocolVersion VER_2_1_0 = ClientListenerProtocolVersion.create(2, 1, 0); + /** Version 2.3.1. */ + private static final ClientListenerProtocolVersion VER_2_3_1 = ClientListenerProtocolVersion.create(2, 3, 1); + + /** Current version. */ + private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_3_1; + /** Initial output stream capacity for handshake. */ private static final int HANDSHAKE_MSG_SIZE = 13; http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java index 12be361..38d1972 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java @@ -37,8 +37,11 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext { /** Version 2.1.5: added "lazy" flag. */ private static final ClientListenerProtocolVersion VER_2_1_5 = ClientListenerProtocolVersion.create(2, 1, 5); + /** Version 2.3.1: added "multiple statements query" feature. */ + public static final ClientListenerProtocolVersion VER_2_3_1 = ClientListenerProtocolVersion.create(2, 3, 1); + /** Current version. */ - private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_1_5; + private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_3_1; /** Supported versions. */ private static final Set SUPPORTED_VERS = new HashSet<>(); @@ -60,6 +63,7 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext { static { SUPPORTED_VERS.add(CURRENT_VER); + SUPPORTED_VERS.add(VER_2_1_5); SUPPORTED_VERS.add(VER_2_1_0); } @@ -100,8 +104,8 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext { if (ver.compareTo(VER_2_1_5) >= 0) lazyExec = reader.readBoolean(); - handler = new JdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, - enforceJoinOrder, collocated, replicatedOnly, autoCloseCursors, lazyExec); + handler = new JdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, enforceJoinOrder, + collocated, replicatedOnly, autoCloseCursors, lazyExec, ver); parser = new JdbcMessageParser(ctx); } http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteMultipleStatementsResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteMultipleStatementsResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteMultipleStatementsResult.java new file mode 100644 index 0000000..9bbdd59 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryExecuteMultipleStatementsResult.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * JDBC query execute result for query with multiple SQL statements. + */ +public class JdbcQueryExecuteMultipleStatementsResult extends JdbcResult { + /** Statements results. */ + private List results; + + /** Query result rows for the first query. */ + private List> items; + + /** Flag indicating the query has no unfetched results for the first query. */ + private boolean last; + + /** + * Default constructor. + */ + JdbcQueryExecuteMultipleStatementsResult() { + super(QRY_EXEC_MULT); + } + + /** + * @param results Statements results. + * @param items Query result rows for the first query. + * @param last Flag indicating the query has no unfetched results for the first query. + */ + public JdbcQueryExecuteMultipleStatementsResult(List results, + List> items, boolean last) { + super(QRY_EXEC_MULT); + this.results = results; + this.items = items; + this.last = last; + } + + /** + * @return Update counts of query IDs. + */ + public List results() { + return results; + } + + /** + * @return Query result rows for the first query. + */ + public List> items() { + return items; + } + + /** + * @return Flag indicating the query has no unfetched results for the first query. + */ + public boolean isLast() { + return last; + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException { + super.writeBinary(writer); + + if (results != null && results.size() > 0) { + writer.writeInt(results.size()); + + for (JdbcResultInfo r : results) + r.writeBinary(writer); + + if (results.get(0).isQuery()) { + writer.writeBoolean(last); + + JdbcUtils.writeItems(writer, items); + } + } + else + writer.writeInt(0); + } + + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException { + super.readBinary(reader); + + int cnt = reader.readInt(); + + if (cnt == 0) + results = Collections.emptyList(); + else { + results = new ArrayList<>(cnt); + + for (int i = 0; i < cnt; ++i) { + JdbcResultInfo r = new JdbcResultInfo(); + + r.readBinary(reader); + + results.add(r); + } + + if (results.get(0).isQuery()) { + last = reader.readBoolean(); + + items = JdbcUtils.readItems(reader); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(JdbcQueryExecuteMultipleStatementsResult.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java index ea25b11..202f813 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.binary.BinaryWriterExImpl; import org.apache.ignite.internal.jdbc2.JdbcSqlFieldsQuery; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion; import org.apache.ignite.internal.processors.odbc.ClientListenerRequest; import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler; import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; @@ -102,6 +103,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { /** Automatic close of cursors. */ private final boolean autoCloseCursors; + /** Protocol version. */ + private ClientListenerProtocolVersion protocolVer; + /** * Constructor. * @@ -114,10 +118,11 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { * @param replicatedOnly Replicated only flag. * @param autoCloseCursors Flag to automatically close server cursors. * @param lazy Lazy query execution flag. + * @param protocolVer Protocol version. */ public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors, boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, boolean replicatedOnly, - boolean autoCloseCursors, boolean lazy) { + boolean autoCloseCursors, boolean lazy, ClientListenerProtocolVersion protocolVer) { this.ctx = ctx; this.busyLock = busyLock; this.maxCursors = maxCursors; @@ -127,6 +132,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { this.replicatedOnly = replicatedOnly; this.autoCloseCursors = autoCloseCursors; this.lazy = lazy; + this.protocolVer = protocolVer; log = ctx.log(getClass()); } @@ -284,31 +290,71 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { qry.setSchema(schemaName); - FieldsQueryCursor> qryCur = ctx.query().querySqlFieldsNoCache(qry, true); + List>> results = ctx.query().querySqlFieldsNoCache(qry, true, + protocolVer.compareTo(JdbcConnectionContext.VER_2_3_1) < 0); + + if (results.size() == 1) { + FieldsQueryCursor> qryCur = results.get(0); + + JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), (QueryCursorImpl)qryCur); + + JdbcQueryExecuteResult res; + + if (cur.isQuery()) + res = new JdbcQueryExecuteResult(qryId, cur.fetchRows(), !cur.hasNext()); + else { + List> items = cur.fetchRows(); - JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), (QueryCursorImpl)qryCur); + assert items != null && items.size() == 1 && items.get(0).size() == 1 + && items.get(0).get(0) instanceof Long : + "Invalid result set for not-SELECT query. [qry=" + sql + + ", res=" + S.toString(List.class, items) + ']'; - JdbcQueryExecuteResult res; + res = new JdbcQueryExecuteResult(qryId, (Long)items.get(0).get(0)); + } + + if (res.last() && (!res.isQuery() || autoCloseCursors)) + cur.close(); + else + qryCursors.put(qryId, cur); - if (cur.isQuery()) - res = new JdbcQueryExecuteResult(qryId, cur.fetchRows(), !cur.hasNext()); + return new JdbcResponse(res); + } else { - List> items = cur.fetchRows(); + List jdbcResults = new ArrayList<>(results.size()); + List> items = null; + boolean last = true; + + for (FieldsQueryCursor> c : results) { + QueryCursorImpl qryCur = (QueryCursorImpl)c; + + JdbcResultInfo jdbcRes; - assert items != null && items.size() == 1 && items.get(0).size() == 1 - && items.get(0).get(0) instanceof Long : - "Invalid result set for not-SELECT query. [qry=" + sql + - ", res=" + S.toString(List.class, items) + ']'; + if (qryCur.isQuery()) { + jdbcRes = new JdbcResultInfo(true, -1, qryId); - res = new JdbcQueryExecuteResult(qryId, (Long)items.get(0).get(0)); + JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), + (QueryCursorImpl)qryCur); + + qryCursors.put(qryId, cur); + + qryId = QRY_ID_GEN.getAndIncrement(); + + if (items == null) { + items = cur.fetchRows(); + last = cur.hasNext(); + } + } + else + jdbcRes = new JdbcResultInfo(false, (Long)((List)qryCur.getAll().get(0)).get(0), -1); + + jdbcResults.add(jdbcRes); + } + + return new JdbcResponse(new JdbcQueryExecuteMultipleStatementsResult(jdbcResults, items, last)); } - if (res.last() && (!res.isQuery() || autoCloseCursors)) - cur.close(); - else - qryCursors.put(qryId, cur); - return new JdbcResponse(res); } catch (Exception e) { qryCursors.remove(qryId); @@ -440,7 +486,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { qry.setSchema(schemaName); QueryCursorImpl> qryCur = (QueryCursorImpl>)ctx.query() - .querySqlFieldsNoCache(qry, true); + .querySqlFieldsNoCache(qry, true, true).get(0); assert !qryCur.isQuery(); http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java index 202905b..c6c7438 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java @@ -56,6 +56,9 @@ public class JdbcResult implements JdbcRawBinarylizable { /** Database schemas metadata result. */ static final byte META_SCHEMAS = 12; + /** Multiple statements query results. */ + static final byte QRY_EXEC_MULT = 13; + /** Success status. */ private byte type; @@ -139,6 +142,11 @@ public class JdbcResult implements JdbcRawBinarylizable { break; + case QRY_EXEC_MULT: + res = new JdbcQueryExecuteMultipleStatementsResult(); + + break; + default: throw new IgniteException("Unknown SQL listener request ID: [request ID=" + resId + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java new file mode 100644 index 0000000..f0706e4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResultInfo.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * JDBC statement result information. Keeps statement type (SELECT or UPDATE) and + * queryId or update count (depends on statement type). + */ +public class JdbcResultInfo implements JdbcRawBinarylizable { + /** Query flag. */ + private boolean isQuery; + + /** Update count. */ + private long updCnt; + + /** Query ID. */ + private long qryId; + + /** + * Default constructor is used for serialization. + */ + JdbcResultInfo() { + // No-op. + } + + /** + * @param isQuery Query flag. + * @param updCnt Update count. + * @param qryId Query ID. + */ + public JdbcResultInfo(boolean isQuery, long updCnt, long qryId) { + this.isQuery = isQuery; + this.updCnt = updCnt; + this.qryId = qryId; + } + + /** + * @return Query flag. + */ + public boolean isQuery() { + return isQuery; + } + + /** + * @return Query ID. + */ + public long queryId() { + return qryId; + } + + /** + * @return Update count. + */ + public long updateCount() { + return updCnt; + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriterExImpl writer) { + writer.writeBoolean(isQuery); + writer.writeLong(updCnt); + writer.writeLong(qryId); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReaderExImpl reader) { + isQuery = reader.readBoolean(); + updCnt = reader.readLong(); + qryId = reader.readLong(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(JdbcResultInfo.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index cecc5dd..b8445ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -83,11 +83,14 @@ public interface GridQueryIndexing { * @param keepBinary Keep binary flag. * @param cancel Query cancel. * @param mainCacheId Main cache ID. + * @param failOnMultipleStmts If {@code true} the method must throws exception when query contains + * more then one SQL statement. * @return Cursor. * @throws IgniteCheckedException If failed. */ - public FieldsQueryCursor> queryDistributedSqlFields(String schemaName, SqlFieldsQuery qry, - boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId) throws IgniteCheckedException; + public List>> queryDistributedSqlFields(String schemaName, SqlFieldsQuery qry, + boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId, boolean failOnMultipleStmts) + throws IgniteCheckedException; /** * Perform a MERGE statement using data streamer as receiver. http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index e8cc852..58c3ce9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -1867,7 +1867,8 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (cctx.config().getQueryParallelism() > 1) { qry.setDistributedJoins(true); - cur = idx.queryDistributedSqlFields(schemaName, qry, keepBinary, cancel, mainCacheId); + cur = idx.queryDistributedSqlFields(schemaName, qry, + keepBinary, cancel, mainCacheId, true).get(0); } else { IndexingQueryFilter filter = idx.backupFilter(requestTopVer.get(), qry.getPartitions()); @@ -1884,7 +1885,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { else { clo = new IgniteOutClosureX>>() { @Override public FieldsQueryCursor> applyx() throws IgniteCheckedException { - return idx.queryDistributedSqlFields(schemaName, qry, keepBinary, null, mainCacheId); + return idx.queryDistributedSqlFields(schemaName, qry, keepBinary, null, mainCacheId, true).get(0); } }; } @@ -1906,7 +1907,20 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param keepBinary Keep binary flag. * @return Cursor. */ - public FieldsQueryCursor> querySqlFieldsNoCache(final SqlFieldsQuery qry, final boolean keepBinary) { + public FieldsQueryCursor> querySqlFieldsNoCache(final SqlFieldsQuery qry, + final boolean keepBinary) { + return querySqlFieldsNoCache(qry, keepBinary, true).get(0); + } + + /** + * Query SQL fields without strict dependency on concrete cache. + * + * @param qry Query. + * @param keepBinary Keep binary flag. + * @return Cursor. + */ + public List>> querySqlFieldsNoCache(final SqlFieldsQuery qry, + final boolean keepBinary, final boolean failOnMultipleStmts) { checkxEnabled(); validateSqlFieldsQuery(qry); @@ -1921,11 +1935,13 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - IgniteOutClosureX>> clo = new IgniteOutClosureX>>() { - @Override public FieldsQueryCursor> applyx() throws IgniteCheckedException { + IgniteOutClosureX>>> clo = + new IgniteOutClosureX>>>() { + @Override public List>> applyx() throws IgniteCheckedException { GridQueryCancel cancel = new GridQueryCancel(); - return idx.queryDistributedSqlFields(qry.getSchema(), qry, keepBinary, cancel, null); + return idx.queryDistributedSqlFields(qry.getSchema(), qry, keepBinary, cancel, null, + failOnMultipleStmts); } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java index c745d8a..1ebf556 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java @@ -242,12 +242,12 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT } /** {@inheritDoc} */ - @Override public FieldsQueryCursor> queryDistributedSqlFields(String schemaName, SqlFieldsQuery qry, - boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId) throws IgniteCheckedException { + @Override public List>> queryDistributedSqlFields(String schemaName, SqlFieldsQuery qry, + boolean keepBinary, GridQueryCancel cancel, + @Nullable Integer mainCacheId, boolean failOnMultipleStmts) throws IgniteCheckedException { return null; } - /** {@inheritDoc} */ @Override public long streamUpdateQuery(String spaceName, String qry, @Nullable Object[] params, IgniteDataStreamer streamer) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index f2f2fd4..ee1875f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -150,7 +150,7 @@ public class DmlStatementsProcessor { * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications. * * @param schemaName Schema. - * @param stmt JDBC statement. + * @param prepared Prepared JDBC statement. * @param fieldsQry Original query. * @param loc Query locality flag. * @param filters Cache name and key filter. @@ -158,13 +158,13 @@ public class DmlStatementsProcessor { * @return Update result (modified items count and failed keys). * @throws IgniteCheckedException if failed. */ - private UpdateResult updateSqlFields(String schemaName, PreparedStatement stmt, SqlFieldsQuery fieldsQry, + private UpdateResult updateSqlFields(String schemaName, Prepared prepared, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException { Object[] errKeys = null; long items = 0; - UpdatePlan plan = getPlanForStatement(schemaName, stmt, null); + UpdatePlan plan = getPlanForStatement(schemaName, prepared, null); GridCacheContext cctx = plan.tbl.rowDescriptor().context(); @@ -188,7 +188,7 @@ public class DmlStatementsProcessor { UpdateResult r; try { - r = executeUpdateStatement(schemaName, cctx, stmt, fieldsQry, loc, filters, cancel, errKeys); + r = executeUpdateStatement(schemaName, cctx, prepared, fieldsQry, loc, filters, cancel, errKeys); } finally { cctx.operationContextPerCall(opCtx); @@ -213,16 +213,16 @@ public class DmlStatementsProcessor { /** * @param schemaName Schema. - * @param stmt Prepared statement. + * @param p Prepared. * @param fieldsQry Initial query * @param cancel Query cancel. * @return Update result wrapped into {@link GridQueryFieldsResult} * @throws IgniteCheckedException if failed. */ @SuppressWarnings("unchecked") - QueryCursorImpl> updateSqlFieldsDistributed(String schemaName, PreparedStatement stmt, + QueryCursorImpl> updateSqlFieldsDistributed(String schemaName, Prepared p, SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException { - UpdateResult res = updateSqlFields(schemaName, stmt, fieldsQry, false, null, cancel); + UpdateResult res = updateSqlFields(schemaName, p, fieldsQry, false, null, cancel); QueryCursorImpl> resCur = (QueryCursorImpl>)new QueryCursorImpl(Collections.singletonList (Collections.singletonList(res.cnt)), cancel, false); @@ -247,7 +247,8 @@ public class DmlStatementsProcessor { GridQueryFieldsResult updateSqlFieldsLocal(String schemaName, PreparedStatement stmt, SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException { - UpdateResult res = updateSqlFields(schemaName, stmt, fieldsQry, true, filters, cancel); + UpdateResult res = updateSqlFields(schemaName, GridSqlQueryParser.prepared(stmt), fieldsQry, true, + filters, cancel); return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META, new IgniteSingletonIterator(Collections.singletonList(res.cnt))); @@ -340,22 +341,24 @@ public class DmlStatementsProcessor { * * @param schemaName Schema name. * @param cctx Cache context. - * @param prepStmt Prepared statement for DML query. + * @param prepared Prepared statement for DML query. * @param fieldsQry Fields query. + * @param loc Local query flag. * @param filters Cache name and key filter. + * @param cancel Query cancel state holder. * @param failedKeys Keys to restrict UPDATE and DELETE operations with. Null or empty array means no restriction. * @return Pair [number of successfully processed items; keys that have failed to be processed] * @throws IgniteCheckedException if failed. */ @SuppressWarnings({"ConstantConditions", "unchecked"}) private UpdateResult executeUpdateStatement(String schemaName, final GridCacheContext cctx, - PreparedStatement prepStmt, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, + Prepared prepared, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel, Object[] failedKeys) throws IgniteCheckedException { int mainCacheId = CU.cacheId(cctx.name()); Integer errKeysPos = null; - UpdatePlan plan = getPlanForStatement(schemaName, prepStmt, errKeysPos); + UpdatePlan plan = getPlanForStatement(schemaName, prepared, errKeysPos); if (plan.fastUpdateArgs != null) { assert F.isEmpty(failedKeys) && errKeysPos == null; @@ -378,8 +381,8 @@ public class DmlStatementsProcessor { .setPageSize(fieldsQry.getPageSize()) .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS); - cur = (QueryCursorImpl>) idx.queryDistributedSqlFields(schemaName, newFieldsQry, true, cancel, - mainCacheId); + cur = (QueryCursorImpl>)idx.queryDistributedSqlFields(schemaName, newFieldsQry, true, + cancel, mainCacheId, true).get(0); } else { final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQry, @@ -423,14 +426,12 @@ public class DmlStatementsProcessor { * if available. * * @param schema Schema. - * @param prepStmt JDBC statement. + * @param p Prepared JDBC statement. * @return Update plan. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - private UpdatePlan getPlanForStatement(String schema, PreparedStatement prepStmt, @Nullable Integer errKeysPos) + private UpdatePlan getPlanForStatement(String schema, Prepared p, @Nullable Integer errKeysPos) throws IgniteCheckedException { - Prepared p = GridSqlQueryParser.prepared(prepStmt); - H2DmlPlanKey planKey = new H2DmlPlanKey(schema, p.getSQL()); UpdatePlan res = (errKeysPos == null ? planCache.get(planKey) : null); http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 00a37ce..9e6a1fa 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -145,7 +145,6 @@ import org.h2.command.dml.Insert; import org.h2.engine.Session; import org.h2.engine.SysProperties; import org.h2.index.Index; -import org.h2.jdbc.JdbcPreparedStatement; import org.h2.jdbc.JdbcStatement; import org.h2.server.web.WebServer; import org.h2.table.IndexColumn; @@ -1244,7 +1243,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (qry.getTimeout() > 0) fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS); - final QueryCursor> res = queryDistributedSqlFields(schemaName, fqry, keepBinary, null, mainCacheId); + final QueryCursor> res = + queryDistributedSqlFields(schemaName, fqry, keepBinary, null, mainCacheId, true).get(0); final Iterable> converted = new Iterable>() { @Override public Iterator> iterator() { @@ -1277,10 +1277,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public FieldsQueryCursor> queryDistributedSqlFields(String schemaName, - SqlFieldsQuery qry, boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId) { - final String sqlQry = qry.getSql(); - + @Override public List>> queryDistributedSqlFields(String schemaName, SqlFieldsQuery qry, + boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId, boolean failOnMultipleStmts) { Connection c = connectionForSchema(schemaName); final boolean enforceJoinOrder = qry.isEnforceJoinOrder(); @@ -1289,18 +1287,38 @@ public class IgniteH2Indexing implements GridQueryIndexing { final DistributedJoinMode distributedJoinMode = distributedJoinMode(qry.isLocal(), distributedJoins); - GridCacheTwoStepQuery twoStepQry = null; - List meta; + String sqlQry = qry.getSql(); - final H2TwoStepCachedQueryKey cachedQryKey = new H2TwoStepCachedQueryKey(schemaName, sqlQry, grpByCollocated, + H2TwoStepCachedQueryKey cachedQryKey = new H2TwoStepCachedQueryKey(schemaName, sqlQry, grpByCollocated, distributedJoins, enforceJoinOrder, qry.isLocal()); H2TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey); if (cachedQry != null) { - twoStepQry = cachedQry.query().copy(); - meta = cachedQry.meta(); + checkQueryType(qry, true); + + GridCacheTwoStepQuery twoStepQry = cachedQry.query().copy(); + + List meta = cachedQry.meta(); + + List>> res = Collections.singletonList(executeTwoStepsQuery(schemaName, qry.getPageSize(), qry.getPartitions(), + qry.getArgs(), keepBinary, qry.isLazy(), qry.getTimeout(), cancel, sqlQry, enforceJoinOrder, + twoStepQry, meta)); + + return res; } - else { + + List>> res = new ArrayList<>(1); + + Object[] argsOrig = qry.getArgs(); + int firstArg = 0; + Object[] args; + String remainingSql = sqlQry; + + while (remainingSql != null) { + args = null; + GridCacheTwoStepQuery twoStepQry = null; + List meta; + final UUID locNodeId = ctx.localNodeId(); // Here we will just parse the statement, no need to optimize it at all. @@ -1319,16 +1337,16 @@ public class IgniteH2Indexing implements GridQueryIndexing { while (true) { try { // Do not cache this statement because the whole query object will be cached later on. - stmt = prepareStatement(c, sqlQry, false); + stmt = prepareStatement(c, remainingSql, false); break; } catch (SQLException e) { if (!cachesCreated && ( e.getErrorCode() == ErrorCode.SCHEMA_NOT_FOUND_1 || - e.getErrorCode() == ErrorCode.TABLE_OR_VIEW_NOT_FOUND_1 || - e.getErrorCode() == ErrorCode.INDEX_NOT_FOUND_1) - ) { + e.getErrorCode() == ErrorCode.TABLE_OR_VIEW_NOT_FOUND_1 || + e.getErrorCode() == ErrorCode.INDEX_NOT_FOUND_1) + ) { try { ctx.cache().createMissingQueryCaches(); } @@ -1344,19 +1362,59 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } - prepared = GridSqlQueryParser.prepared(stmt); + GridSqlQueryParser.PreparedWithRemaining prep = GridSqlQueryParser.preparedWithRemaining(stmt); + + // remaining == null if the query string contains single SQL statement. + remainingSql = prep.remainingSql(); + + if (remainingSql != null && failOnMultipleStmts) + throw new IgniteSQLException("Multiple statements queries are not supported"); + + sqlQry = prep.prepared().getSQL(); + + prepared = prep.prepared(); + + int paramsCnt = prepared.getParameters().size(); + + if (paramsCnt > 0) { + if (argsOrig == null || argsOrig.length < firstArg + paramsCnt) { + throw new IgniteException("Invalid number of query parameters. " + + "Cannot find " + (argsOrig.length + 1 - firstArg) + " parameter."); + } + + args = Arrays.copyOfRange(argsOrig, firstArg, firstArg + paramsCnt); + + firstArg += paramsCnt; + } + + cachedQryKey = new H2TwoStepCachedQueryKey(schemaName, sqlQry, grpByCollocated, + distributedJoins, enforceJoinOrder, qry.isLocal()); + + cachedQry = twoStepCache.get(cachedQryKey); + + if (cachedQry != null) { + checkQueryType(qry, true); + + twoStepQry = cachedQry.query().copy(); + meta = cachedQry.meta(); - if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery) qry).isQuery() != prepared.isQuery()) - throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver", - IgniteQueryErrorCode.STMT_TYPE_MISMATCH); + res.add(executeTwoStepsQuery(schemaName, qry.getPageSize(), qry.getPartitions(), args, keepBinary, + qry.isLazy(), qry.getTimeout(), cancel, sqlQry, enforceJoinOrder, + twoStepQry, meta)); - if (prepared.isQuery()) { - bindParameters(stmt, F.asList(qry.getArgs())); + continue; + } + else { + checkQueryType(qry, prepared.isQuery()); + + if (prepared.isQuery()) { + bindParameters(stmt, F.asList(args)); - twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), - grpByCollocated, distributedJoins, enforceJoinOrder, this); + twoStepQry = GridSqlQuerySplitter.split(c, prepared, args, + grpByCollocated, distributedJoins, enforceJoinOrder, this); - assert twoStepQry != null; + assert twoStepQry != null; + } } } finally { @@ -1367,17 +1425,22 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (twoStepQry == null) { if (DmlStatementsProcessor.isDmlStatement(prepared)) { try { - return dmlProc.updateSqlFieldsDistributed(schemaName, stmt, qry, cancel); + res.add(dmlProc.updateSqlFieldsDistributed(schemaName, prepared, + new SqlFieldsQuery(qry).setSql(sqlQry).setArgs(args), cancel)); + + continue; } catch (IgniteCheckedException e) { throw new IgniteSQLException("Failed to execute DML statement [stmt=" + sqlQry + - ", params=" + Arrays.deepToString(qry.getArgs()) + "]", e); + ", params=" + Arrays.deepToString(args) + "]", e); } } if (DdlStatementsProcessor.isDdlStatement(prepared)) { try { - return ddlProc.runDdlStatement(sqlQry, stmt); + res.add(ddlProc.runDdlStatement(sqlQry, prepared)); + + continue; } catch (IgniteCheckedException e) { throw new IgniteSQLException("Failed to execute DDL statement [stmt=" + sqlQry + ']', e); @@ -1428,39 +1491,75 @@ public class IgniteH2Indexing implements GridQueryIndexing { finally { U.close(stmt, log); } + + res.add(executeTwoStepsQuery(schemaName, qry.getPageSize(), qry.getPartitions(), args, keepBinary, + qry.isLazy(), qry.getTimeout(), cancel, sqlQry, enforceJoinOrder, + twoStepQry, meta)); + + if (cachedQry == null && !twoStepQry.explain()) { + cachedQry = new H2TwoStepCachedQuery(meta, twoStepQry.copy()); + + twoStepCache.putIfAbsent(cachedQryKey, cachedQry); + } } + return res; + } + + /** + * Check expected statement type (when it is set by JDBC) and given statement type. + * + * @param qry Query. + * @param isQry {@code true} for select queries, otherwise (DML/DDL queries) {@code false}. + */ + private void checkQueryType(SqlFieldsQuery qry, boolean isQry) { + if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery)qry).isQuery() != isQry) + throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver", + IgniteQueryErrorCode.STMT_TYPE_MISMATCH); + } + + /** + * @param schemaName Schema name. + * @param pageSize Page size. + * @param partitions Partitions. + * @param args Arguments. + * @param keepBinary Keep binary flag. + * @param lazy Lazy flag. + * @param timeout Timeout. + * @param cancel Cancel. + * @param sqlQry SQL query string. + * @param enforceJoinOrder Enforce join orded flag. + * @param twoStepQry Two-steps query. + * @param meta Metadata. + * @return Cursor. + */ + private FieldsQueryCursor> executeTwoStepsQuery(String schemaName, int pageSize, int partitions[], + Object[] args, boolean keepBinary, boolean lazy, int timeout, + GridQueryCancel cancel, String sqlQry, boolean enforceJoinOrder, GridCacheTwoStepQuery twoStepQry, + List meta) { if (log.isDebugEnabled()) log.debug("Parsed query: `" + sqlQry + "` into two step query: " + twoStepQry); - twoStepQry.pageSize(qry.getPageSize()); + twoStepQry.pageSize(pageSize); if (cancel == null) cancel = new GridQueryCancel(); - int partitions[] = qry.getPartitions(); - if (partitions == null && twoStepQry.derivedPartitions() != null) { try { - partitions = calculateQueryPartitions(twoStepQry.derivedPartitions(), qry.getArgs()); + partitions = calculateQueryPartitions(twoStepQry.derivedPartitions(), args); } catch (IgniteCheckedException e) { throw new CacheException("Failed to calculate derived partitions: [qry=" + sqlQry + ", params=" + - Arrays.deepToString(qry.getArgs()) + "]", e); + Arrays.deepToString(args) + "]", e); } } QueryCursorImpl> cursor = new QueryCursorImpl<>( - runQueryTwoStep(schemaName, twoStepQry, keepBinary, enforceJoinOrder, qry.getTimeout(), cancel, - qry.getArgs(), partitions, qry.isLazy()), cancel); + runQueryTwoStep(schemaName, twoStepQry, keepBinary, enforceJoinOrder, timeout, cancel, + args, partitions, lazy), cancel); cursor.fieldsMeta(meta); - if (cachedQry == null && !twoStepQry.explain()) { - cachedQry = new H2TwoStepCachedQuery(meta, twoStepQry.copy()); - - twoStepCache.putIfAbsent(cachedQryKey, cachedQry); - } - return cursor; } http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java index 4c3264c..8a901dc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query.h2.ddl; -import java.sql.PreparedStatement; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -59,7 +58,6 @@ import org.h2.command.ddl.CreateIndex; import org.h2.command.ddl.CreateTable; import org.h2.command.ddl.DropIndex; import org.h2.command.ddl.DropTable; -import org.h2.jdbc.JdbcPreparedStatement; import org.h2.table.Column; import org.h2.value.DataType; @@ -91,17 +89,18 @@ public class DdlStatementsProcessor { * Execute DDL statement. * * @param sql SQL. - * @param stmt H2 statement to parse and execute. + * @param prepared Prepared. + * @return Cursor on query results. + * @throws IgniteCheckedException On error. */ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) - public FieldsQueryCursor> runDdlStatement(String sql, PreparedStatement stmt) + public FieldsQueryCursor> runDdlStatement(String sql, Prepared prepared) throws IgniteCheckedException { - assert stmt instanceof JdbcPreparedStatement; IgniteInternalFuture fut = null; try { - GridSqlStatement stmt0 = new GridSqlQueryParser(false).parse(GridSqlQueryParser.prepared(stmt)); + GridSqlStatement stmt0 = new GridSqlQueryParser(false).parse(prepared); if (stmt0 instanceof GridSqlCreateIndex) { GridSqlCreateIndex cmd = (GridSqlCreateIndex)stmt0; http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java index a379a91..3d7a1a0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java @@ -397,6 +397,29 @@ public class GridSqlQueryParser { /** */ private static final Getter COLUMN_CHECK_CONSTRAINT = getter(Column.class, "checkConstraint"); + /** Class for private class: 'org.h2.command.CommandList'. */ + private static final Class CLS_COMMAND_LIST; + + /** */ + private static final Getter LIST_COMMAND; + + /** */ + private static final Getter REMAINING; + + static { + try { + CLS_COMMAND_LIST = (Class)CommandContainer.class.getClassLoader() + .loadClass("org.h2.command.CommandList"); + + LIST_COMMAND = getter(CLS_COMMAND_LIST, "command"); + + REMAINING = getter(CLS_COMMAND_LIST, "remaining"); + } + catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + /** */ private static final Getter ALTER_COLUMN_TBL_NAME = getter(AlterTableAlterColumn.class, "tableName"); @@ -485,6 +508,26 @@ public class GridSqlQueryParser { } /** + * @param stmt Prepared statement. + * @return Parsed select. + */ + public static PreparedWithRemaining preparedWithRemaining(PreparedStatement stmt) { + Command cmd = COMMAND.get((JdbcPreparedStatement)stmt); + + if (cmd instanceof CommandContainer) + return new PreparedWithRemaining(PREPARED.get(cmd), null); + else { + Class cmdCls = cmd.getClass(); + + if (cmdCls.getName().equals("org.h2.command.CommandList")) { + return new PreparedWithRemaining(PREPARED.get(LIST_COMMAND.get(cmd)), REMAINING.get(cmd)); + } + else + throw new IgniteSQLException("Unexpected statement command"); + } + } + + /** * @param qry Query expression to parse. * @return Subquery AST. */ @@ -1798,4 +1841,42 @@ public class GridSqlQueryParser { } } } + + /** + * + */ + public static class PreparedWithRemaining { + /** Prepared. */ + private Prepared prepared; + + /** Remaining sql. */ + private String remainingSql; + + /** + * @param prepared Prepared. + * @param sql Remaining SQL. + */ + public PreparedWithRemaining(Prepared prepared, String sql) { + this.prepared = prepared; + + if (sql != null) + sql = sql.trim(); + + remainingSql = !F.isEmpty(sql) ? sql : null; + } + + /** + * @return Prepared. + */ + public Prepared prepared() { + return prepared; + } + + /** + * @return Remaining SQL. + */ + public String remainingSql() { + return remainingSql; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index b20cbd5..7f28203 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -51,7 +51,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.h2.command.Prepared; import org.h2.command.dml.Query; import org.h2.command.dml.SelectUnion; -import org.h2.jdbc.JdbcPreparedStatement; import org.h2.table.IndexColumn; import org.h2.value.Value; import org.jetbrains.annotations.Nullable; @@ -172,7 +171,8 @@ public class GridSqlQuerySplitter { } /** - * @param stmt Prepared statement. + * @param conn Connection. + * @param prepared Prepared. * @param params Parameters. * @param collocatedGrpBy Whether the query has collocated GROUP BY keys. * @param distributedJoins If distributed joins enabled. @@ -183,7 +183,8 @@ public class GridSqlQuerySplitter { * @throws IgniteCheckedException If failed. */ public static GridCacheTwoStepQuery split( - JdbcPreparedStatement stmt, + Connection conn, + Prepared prepared, Object[] params, boolean collocatedGrpBy, boolean distributedJoins, @@ -195,7 +196,7 @@ public class GridSqlQuerySplitter { // Here we will just do initial query parsing. Do not use optimized // subqueries because we do not have unique FROM aliases yet. - GridSqlQuery qry = parse(prepared(stmt), false); + GridSqlQuery qry = parse(prepared, false); String originalSql = qry.getSQL(); @@ -213,8 +214,6 @@ public class GridSqlQuerySplitter { // debug("NORMALIZED", qry.getSQL()); - Connection conn = stmt.getConnection(); - // Here we will have correct normalized AST with optimized join order. // The distributedJoins parameter is ignored because it is not relevant for // the REDUCE query optimization. @@ -234,12 +233,12 @@ public class GridSqlQuerySplitter { boolean allCollocated = true; for (GridCacheSqlQuery mapSqlQry : splitter.mapSqlQrys) { - Prepared prepared = optimize(h2, conn, mapSqlQry.query(), mapSqlQry.parameters(params), + Prepared prepared0 = optimize(h2, conn, mapSqlQry.query(), mapSqlQry.parameters(params), true, enforceJoinOrder); - allCollocated &= isCollocated((Query)prepared); + allCollocated &= isCollocated((Query)prepared0); - mapSqlQry.query(parse(prepared, true).getSQL()); + mapSqlQry.query(parse(prepared0, true).getSQL()); } // We do not need distributed joins if all MAP queries are collocated. http://git-wip-us.apache.org/repos/asf/ignite/blob/405749a7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MultipleStatementsSqlQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MultipleStatementsSqlQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MultipleStatementsSqlQuerySelfTest.java new file mode 100644 index 0000000..8b9bf40 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MultipleStatementsSqlQuerySelfTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.List; +import java.util.concurrent.Callable; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.QueryCursorImpl; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests for schemas. + */ +public class MultipleStatementsSqlQuerySelfTest extends GridCommonAbstractTest { + /** Node. */ + private IgniteEx node; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + node = (IgniteEx)startGrid(); + + startGrid(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * Test query without caches. + * + * @throws Exception If failed. + */ + public void testQuery() throws Exception { + GridQueryProcessor qryProc = node.context().query(); + + SqlFieldsQuery qry = new SqlFieldsQuery( + "create table test(ID int primary key, NAME varchar(20)); " + + "insert into test (ID, NAME) values (1, 'name_1');" + + "insert into test (ID, NAME) values (2, 'name_2'), (3, 'name_3');" + + "select * from test;") + .setSchema("PUBLIC"); + + List>> res = qryProc.querySqlFieldsNoCache(qry, true, false); + + assert res.size() == 4 : "Unexpected cursors count: " + res.size(); + + assert !((QueryCursorImpl)res.get(0)).isQuery() : "Results of DDL statement is expected "; + + List> rows = res.get(1).getAll(); + + assert !((QueryCursorImpl)res.get(1)).isQuery() : "Results of DDL statement is expected "; + assert Long.valueOf(1).equals(rows.get(0).get(0)) : "1 row must be updated. [actual=" + rows.get(0).get(0) + ']'; + + rows = res.get(2).getAll(); + + assert !((QueryCursorImpl)res.get(2)).isQuery() : "Results of DML statement is expected "; + assert Long.valueOf(2).equals(rows.get(0).get(0)) : "2 row must be updated"; + + rows = res.get(3).getAll(); + + assert ((QueryCursorImpl)res.get(3)).isQuery() : "Results of SELECT statement is expected "; + + assert rows.size() == 3 : "Invalid rows count: " + rows.size(); + + for (int i = 0; i < rows.size(); ++i) { + assert Integer.valueOf(1).equals(rows.get(i).get(0)) + || Integer.valueOf(2).equals(rows.get(i).get(0)) + || Integer.valueOf(3).equals(rows.get(i).get(0)) + : "Invalid ID: " + rows.get(i).get(0); + } + } + + /** + * Test query without caches. + * + * @throws Exception If failed. + */ + public void testQueryWithParameters() throws Exception { + GridQueryProcessor qryProc = node.context().query(); + + SqlFieldsQuery qry = new SqlFieldsQuery( + "create table test(ID int primary key, NAME varchar(20)); " + + "insert into test (ID, NAME) values (?, ?);" + + "insert into test (ID, NAME) values (?, ?), (?, ?);" + + "select * from test;") + .setSchema("PUBLIC") + .setArgs(1, "name_1", 2, "name2", 3, "name_3"); + + List>> res = qryProc.querySqlFieldsNoCache(qry, true, false); + + assert res.size() == 4 : "Unexpected cursors count: " + res.size(); + + assert !((QueryCursorImpl)res.get(0)).isQuery() : "Results of DDL statement is expected "; + + List> rows = res.get(1).getAll(); + + assert !((QueryCursorImpl)res.get(1)).isQuery() : "Results of DDL statement is expected "; + assert Long.valueOf(1).equals(rows.get(0).get(0)) : "1 row must be updated. [actual=" + rows.get(0).get(0) + ']'; + + rows = res.get(2).getAll(); + + assert !((QueryCursorImpl)res.get(2)).isQuery() : "Results of DML statement is expected "; + assert Long.valueOf(2).equals(rows.get(0).get(0)) : "2 row must be updated"; + + rows = res.get(3).getAll(); + + assert ((QueryCursorImpl)res.get(3)).isQuery() : "Results of SELECT statement is expected "; + + assert rows.size() == 3 : "Invalid rows count: " + rows.size(); + + for (int i = 0; i < rows.size(); ++i) { + assert Integer.valueOf(1).equals(rows.get(i).get(0)) + || Integer.valueOf(2).equals(rows.get(i).get(0)) + || Integer.valueOf(3).equals(rows.get(i).get(0)) + : "Invalid ID: " + rows.get(i).get(0); + } + } + + /** + * @throws Exception If failed. + */ + public void testQueryMultipleStatementsFailed() throws Exception { + final SqlFieldsQuery qry = new SqlFieldsQuery("select 1; select 1;").setSchema("PUBLIC"); + + GridTestUtils.assertThrows(log, + new Callable() { + @Override public Object call() throws Exception { + node.context().query().querySqlFieldsNoCache(qry, true); + + return null; + } + }, IgniteSQLException.class, "Multiple statements queries are not supported"); + } +} \ No newline at end of file