ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [30/43] ignite git commit: IGNITE-5397: JDBC thin driver: avoid sending JdbcQueryCloseRequest message whenever possible. This closes #2095.
Date Thu, 08 Jun 2017 12:33:23 GMT
IGNITE-5397: JDBC thin driver: avoid sending JdbcQueryCloseRequest message whenever possible.
This closes #2095.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ecba1acc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ecba1acc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ecba1acc

Branch: refs/heads/ignite-5414
Commit: ecba1acc80ec395d312d7e70be53fdafb3ff6fc5
Parents: 5120a24
Author: Sergey Kalashnikov <skalashnikov@gridgain.com>
Authored: Wed Jun 7 15:29:28 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Wed Jun 7 15:29:28 2017 +0300

----------------------------------------------------------------------
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java   |   2 +
 .../thin/JdbcThinAutoCloseServerCursorTest.java | 359 +++++++++++++++++++
 .../jdbc/thin/JdbcThinConnectionSelfTest.java   |  36 ++
 .../internal/jdbc/thin/JdbcThinConnection.java  |   4 +-
 .../internal/jdbc/thin/JdbcThinResultSet.java   |  18 +-
 .../internal/jdbc/thin/JdbcThinStatement.java   |   2 +-
 .../internal/jdbc/thin/JdbcThinTcpIo.java       |  17 +-
 .../internal/jdbc/thin/JdbcThinUtils.java       |   7 +-
 .../processors/odbc/SqlListenerNioListener.java |   3 +-
 .../processors/odbc/jdbc/JdbcQueryCursor.java   |   9 +-
 .../odbc/jdbc/JdbcRequestHandler.java           |  21 +-
 11 files changed, 462 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index 121b8df..9ca3582 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -31,6 +31,7 @@ import org.apache.ignite.jdbc.JdbcPojoQuerySelfTest;
 import org.apache.ignite.jdbc.JdbcPreparedStatementSelfTest;
 import org.apache.ignite.jdbc.JdbcResultSetSelfTest;
 import org.apache.ignite.jdbc.JdbcStatementSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinAutoCloseServerCursorTest;
 import org.apache.ignite.jdbc.thin.JdbcThinComplexQuerySelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinDeleteStatementSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinDynamicIndexAtomicPartitionedNearSelfTest;
@@ -116,6 +117,7 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
         suite.addTest(new TestSuite(JdbcThinUpdateStatementSelfTest.class));
         suite.addTest(new TestSuite(JdbcThinMergeStatementSelfTest.class));
         suite.addTest(new TestSuite(JdbcThinDeleteStatementSelfTest.class));
+        suite.addTest(new TestSuite(JdbcThinAutoCloseServerCursorTest.class));
 
         // New thin JDBC driver, DDL tests
         suite.addTest(new TestSuite(JdbcThinDynamicIndexAtomicPartitionedNearSelfTest.class));

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAutoCloseServerCursorTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAutoCloseServerCursorTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAutoCloseServerCursorTest.java
new file mode 100644
index 0000000..eff504b
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAutoCloseServerCursorTest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.jdbc.thin.JdbcThinUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests an optional optimization that server cursor is closed automatically
+ * when last result set page is transmitted.
+ */
+public class JdbcThinAutoCloseServerCursorTest extends JdbcThinAbstractSelfTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache";
+
+    /** URL. */
+    private static final String URL = "jdbc:ignite:thin://127.0.0.1/?" +
+        JdbcThinUtils.PARAM_AUTO_CLOSE_SERVER_CURSOR + "=true";
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        CacheConfiguration cache = defaultCacheConfiguration();
+
+        cache.setName(CACHE_NAME);
+        cache.setCacheMode(PARTITIONED);
+        cache.setBackups(1);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+        cache.setIndexedTypes(Integer.class, Person.class);
+
+        cfg.setCacheConfiguration(cache);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(3);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        IgniteCache<Integer, Person> cache = grid(0).cache(CACHE_NAME);
+
+        cache.clear();
+    }
+
+    /**
+     * Ensure that server cursor is implicitly closed on last page.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    public void testQuery() throws Exception {
+        IgniteCache<Integer, Person> cache = grid(0).cache(CACHE_NAME);
+
+        Person persons[] = new Person[] {
+            new Person(1, "John", 25),
+            new Person(2, "Mary", 23)
+        };
+
+        for (Person person: persons)
+            cache.put(person.id, person);
+
+        try (Connection conn = DriverManager.getConnection(URL)) {
+            conn.setSchema(CACHE_NAME);
+
+            String sqlText = "select * from Person";
+
+            try (Statement stmt = conn.createStatement()) {
+                // Ensure that result set fits into one page
+                stmt.setFetchSize(2);
+
+                try (ResultSet rs = stmt.executeQuery(sqlText)) {
+                    // Attempt to get query metadata when server cursor is already closed
+                    GridTestUtils.assertThrows(log(), new Callable<Object>() {
+                            @Override public Object call() throws Exception {
+                                return rs.getMetaData();
+                            }
+                        },
+                        SQLException.class,
+                        "Server cursor is already closed"
+                    );
+
+                    GridTestUtils.assertThrows(log(), new Callable<Object>() {
+                            @Override public Object call() throws Exception {
+                                return rs.findColumn("id");
+                            }
+                        },
+                        SQLException.class,
+                        "Server cursor is already closed"
+                    );
+
+                    checkResultSet(rs, persons);
+                }
+            }
+
+            try (Statement stmt = conn.createStatement()) {
+                // Ensure multiple page result set
+                stmt.setFetchSize(1);
+
+                try (ResultSet rs = stmt.executeQuery(sqlText)) {
+                    // Getting result set metadata is OK here
+                    assertEquals(3, rs.getMetaData().getColumnCount());
+
+                    assertEquals(1, rs.findColumn("id"));
+
+                    checkResultSet(rs, persons);
+                }
+            }
+
+            try (Statement stmt = conn.createStatement()) {
+                // Ensure multiple page result set
+                stmt.setFetchSize(1);
+
+                try (ResultSet rs = stmt.executeQuery(sqlText)) {
+                    checkResultSet(rs, persons);
+
+                    // Server cursor is closed now
+                    GridTestUtils.assertThrows(log(), new Callable<Object>() {
+                            @Override public Object call() throws Exception {
+                                return rs.getMetaData();
+                            }
+                        },
+                        SQLException.class,
+                        "Server cursor is already closed"
+                    );
+
+                    GridTestUtils.assertThrows(log(), new Callable<Object>() {
+                            @Override public Object call() throws Exception {
+                                return rs.findColumn("id");
+                            }
+                        },
+                        SQLException.class,
+                        "Server cursor is already closed"
+                    );
+                }
+            }
+        }
+    }
+
+    /**
+     * Ensure that insert works when auto close of server cursor is enabled.
+     *
+     * @throws Exception If failed.
+     */
+    public void testInsert() throws Exception {
+        try (Connection conn = DriverManager.getConnection(URL)) {
+            conn.setSchema(CACHE_NAME);
+
+            String sqlText = "insert into Person (_key, id, name, age) values (?, ?, ?, ?)";
+
+            Person p = new Person(1, "John", 25);
+
+            try (PreparedStatement prepared = conn.prepareStatement(sqlText)) {
+                prepared.setInt(1, p.id);
+                prepared.setInt(2, p.id);
+                prepared.setString(3, p.name);
+                prepared.setInt(4, p.age);
+
+                assertFalse(prepared.execute());
+                assertEquals(1, prepared.getUpdateCount());
+            }
+
+            IgniteCache<Integer, Person> cache = grid(0).cache(CACHE_NAME);
+
+            assertEquals(p, cache.get(1));
+        }
+    }
+
+    /**
+     * Ensure that update works when auto close of server cursor is enabled.
+     *
+     * @throws Exception If failed.
+     */
+    public void testUpdate() throws Exception {
+        IgniteCache<Integer, Person> cache = grid(0).cache(CACHE_NAME);
+
+        Person p = new Person(1, "John", 25);
+
+        cache.put(1, p);
+
+        try (Connection conn = DriverManager.getConnection(URL)) {
+            conn.setSchema(CACHE_NAME);
+
+            String sqlText = "update Person set age = age + 1";
+
+            try (Statement stmt = conn.createStatement()) {
+                assertEquals(1, stmt.executeUpdate(sqlText));
+            }
+
+            assertEquals(p.age + 1, cache.get(1).age);
+        }
+    }
+
+    /**
+     * Ensure that delete works when auto close of server cursor is enabled.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDelete() throws Exception {
+        IgniteCache<Integer, Person> cache = grid(0).cache(CACHE_NAME);
+
+        Person p = new Person(1, "John", 25);
+
+        cache.put(1, p);
+
+        try (Connection conn = DriverManager.getConnection(URL)) {
+            conn.setSchema(CACHE_NAME);
+
+            String sqlText = "delete Person where age = ?";
+
+            try (PreparedStatement prepared = conn.prepareStatement(sqlText)) {
+                prepared.setInt(1, p.age);
+
+                assertEquals(1, prepared.executeUpdate());
+            }
+
+            assertNull(cache.get(1));
+        }
+    }
+
+    /**
+     * Checks result set against array of Person.
+     *
+     * @param rs Result set.
+     * @param persons Array of Person.
+     * @throws Exception If failed.
+     */
+    private void checkResultSet(ResultSet rs, Person[] persons) throws Exception {
+        while (rs.next()) {
+            Person p = new Person(
+                rs.getInt(1),
+                rs.getString(2),
+                rs.getInt(3));
+
+            assert p.id > 0 && p.id <= persons.length;
+
+            assertEquals(persons[p.id - 1], p);
+        }
+    }
+
+    /**
+     * Person.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    static class Person implements Serializable {
+        /** ID. */
+        @QuerySqlField
+        private final int id;
+
+        /** First name. */
+        @QuerySqlField
+        private final String name;
+
+        /** Age. */
+        @QuerySqlField
+        private final int age;
+
+        /**
+         * @param id ID.
+         * @param name Name.
+         * @param age Age.
+         */
+        Person(int id, String name, int age) {
+            assert !F.isEmpty(name);
+            assert age > 0;
+
+            this.id = id;
+            this.name = name;
+            this.age = age;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("SimplifiableIfStatement")
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            Person person = (Person) o;
+
+            if (id != person.id)
+                return false;
+
+            if (name == null ^ person.name == null)
+                return false;
+
+            if (name != null && !name.equals(person.name))
+                return false;
+
+            return age == person.age;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int result = id;
+
+            result = 31 * result + (name != null ? name.hashCode() : 0);
+            result = 31 * result + age;
+
+            return result;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
index 7ea22d5..8f1285b 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinConnectionSelfTest.java
@@ -237,6 +237,42 @@ public class JdbcThinConnectionSelfTest extends JdbcThinAbstractSelfTest {
     }
 
     /**
+     * Test autoCloseServerCursor property handling.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAutoCloseServerCursorProperty() throws Exception {
+        String url = "jdbc:ignite:thin://127.0.0.1?" + JdbcThinUtils.PARAM_AUTO_CLOSE_SERVER_CURSOR;
+
+        String err = "Failed to parse boolean property [name=" + JdbcThinUtils.PARAM_AUTO_CLOSE_SERVER_CURSOR;
+
+        assertInvalid(url + "=0", err);
+        assertInvalid(url + "=1", err);
+        assertInvalid(url + "=false1", err);
+        assertInvalid(url + "=true1", err);
+
+        try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1")) {
+            assertFalse(io(conn).autoCloseServerCursor());
+        }
+
+        try (Connection conn = DriverManager.getConnection(url + "=true")) {
+            assertTrue(io(conn).autoCloseServerCursor());
+        }
+
+        try (Connection conn = DriverManager.getConnection(url + "=True")) {
+            assertTrue(io(conn).autoCloseServerCursor());
+        }
+
+        try (Connection conn = DriverManager.getConnection(url + "=false")) {
+            assertFalse(io(conn).autoCloseServerCursor());
+        }
+
+        try (Connection conn = DriverManager.getConnection(url + "=False")) {
+            assertFalse(io(conn).autoCloseServerCursor());
+        }
+    }
+
+    /**
      * Get client socket for connection.
      *
      * @param conn Connection.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
index b372085..14c34ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
@@ -44,6 +44,7 @@ import static java.sql.ResultSet.CONCUR_READ_ONLY;
 import static java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT;
 import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
 
+import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_AUTO_CLOSE_SERVER_CURSORS;
 import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_HOST;
 import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_PORT;
 import static org.apache.ignite.internal.jdbc.thin.JdbcThinUtils.PROP_DISTRIBUTED_JOINS;
@@ -106,6 +107,7 @@ public class JdbcThinConnection implements Connection {
         boolean enforceJoinOrder = extractBoolean(props, PROP_ENFORCE_JOIN_ORDER, false);
         boolean collocated = extractBoolean(props, PROP_COLLOCATED, false);
         boolean replicatedOnly = extractBoolean(props, PROP_REPLICATED_ONLY, false);
+        boolean autoCloseServerCursor = extractBoolean(props, PROP_AUTO_CLOSE_SERVER_CURSORS, false);
 
         int sockSndBuf = extractIntNonNegative(props, PROP_SOCK_SND_BUF, 0);
         int sockRcvBuf = extractIntNonNegative(props, PROP_SOCK_RCV_BUF, 0);
@@ -114,7 +116,7 @@ public class JdbcThinConnection implements Connection {
 
         try {
             cliIo = new JdbcThinTcpIo(host, port, distributedJoins, enforceJoinOrder, collocated, replicatedOnly,
-                sockSndBuf, sockRcvBuf, tcpNoDelay);
+                autoCloseServerCursor, sockSndBuf, sockRcvBuf, tcpNoDelay);
 
             cliIo.start();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
index 87bc526..5c61e23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinResultSet.java
@@ -94,6 +94,9 @@ public class JdbcThinResultSet implements ResultSet {
     /** Is query flag. */
     private boolean isQuery;
 
+    /** Auto close server cursors flag. */
+    private boolean autoClose;
+
     /** Update count. */
     private long updCnt;
 
@@ -105,11 +108,13 @@ public class JdbcThinResultSet implements ResultSet {
      * @param fetchSize Fetch size.
      * @param finished Finished flag.
      * @param rows Rows.
-     * @param isQuery Is Result ser for Select query
+     * @param isQuery Is Result set for Select query.
+     * @param autoClose Is automatic close of server cursors enabled.
+     * @param updCnt Update count.
      */
     @SuppressWarnings("OverlyStrongTypeCast")
     JdbcThinResultSet(JdbcThinStatement stmt, long qryId, int fetchSize, boolean finished,
-        List<List<Object>> rows, boolean isQuery, long updCnt) {
+        List<List<Object>> rows, boolean isQuery, boolean autoClose, long updCnt) {
         assert stmt != null;
         assert fetchSize > 0;
 
@@ -118,6 +123,7 @@ public class JdbcThinResultSet implements ResultSet {
         this.fetchSize = fetchSize;
         this.finished = finished;
         this.isQuery = isQuery;
+        this.autoClose = autoClose;
 
         if (isQuery) {
             this.fetchSize = fetchSize;
@@ -134,7 +140,7 @@ public class JdbcThinResultSet implements ResultSet {
     @Override public boolean next() throws SQLException {
         ensureNotClosed();
 
-        if (rowsIter == null && !finished) {
+        if ((rowsIter == null || !rowsIter.hasNext()) && !finished) {
             try {
                 JdbcQueryFetchResult res = stmt.connection().io().queryFetch(qryId, fetchSize);
 
@@ -178,7 +184,8 @@ public class JdbcThinResultSet implements ResultSet {
             return;
 
         try {
-            stmt.connection().io().queryClose(qryId);
+            if (!finished || (isQuery && !autoClose))
+                stmt.connection().io().queryClose(qryId);
 
             closed = true;
         }
@@ -1616,6 +1623,9 @@ public class JdbcThinResultSet implements ResultSet {
      * @throws SQLException On error.
      */
     private List<JdbcColumnMeta> meta() throws SQLException {
+        if (finished && (!isQuery || autoClose))
+            throw new SQLException("Server cursor is already closed.");
+
         if (!metaInit) {
             try {
                 JdbcQueryMetadataResult res = stmt.connection().io().queryMeta(qryId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
index a0b7ee6..2cad223 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
@@ -112,7 +112,7 @@ public class JdbcThinStatement implements Statement {
             assert res != null;
 
             rs = new JdbcThinResultSet(this, res.getQueryId(), pageSize, res.last(), res.items(),
-                res.isQuery(), res.updateCount());
+                res.isQuery(), conn.io().autoCloseServerCursor(), res.updateCount());
         }
         catch (IOException e) {
             conn.close();

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index 1905ea4..be62a8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -53,7 +53,7 @@ public class JdbcThinTcpIo {
     private static final SqlListenerProtocolVersion CURRENT_VER = SqlListenerProtocolVersion.create(2, 1, 0);
 
     /** Initial output stream capacity for handshake. */
-    private static final int HANDSHAKE_MSG_SIZE = 12;
+    private static final int HANDSHAKE_MSG_SIZE = 13;
 
     /** Initial output for query message. */
     private static final int QUERY_EXEC_MSG_INIT_CAP = 256;
@@ -85,6 +85,9 @@ public class JdbcThinTcpIo {
     /** Replicated only flag. */
     private final boolean replicatedOnly;
 
+    /** Flag to automatically close server cursor. */
+    private final boolean autoCloseServerCursor;
+
     /** Socket send buffer. */
     private final int sockSndBuf;
 
@@ -115,18 +118,20 @@ public class JdbcThinTcpIo {
      * @param enforceJoinOrder Enforce join order flag.
      * @param collocated Collocated flag.
      * @param replicatedOnly Replicated only flag.
+     * @param autoCloseServerCursor Flag to automatically close server cursors.
      * @param sockSndBuf Socket send buffer.
      * @param sockRcvBuf Socket receive buffer.
      * @param tcpNoDelay TCP no delay flag.
      */
     JdbcThinTcpIo(String host, int port, boolean distributedJoins, boolean enforceJoinOrder, boolean collocated,
-        boolean replicatedOnly, int sockSndBuf, int sockRcvBuf, boolean tcpNoDelay) {
+        boolean replicatedOnly, boolean autoCloseServerCursor, int sockSndBuf, int sockRcvBuf, boolean tcpNoDelay) {
         this.host = host;
         this.port = port;
         this.distributedJoins = distributedJoins;
         this.enforceJoinOrder = enforceJoinOrder;
         this.collocated = collocated;
         this.replicatedOnly = replicatedOnly;
+        this.autoCloseServerCursor = autoCloseServerCursor;
         this.sockSndBuf = sockSndBuf;
         this.sockRcvBuf = sockRcvBuf;
         this.tcpNoDelay = tcpNoDelay;
@@ -182,6 +187,7 @@ public class JdbcThinTcpIo {
         writer.writeBoolean(enforceJoinOrder);
         writer.writeBoolean(collocated);
         writer.writeBoolean(replicatedOnly);
+        writer.writeBoolean(autoCloseServerCursor);
 
         send(writer.array());
 
@@ -382,6 +388,13 @@ public class JdbcThinTcpIo {
     }
 
     /**
+     * @return Auto close server cursors flag.
+     */
+    public boolean autoCloseServerCursor() {
+        return autoCloseServerCursor;
+    }
+
+    /**
      * @return Socket send buffer size.
      */
     public int socketSendBuffer() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java
index aa9b011..78e8c8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.jdbc.thin;
 
-import org.apache.ignite.configuration.OdbcConfiguration;
 import org.apache.ignite.configuration.SqlConnectorConfiguration;
 
 import java.sql.Time;
@@ -76,6 +75,9 @@ public class JdbcThinUtils {
     /** Parameter: TCP no-delay flag. */
     public static final String PARAM_TCP_NO_DELAY = "tcpNoDelay";
 
+    /** Parameter: Automatically close server cursor. */
+    public static final String PARAM_AUTO_CLOSE_SERVER_CURSOR = "autoCloseServerCursor";
+
     /** Distributed joins property name. */
     public static final String PROP_DISTRIBUTED_JOINS = PROP_PREFIX + PARAM_DISTRIBUTED_JOINS;
 
@@ -97,6 +99,9 @@ public class JdbcThinUtils {
     /** TCP no delay property name. */
     public static final String PROP_TCP_NO_DELAY = PROP_PREFIX + PARAM_TCP_NO_DELAY;
 
+    /** Automatically close server cursor. */
+    public static final String PROP_AUTO_CLOSE_SERVER_CURSORS = PROP_PREFIX + PARAM_AUTO_CLOSE_SERVER_CURSOR;
+
     /** Default port. */
     public static final int DFLT_PORT = SqlConnectorConfiguration.DFLT_PORT;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java
index bff5519..6bb4e29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java
@@ -252,9 +252,10 @@ public class SqlListenerNioListener extends GridNioServerListenerAdapter<byte[]>
             boolean enforceJoinOrder = reader.readBoolean();
             boolean collocated = reader.readBoolean();
             boolean replicatedOnly = reader.readBoolean();
+            boolean autoCloseCursors = reader.readBoolean();
 
             SqlListenerRequestHandler handler = new JdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins,
-                enforceJoinOrder, collocated, replicatedOnly);
+                enforceJoinOrder, collocated, replicatedOnly, autoCloseCursors);
 
             SqlListenerMessageParser parser = new JdbcMessageParser(ctx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java
index b8edb8d..830daea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcQueryCursor.java
@@ -65,13 +65,16 @@ class JdbcQueryCursor {
      * @return List of the rows.
      */
     List<List<Object>> fetchRows() {
-        List<List<Object>> items = new ArrayList<>();
+        int fetchSize = (maxRows > 0) ? (int)Math.min(pageSize, maxRows - fetched) : pageSize;
 
-        int fetchSize0 = (maxRows > 0) ? (int)Math.min(pageSize, maxRows - fetched) : pageSize;
+        List<List<Object>> items = new ArrayList<>(fetchSize);
 
-        for (; fetched < fetchSize0 && iter.hasNext(); ++fetched)
+        for (int i = 0; i < fetchSize && iter.hasNext(); i++) {
             items.add(iter.next());
 
+            fetched++;
+        }
+
         return items;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ecba1acc/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 0796cfc..94ac433 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -74,6 +74,9 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler {
     /** Replicated only flag. */
     private final boolean replicatedOnly;
 
+    /** Automatic close of cursors. */
+    private final boolean autoCloseCursors;
+
     /**
      * Constructor.
      *
@@ -84,9 +87,11 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler {
      * @param enforceJoinOrder Enforce join order flag.
      * @param collocated Collocated flag.
      * @param replicatedOnly Replicated only flag.
+     * @param autoCloseCursors Flag to automatically close server cursors.
      */
     public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors,
-        boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, boolean replicatedOnly) {
+        boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, boolean replicatedOnly,
+        boolean autoCloseCursors) {
         this.ctx = ctx;
         this.busyLock = busyLock;
         this.maxCursors = maxCursors;
@@ -94,6 +99,7 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler {
         this.enforceJoinOrder = enforceJoinOrder;
         this.collocated = collocated;
         this.replicatedOnly = replicatedOnly;
+        this.autoCloseCursors = autoCloseCursors;
 
         log = ctx.log(getClass());
     }
@@ -183,8 +189,6 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler {
 
             JdbcQueryCursor cur = new JdbcQueryCursor(qryId, req.pageSize(), req.maxRows(), (QueryCursorImpl)qryCur);
 
-            qryCursors.put(qryId, cur);
-
             JdbcQueryExecuteResult res;
 
             if (cur.isQuery())
@@ -200,6 +204,11 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler {
                 res = new JdbcQueryExecuteResult(qryId, (Long)items.get(0).get(0));
             }
 
+            if (res.last() && (!res.isQuery() || autoCloseCursors))
+                cur.close();
+            else
+                qryCursors.put(qryId, cur);
+
             return new JdbcResponse(res);
         }
         catch (Exception e) {
@@ -260,6 +269,12 @@ public class JdbcRequestHandler implements SqlListenerRequestHandler {
 
             JdbcQueryFetchResult res = new JdbcQueryFetchResult(cur.fetchRows(), !cur.hasNext());
 
+            if (res.last() && (!cur.isQuery() || autoCloseCursors)) {
+                qryCursors.remove(req.queryId());
+
+                cur.close();
+            }
+
             return new JdbcResponse(res);
         }
         catch (Exception e) {


Mime
View raw message