From commits-return-125612-archive-asf-public=cust-asf.ponee.io@ignite.apache.org Thu Dec 19 18:19:10 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id EC90C18037A for ; Thu, 19 Dec 2019 19:19:09 +0100 (CET) Received: (qmail 10405 invoked by uid 500); 19 Dec 2019 18:19:09 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 10396 invoked by uid 99); 19 Dec 2019 18:19:09 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Dec 2019 18:19:09 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 2A59F8D80D; Thu, 19 Dec 2019 18:19:09 +0000 (UTC) Date: Thu, 19 Dec 2019 18:19:08 +0000 To: "commits@ignite.apache.org" Subject: [ignite] branch master updated: IGNITE- 12461 Failover connections support for JDBC thin driver added. This closes #7154. MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157677954846.18522.2862095019942569006@gitbox.apache.org> From: amashenkov@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: ignite X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 564d5dbc139c8a78d28b76cd7e2d211ab6f0f929 X-Git-Newrev: a4b722569a98e5ac24dddc1caea9a53b18e230b2 X-Git-Rev: a4b722569a98e5ac24dddc1caea9a53b18e230b2 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
amashenkov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git The following commit(s) were added to refs/heads/master by this push: new a4b7225 IGNITE- 12461 Failover connections support for JDBC thin driver added. This closes #7154. a4b7225 is described below commit a4b722569a98e5ac24dddc1caea9a53b18e230b2 Author: alapin AuthorDate: Wed Jun 5 16:24:42 2019 +0300 IGNITE- 12461 Failover connections support for JDBC thin driver added. This closes #7154. --- ...teJdbcThinDriverAffinityAwarenessTestSuite.java | 4 +- ...tyAwarenessReconnectionAndFailoverSelfTest.java | 985 +++++++++++++++++++++ ...cThinAffinityAwarenessReconnectionSelfTest.java | 398 --------- .../internal/jdbc/thin/JdbcThinConnection.java | 182 ++-- .../internal/processors/odbc/jdbc/JdbcRequest.java | 18 +- 5 files changed, 1124 insertions(+), 463 deletions(-) diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java index 3937fe2..5a2255c 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcThinDriverAffinityAwarenessTestSuite.java @@ -18,7 +18,7 @@ package org.apache.ignite.jdbc.suite; import org.apache.ignite.jdbc.thin.JdbcThinAbstractSelfTest; -import org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessReconnectionSelfTest; +import org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessReconnectionAndFailoverSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinAffinityAwarenessTransactionsSelfTest; import org.apache.ignite.jdbc.thin.JdbcThinConnectionSelfTest; @@ -39,7 +39,7 @@ import org.junit.runners.Suite; JdbcThinStatementSelfTest.class, 
JdbcThinAffinityAwarenessSelfTest.class, JdbcThinAffinityAwarenessTransactionsSelfTest.class, - JdbcThinAffinityAwarenessReconnectionSelfTest.class, + JdbcThinAffinityAwarenessReconnectionAndFailoverSelfTest.class, }) public class IgniteJdbcThinDriverAffinityAwarenessTestSuite { /** diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionAndFailoverSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionAndFailoverSelfTest.java new file mode 100644 index 0000000..8eefa2e24 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionAndFailoverSelfTest.java @@ -0,0 +1,985 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.jdbc.thin; + +import java.io.Serializable; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.jdbc.thin.AffinityCache; +import org.apache.ignite.internal.jdbc.thin.JdbcThinConnection; +import org.apache.ignite.internal.jdbc.thin.JdbcThinTcpIo; +import org.apache.ignite.internal.jdbc.thin.QualifiedSQLQuery; +import org.apache.ignite.internal.processors.cache.GridCacheUtils; +import org.apache.ignite.internal.sql.optimizer.affinity.PartitionSingleNode; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Test; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * Jdbc thin affinity awareness reconnection and query failover test. + */ +public class JdbcThinAffinityAwarenessReconnectionAndFailoverSelfTest extends JdbcThinAbstractSelfTest { + /** Rows count. */ + private static final int ROWS_COUNT = 100; + + /** URL. */ + private static final String URL = "jdbc:ignite:thin://127.0.0.1:10800..10802?affinityAwareness=true"; + + /** URL with port. */ + public static final String URL_WITH_ONE_PORT = "jdbc:ignite:thin://127.0.0.1:10800?affinityAwareness=true"; + + /** Nodes count. 
*/ + private static final int INITIAL_NODES_CNT = 3; + + /** Log handler. */ + private static LogHandler logHnd; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(INITIAL_NODES_CNT); + + Logger log = Logger.getLogger(JdbcThinConnection.class.getName()); + logHnd = new LogHandler(); + logHnd.setLevel(Level.ALL); + log.setUseParentHandlers(false); + log.addHandler(logHnd); + log.setLevel(Level.ALL); + } + + /** + * Check that background connection establishment works as expected. + *

+ * Within new reconnection logic in affinity awareness mode when {@code JdbcThinConnection} is created it eagerly + * establishes a connection to one and only one ignite node. All other connections to nodes specified in connection + * properties are established by background thread. + *

+ * So in given test we specify url with 3 different ports and verify that 3 connections will be created: one in + * eager mode and two within background thread. It takes some time for background thread to create a connection, and + * cause, in addition to that it runs periodically with some delay, {@code GridTestUtils.waitForCondition} is used + * in order to check that all expected connections are established. + * + * @throws Exception If failed. + */ + @Test + public void testBackgroundConnectionEstablishment() throws Exception { + try (Connection conn = DriverManager.getConnection(URL)) { + Map ios = GridTestUtils.getFieldValue(conn, "ios"); + + assertConnectionsCount(ios, 3); + } + } + + /** + * Test connection failover: + *

    + *
  1. Check that at the beginning of test {@code INITIAL_NODES_CNT} connections are established.
  2. + *
  3. Stop one node, invalidate dead connection (jdbc thin, won't detect that node has gone, + * until it tries to touch it) and verify, that connections count has decremented.
  4. + *
  5. Start, previously stopped node, and check that connections count also restored to initial value.
  6. + *
+ * + * @throws Exception If failed. + */ + @Test + public void testConnectionFailover() throws Exception { + try (Connection conn = DriverManager.getConnection(URL)) { + Map ios = GridTestUtils.getFieldValue(conn, "ios"); + + assertConnectionsCount(ios, INITIAL_NODES_CNT); + + assertEquals("Unexpected connections count.", INITIAL_NODES_CNT, ios.size()); + + stopGrid(1); + + invalidateConnectionToStoppedNode(conn); + + assertEquals("Unexpected connections count.", INITIAL_NODES_CNT - 1, ios.size()); + + startGrid(1); + + assertConnectionsCount(ios, INITIAL_NODES_CNT); + } + } + + /** + * Test total connection failover: + *
    + *
  1. Check that at the beginning of test {@code INITIAL_NODES_CNT} connections are established.
  2. + *
  3. Stop all nodes, invalidate dead connections (jdbc thin, won't detect that node has gone, + * until it tries to touch it) and verify, that connections count equals to zero.
  4. + *
  5. Start, previously stopped nodes, and check that connections count also restored to initial value.
  6. + *
+ * + * @throws Exception If failed. + */ + @Test + public void testTotalConnectionFailover() throws Exception { + try (Connection conn = DriverManager.getConnection(URL)) { + Map ios = GridTestUtils.getFieldValue(conn, "ios"); + + assertConnectionsCount(ios, INITIAL_NODES_CNT); + + for (int i = 0; i < INITIAL_NODES_CNT; i++) { + stopGrid(i); + invalidateConnectionToStoppedNode(conn); + } + + assertConnectionsCount(ios, 0); + + for (int i = 0; i < INITIAL_NODES_CNT; i++) + startGrid(i); + + assertConnectionsCount(ios, INITIAL_NODES_CNT); + } + } + + /** + * Test eager connection failover: + *
    + *
  1. Check that at the beginning of test {@code INITIAL_NODES_CNT} connections are established.
  2. + *
  3. Stop all nodes, invalidate dead connections (jdbc thin, won't detect that node has gone, + * until it tries to touch it) and verify, that connections count equals to zero.
  4. + *
  5. Wait for some time, in order for reconnection thread to increase delay between connection attempts, + * because of reconnection failures.
  6. + *
  7. Start, previously stopped nodes, and send simple query immediately. Eager reconnection is expected. + * NOTE:There's still a chance that connection would be recreated by background thread and not eager + * process. In order to decrease given possibility we've waited for some time on previous step.
  8. + *
  9. Ensure that after some time all connections will be restored.
  10. + *
+ * + * @throws Exception If failed. + */ + @Test + public void testEagerConnectionFailover() throws Exception { + try (Connection conn = DriverManager.getConnection(URL)) { + Map ios = GridTestUtils.getFieldValue(conn, "ios"); + + assertConnectionsCount(ios, INITIAL_NODES_CNT); + + for (int i = 0; i < INITIAL_NODES_CNT; i++) { + stopGrid(i); + invalidateConnectionToStoppedNode(conn); + } + + assertEquals("Unexpected connections count.", 0, ios.size()); + + doSleep(4 * JdbcThinConnection.RECONNECTION_DELAY); + + for (int i = 0; i < INITIAL_NODES_CNT; i++) + startGrid(i); + + conn.createStatement().execute("select 1;"); + + assertConnectionsCount(ios, INITIAL_NODES_CNT); + } + } + + /** + * Check that reconnection thread increases delay between unsuccessful connection attempts: + *
    + *
  1. Specify two inet addresses one valid and one inoperative.
  2. + *
  3. Wait for specific amount of time. The reconnection logic is supposed to increase delays between reconnection + * attempts. The basic idea is very simple: delay is doubled on every connection failure until connection succeeds or + * until delay exceeds predefined maximum value {@code JdbcThinConnection.RECONNECTION_MAX_DELAY} + *
    +     *   |_|_ _|_ _ _ _|_ _ _ _ _ _ _ _|
    +     *   where: '|' is connection attempt;
    +     *          '_' is an amount of time that reconnection thread waits, equal to JdbcThinConnection.RECONNECTION_DELAY;
    +     *
    +     *   so if we wait for 9 * RECONNECTION_DELAY, we expect to see exactly four connection attempts:
    +     *   |_|_ _|_ _ _ _|_ _^_ _ _ _ _ _|
    +     *   
    + *
  4. + *
  5. Check that there were exactly four reconnection attempts. In order to do this, we check logs, expecting to see + * four warning messages there.
  6. + *
+ * + * @throws Exception If failed. + */ + @Test + public void testReconnectionDelayIncreasing() throws Exception { + try (Connection ignored = DriverManager.getConnection( + "jdbc:ignite:thin://127.0.0.1:10800,127.0.0.1:10810?affinityAwareness=true")) { + logHnd.records.clear(); + + doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY); + + assertEquals("Unexpected log records count.", 4, logHnd.records.size()); + + String expRecordMsg = "Failed to connect to Ignite node " + + "[url=jdbc:ignite:thin://127.0.0.1:10800,127.0.0.1:10810]. address = [localhost/127.0.0.1:10810]."; + + for (LogRecord record : logHnd.records) { + assertEquals("Unexpected log record text.", expRecordMsg, record.getMessage()); + assertEquals("Unexpected log level", Level.WARNING, record.getLevel()); + } + } + } + + /** + * Check that reconnection thread selectively increases delay between unsuccessful connection attempts: + *
    + *
  1. Create {@code JdbcThinConnection} with two valid inet addresses.
  2. + *
  3. Stop one node and invalidate corresponding connection. Ensure that only one connection left.
  4. + *
  5. Wait for specific amount of time. The reconnection logic is supposed to increase delays between reconnection + * attempts. The basic idea is very simple: delay is doubled on every connection failure until connection succeeds or + * until delay exceeds predefined maximum value {@code JdbcThinConnection.RECONNECTION_MAX_DELAY} + *
    +     *   |_|_ _|_ _ _ _|_ _ _ _ _ _ _ _|
    +     *   where: '|' is connection attempt;
    +     *          '_' is an amount of time that reconnection thread waits, equal to JdbcThinConnection.RECONNECTION_DELAY;
    +     *
    +     *   so if we wait for 9 * RECONNECTION_DELAY, we expect to see exactly four connection attempts:
    +     *   |_|_ _|_ _ _ _|_ _^_ _ _ _ _ _|
    +     *   
    + *
  6. + *
  7. Check that there were exactly four reconnection attempts. In order to do this, we check logs, expecting to see + * four warning messages there.
  8. + *
  9. Start previously stopped node.
  10. + *
  11. Wait until next reconnection attempt.
  12. + *
  13. Check that both connections are established and that there are no warning messages within logs.
  14. + *
  15. One more time: stop one node and invalidate corresponding connection. + * Ensure that only one connection left.
  16. + *
  17. Wait for some time.
  18. + *
  19. Ensure that delay between reconnection was reset to initial value. + * In other words, we again expect four warning messages within logs.
  20. + *
+ * + * @throws Exception If failed. + */ + @Test + public void testReconnectionDelaySelectiveIncreasing() throws Exception { + try (Connection conn = DriverManager.getConnection( + "jdbc:ignite:thin://127.0.0.1:10800..10801?affinityAwareness=true")) { + // Stop one node and invalidate corresponding connection. Ensure that only one connection left. + stopGrid(0); + + invalidateConnectionToStoppedNode(conn); + + Map ios = GridTestUtils.getFieldValue(conn, "ios"); + + assertEquals("Unexpected connections count.", 1, ios.size()); + + logHnd.records.clear(); + + // Wait for some specific amount of time and ensure that there were exact four reconnection attempts. + doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY); + + assertEquals("Unexpected log records count.", 4, logHnd.records.size()); + + String expRecordMsg = "Failed to connect to Ignite node [url=jdbc:ignite:thin://127.0.0.1:10800..10801]." + + " address = [localhost/127.0.0.1:10800]."; + + for (LogRecord record : logHnd.records) { + assertEquals("Unexpected log record text.", expRecordMsg, record.getMessage()); + assertEquals("Unexpected log level", Level.WARNING, record.getLevel()); + } + + // Start previously stopped node. + startGrid(0); + + logHnd.records.clear(); + + // Waiting until next reconnection attempt. + doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY); + + // Checking that both connections are established and that there are no warning messages within logs. + assertEquals("Unexpected log records count.", 0, logHnd.records.size()); + + assertEquals("Unexpected connections count.", 2, ios.size()); + + // One more time: stop one node, invalidate corresponding connection and ensure that only one connection + // left. + stopGrid(0); + + invalidateConnectionToStoppedNode(conn); + + assertEquals("Unexpected connections count.", 1, ios.size()); + + logHnd.records.clear(); + + // Wait for some time and ensure that delay between reconnection was reset to initial value. 
+ doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY); + + assertEquals("Unexpected log records count.", 4, logHnd.records.size()); + + for (LogRecord record : logHnd.records) { + assertEquals("Unexpected log record text.", expRecordMsg, record.getMessage()); + assertEquals("Unexpected log level", Level.WARNING, record.getLevel()); + } + + startGrid(0); + } + } + + /** + * Check that failover doesn't occur if the result of sending sql request is SQLException. + * + * @throws Exception If failed. + */ + @SuppressWarnings("ThrowableNotThrown") + @Test + public void testSQLExceptionFailover() throws Exception { + try (Connection conn = DriverManager.getConnection(URL)) { + logHnd.records.clear(); + + GridTestUtils.assertThrows(null, + new Callable() { + @Override public Object call() throws Exception { + conn.createStatement().execute("select invalid column name."); + + return null; + } + }, + SQLException.class, + "Failed to parse query." + ); + } + + assertEquals("Unexpected log records count.", 1, logHnd.records.size()); + + LogRecord record = logHnd.records.get(0); + + assertEquals("Unexpected log record text.", "Exception during sending an sql request.", + record.getMessage()); + + assertEquals("Unexpected log level", Level.FINE, record.getLevel()); + } + + /** + * Check that failover occurs if the result of sending first iteration of sql request is an Exception but not an + * SQLException. + * + *
    + *
  1. Create {@code JdbcThinConnection} to all existing nodes.
  2. + *
  3. Create a cache and populate it with some data.
  4. + *
  5. Submit some failover-applicable sql query with specific condition within where clause, + * that assumes affinity awareness. Submit same query one more time. It's necessary in order to warm up affinity + * awareness cache.
  6. + *
  7. From within affinity cache calculate node that was used to process query. Stop it.
  8. + *
  9. Submit sql query, that is equal to initial one, one more time. + * Because of affinity awareness, given query will be routed to previously stopped node, so an Exception will be + * received. Here query failover kicks in and resends the query to an alive node using another {@code JdbcThinTcpIo}
  10. + *
  11. Because of failover, no exceptions are expected on Jdbc thin client side. + * However within the {@code JdbcThinConnection}, in case of {@code Level.FINE} log level, corresponding log record + * is expected.
  12. + *
+ * + * @throws Exception If failed. + */ + @Test + public void testQueryFailover() throws Exception { + try (Connection conn = DriverManager.getConnection( + "jdbc:ignite:thin://127.0.0.1:10800..10802?affinityAwareness=true")) { + + final String cacheName = UUID.randomUUID().toString().substring(0, 6); + + final String sql = "select * from \"" + cacheName + "\".Person where _key = 1"; + + CacheConfiguration cache = prepareCacheConfig(cacheName); + + ignite(0).createCache(cache); + + fillCache(cacheName); + + Statement stmt = conn.createStatement(); + + stmt.execute(sql); + stmt.execute(sql); + + AffinityCache affinityCache = GridTestUtils.getFieldValue(conn, "affinityCache"); + + Integer part = ((PartitionSingleNode)affinityCache.partitionResult( + new QualifiedSQLQuery("PUBLIC", sql)).partitionResult().tree()).value(); + + UUID nodeId = affinityCache.cacheDistribution(GridCacheUtils.cacheId(cacheName))[part]; + + int gridIdx = new Integer(Ignition.ignite(nodeId).name().substring(getTestIgniteInstanceName().length())); + stopGrid(gridIdx); + + logHnd.records.clear(); + + conn.createStatement().execute(sql); + + startGrid(gridIdx); + } + + assertEquals("Unexpected log records count.", 1, logHnd.records.size()); + + LogRecord record = logHnd.records.get(0); + + assertEquals("Unexpected log record text.", "Exception during sending an sql request.", + record.getMessage()); + + assertEquals("Unexpected log level", Level.FINE, record.getLevel()); + } + + /** + * Check that all possible sub-connections are used. + * + *
    + *
  1. Create {@code JdbcThinConnection} to all existing nodes.
  2. + *
  3. Stop all nodes.
  4. + *
  5. Submit arbitrary sql query.
  6. + *
  7. Several retries are expected. Exact number of retries should be equal to the number of originally + * established connections. At the very end, after trying to establish brand new connections {@code SQLException} + * with message: 'Failed to connect to server' should be thrown.
  8. + *
+ * + * @throws Exception If failed. + */ + @SuppressWarnings("ThrowableNotThrown") + @Test + public void testFailoverOnAllNodes() throws Exception { + try (Connection conn = DriverManager.getConnection( + "jdbc:ignite:thin://127.0.0.1:10800..10802?affinityAwareness=true")) { + Map ios = GridTestUtils.getFieldValue(conn, "ios"); + + assertConnectionsCount(ios, 3); + + stopAllGrids(); + + logHnd.records.clear(); + + GridTestUtils.assertThrows(null, + new Callable() { + @Override public Object call() throws Exception { + conn.createStatement().execute("select 1"); + + return null; + } + }, + SQLException.class, + "Failed to connect to server [url=jdbc:ignite:thin://127.0.0.1:10800..10802]" + ); + } + + assertEquals("Unexpected log records count.", 3, logHnd.records.size()); + + for (LogRecord record : logHnd.records) { + assertEquals("Unexpected log record text.", "Exception during sending an sql request.", + record.getMessage()); + assertEquals("Unexpected log level", Level.FINE, record.getLevel()); + } + + startGridsMultiThreaded(INITIAL_NODES_CNT); + } + + /** + * Check that there won't be more than 5 retry attempts. + * + * @throws Exception If failed. + */ + @SuppressWarnings("ThrowableNotThrown") + @Test + public void testFailoverLimit() throws Exception { + startGrid(3); + startGrid(4); + startGrid(5); + + try (Connection conn = DriverManager.getConnection( + "jdbc:ignite:thin://127.0.0.1:10800..10805?affinityAwareness=true")) { + + Map ios = GridTestUtils.getFieldValue(conn, "ios"); + + assertConnectionsCount(ios, 6); + + stopAllGrids(); + + logHnd.records.clear(); + + GridTestUtils.assertThrows(null, + new Callable() { + @Override public Object call() throws Exception { + conn.createStatement().execute("select 1"); + + return null; + } + }, + SQLException.class, + "Failed to communicate with Ignite cluster." 
+ ); + + assertEquals("Unexpected connections count.", 1, ios.keySet().size()); + } + + assertEquals("Unexpected log records count.", 5, logHnd.records.size()); + + for (LogRecord record : logHnd.records) { + assertEquals("Unexpected log record text.", "Exception during sending an sql request.", + record.getMessage()); + assertEquals("Unexpected log level", Level.FINE, record.getLevel()); + } + + startGridsMultiThreaded(INITIAL_NODES_CNT); + } + + /** + * Check that there are no retries in case of transactional query. + * + * @throws Exception If failed. + */ + @SuppressWarnings({"unchecked", "ThrowableNotThrown"}) + @Test + public void testTransactionalQueryFailover() throws Exception { + try (Connection conn = DriverManager.getConnection(URL_WITH_ONE_PORT)) { + final String cacheName = UUID.randomUUID().toString().substring(0, 6); + + final String sql = "select 1 from \"" + cacheName + "\".Person"; + + CacheConfiguration cache = defaultCacheConfiguration().setName(cacheName). + setNearConfiguration(null).setIndexedTypes(Integer.class, Person.class). + setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT); + + ignite(0).createCache(cache); + + Statement stmt = conn.createStatement(); + + stmt.execute("BEGIN"); + + stmt.execute(sql); + + stopGrid(0); + + logHnd.records.clear(); + + GridTestUtils.assertThrows(null, + new Callable() { + @Override public Object call() throws Exception { + stmt.execute(sql); + + return null; + } + }, + SQLException.class, + "Failed to communicate with Ignite cluster." + ); + } + + assertEquals("Unexpected log records count.", 1, logHnd.records.size()); + + LogRecord record = logHnd.records.get(0); + + assertEquals("Unexpected log record text.", "Exception during sending an sql request.", + record.getMessage()); + + assertEquals("Unexpected log level", Level.FINE, record.getLevel()); + + startGrid(0); + } + + /** + * Check that there are no retries in following cases: + *
    + *
  • Result set's metadata request.
  • + *
  • Multi-statements request.
  • + *
  • DDL.
  • + *
  • DML.
  • + *
+ * + * @throws Exception If failed. + */ + @Test + public void testNoRetriesOccurred() throws Exception { + // Check that there are no retries in case of result set's metadata request. + checkNoRetriesOccurred(() -> { + try (Connection conn = DriverManager.getConnection(URL_WITH_ONE_PORT)) { + Statement stmt = conn.createStatement(); + + ResultSet rs = stmt.executeQuery("select 1"); + + stopGrid(0); + + rs.getMetaData(); + } + return null; + }); + + startGrid(0); + + // Check that there are no retries in case of multi-statements request. + checkNoRetriesOccurred(() -> { + try (Connection conn = DriverManager.getConnection(URL_WITH_ONE_PORT)) { + Statement stmt = conn.createStatement(); + + stopGrid(0); + + stmt.executeQuery("select 1; select 2"); + } + return null; + }); + + startGrid(0); + + // Check that there are no retries in case of DDL. + checkNoRetriesOccurred(() -> { + try (Connection conn = DriverManager.getConnection(URL_WITH_ONE_PORT)) { + Statement stmt = conn.createStatement(); + + stopGrid(0); + + stmt.execute("CREATE TABLE PARENT" + UUID.randomUUID().toString().substring(0, 6) + + " (ID INT, NAME VARCHAR, PRIMARY KEY(ID));"); + } + return null; + }); + + startGrid(0); + + // Check that there are no retries in case of DML. + checkNoRetriesOccurred(() -> { + try (Connection conn = DriverManager.getConnection(URL_WITH_ONE_PORT)) { + Statement stmt = conn.createStatement(); + + String tblName = "PARENT" + UUID.randomUUID().toString().substring(0, 6); + + stmt.execute("CREATE TABLE " + tblName + + " (ID INT, NAME VARCHAR, PRIMARY KEY(ID));"); + + stopGrid(0); + + stmt.execute("INSERT INTO" + tblName + " (ID, NAME) VALUES(1, 'aaa')"); + } + return null; + }); + + startGrid(0); + } + + /** + * Check that there are retries in case of following metadata requests: + *
    + *
  • META_TABLES
  • + *
  • META_COLUMNS
  • + *
  • META_INDEXES
  • + *
  • META_PARAMS
  • + *
  • META_PRIMARY_KEYS
  • + *
  • META_SCHEMAS
  • + *
+ * + * @throws Exception If failed. + */ + @Test + public void testMetadataQueries() throws Exception { + // Test META_TABLES query. + checkRetriesOccurred(() -> { + try (Connection conn = DriverManager.getConnection(URL_WITH_ONE_PORT)) { + stopGrid(0); + + conn.getMetaData().getTables(null, null, null, null); + } + + return null; + }); + + startGrid(0); + + // Test META_COLUMNS query. + checkRetriesOccurred(() -> { + try (Connection conn = DriverManager.getConnection(URL_WITH_ONE_PORT)) { + stopGrid(0); + + conn.getMetaData().getColumns(null, null, null, + null); + } + + return null; + }); + + startGrid(0); + + // Test META_INDEXES query. + checkRetriesOccurred(() -> { + try (Connection conn = DriverManager.getConnection(URL_WITH_ONE_PORT)) { + stopGrid(0); + + conn.getMetaData().getIndexInfo(null, null, null, false, false); + } + + return null; + }); + + startGrid(0); + + // Test META_PARAMS query. + checkRetriesOccurred(() -> { + try (Connection conn = DriverManager.getConnection(URL_WITH_ONE_PORT)) { + stopGrid(0); + + PreparedStatement preparedStmt = conn.prepareStatement("select 1"); + + preparedStmt.getParameterMetaData(); + } + + return null; + }); + + startGrid(0); + + // Test META_PRIMARY_KEYS query. + checkRetriesOccurred(() -> { + try (Connection conn = DriverManager.getConnection(URL_WITH_ONE_PORT)) { + stopGrid(0); + + conn.getMetaData().getPrimaryKeys(null, null, null); + } + + return null; + }); + + startGrid(0); + + // Test META_SCHEMAS query. + checkRetriesOccurred(() -> { + try (Connection conn = DriverManager.getConnection(URL_WITH_ONE_PORT)) { + stopGrid(0); + + conn.getMetaData().getSchemas(null, null); + } + + return null; + }); + + startGrid(0); + } + + /** + * Helper method in order to check that retries do have occurred in case of running {@param queriesToTest} + * statements. + * + * @param queriesToTest Statements to test. 
+ */ + @SuppressWarnings("ThrowableNotThrown") + private void checkRetriesOccurred(Callable queriesToTest) { + logHnd.records.clear(); + + GridTestUtils.assertThrows(null, + new Callable() { + @Override public Object call() throws Exception { + queriesToTest.call(); + + return null; + } + }, + SQLException.class, + "Failed to connect to server [host=localhost, port=10800]" + ); + + assertEquals("Unexpected log records count.", 1, logHnd.records.size()); + + LogRecord record = logHnd.records.get(0); + + assertEquals("Unexpected log record text.", "Exception during sending an sql request.", + record.getMessage()); + + assertEquals("Unexpected log level", Level.FINE, record.getLevel()); + } + + /** + * Helper method in order to check that no retries have occurred in case of running {@param queriesToTest} + * statements. + * + * @param queriesToTest Statements to test. + */ + @SuppressWarnings("ThrowableNotThrown") + private void checkNoRetriesOccurred(Callable queriesToTest) { + logHnd.records.clear(); + + GridTestUtils.assertThrows(null, + new Callable() { + @Override public Object call() throws Exception { + queriesToTest.call(); + + return null; + } + }, + SQLException.class, + "Failed to communicate with Ignite cluster." + ); + + assertEquals("Unexpected log records count.", 1, logHnd.records.size()); + + LogRecord record = logHnd.records.get(0); + + assertEquals("Unexpected log record text.", "Exception during sending an sql request.", + record.getMessage()); + + assertEquals("Unexpected log level", Level.FINE, record.getLevel()); + } + + /** + * Assert connections count. + * + * @param ios Map that holds connections. + * @param expConnCnt Expected connections count. 
+ */ + private void assertConnectionsCount(Map ios, int expConnCnt) + throws IgniteInterruptedCheckedException { + boolean allConnectionsEstablished = GridTestUtils.waitForCondition(() -> ios.size() == expConnCnt, + 10_000); + + assertTrue("Unexpected connections count.", allConnectionsEstablished); + } + + /** + * Invalidate connection to stopped node. Jdbc thin, won't detect that node has gone, until it tries to touch it. So + * sending simple query to randomly chosen connection(socket), sooner or later, will touch dead one, and thus + * invalidate it. Please, pay attention, that it's better to send non-failoverable query, for example query with + * ';' somewhere in the middle. + * + * @param conn Connections. + */ + private void invalidateConnectionToStoppedNode(Connection conn) { + while (true) { + try (Statement stmt = conn.createStatement()) { + stmt.execute("select ';';"); + } + catch (SQLException e) { + return; + } + } + } + + /** + * Simple {@code java.util.logging.Handler} implementation in order to check log records generated by {@code + * JdbcThinConnection}. + */ + static class LogHandler extends Handler { + + /** Log records. */ + private final List records = new ArrayList<>(); + + /** {@inheritDoc} */ + @Override public void publish(LogRecord record) { + records.add(record); + } + + /** {@inheritDoc} */ + @Override + public void close() { + } + + /** {@inheritDoc} */ + @Override + public void flush() { + } + + /** + * @return Records. + */ + @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") public List records() { + return records; + } + } + + /** + * Prepares default cache configuration with given name. + * + * @param cacheName Cache name. + * @return Cache configuration. 
+ */ + @SuppressWarnings("unchecked") + protected CacheConfiguration prepareCacheConfig(String cacheName) { + CacheConfiguration cache = defaultCacheConfiguration(); + + cache.setName(cacheName); + cache.setCacheMode(PARTITIONED); + cache.setBackups(1); + cache.setIndexedTypes( + Integer.class, Person.class + ); + + return cache; + } + + /** + * Fills cache with test data. + * + * @param cacheName Cache name. + */ + private void fillCache(String cacheName) { + IgniteCache cachePerson = grid(0).cache(cacheName); + + assert cachePerson != null; + + for (int i = 0; i < ROWS_COUNT; i++) + cachePerson.put(i, new Person(i, "John" + i, "White" + i, i + 1)); + } + + /** + * Person. + */ + @SuppressWarnings("unused") + private static class Person implements Serializable { + /** ID. */ + @QuerySqlField + private final int id; + + /** First name. */ + @QuerySqlField + private final String firstName; + + /** Last name. */ + @QuerySqlField + private final String lastName; + + /** Age. */ + @QuerySqlField + private final int age; + + /** + * @param id ID. + * @param firstName First name. + * @param lastName Last name. + * @param age Age. + */ + private Person(int id, String firstName, String lastName, int age) { + assert !F.isEmpty(firstName); + assert !F.isEmpty(lastName); + assert age > 0; + + this.id = id; + this.firstName = firstName; + this.lastName = lastName; + this.age = age; + } + } +} diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionSelfTest.java deleted file mode 100644 index 3a3cde9..0000000 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinAffinityAwarenessReconnectionSelfTest.java +++ /dev/null @@ -1,398 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.jdbc.thin; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.logging.Handler; -import java.util.logging.Level; -import java.util.logging.LogRecord; -import java.util.logging.Logger; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; -import org.apache.ignite.internal.jdbc.thin.JdbcThinConnection; -import org.apache.ignite.internal.jdbc.thin.JdbcThinTcpIo; -import org.apache.ignite.testframework.GridTestUtils; -import org.junit.Test; - -/** - * Jdbc thin affinity awareness reconnection test. - */ -public class JdbcThinAffinityAwarenessReconnectionSelfTest extends JdbcThinAbstractSelfTest { - /** URL. */ - private static final String URL = "jdbc:ignite:thin://127.0.0.1:10800..10802?affinityAwareness=true"; - - /** Nodes count. */ - private static final int INITIAL_NODES_CNT = 3; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGridsMultiThreaded(INITIAL_NODES_CNT); - } - - /** - * Check that background connection establishment works as expected. - *

<p>
- * Within new reconnection logic in affinity awareness mode when {@code JdbcThinConnection} is created - * it eagerly establishes a connection to one and only one ignite node. All other connections to nodes specified in - * connection properties are established by background thread. - *
<p>
- * So in given test we specify url with 3 different ports and verify that 3 connections will be created: - * one in eager mode and two within background thread. It takes some time for background thread to create - * a connection, and cause, in addition to that it runs periodically with some delay, - * {@code GridTestUtils.waitForCondition} is used in order to check that all expected connections are established. - * - * @throws Exception If failed. - */ - @Test - public void testBackgroundConnectionEstablishment() throws Exception { - try (Connection conn = DriverManager.getConnection(URL)) { - Map ios = GridTestUtils.getFieldValue(conn, "ios"); - - assertConnectionsCount(ios, 3); - } - } - - /** - * Test connection failover: - *

<ol>
- * <li>Check that at the beginning of test {@code INITIAL_NODES_CNT} connections are established.</li>
- * <li>Stop one node, invalidate dead connection (jdbc thin, won't detect that node has gone, - * until it tries to touch it) and verify, that connections count has decremented.</li>
- * <li>Start, previously stopped node, and check that connections count also restored to initial value.</li>
- * </ol>
- * - * @throws Exception If failed. - */ - @Test - public void testConnectionFailover() throws Exception { - try (Connection conn = DriverManager.getConnection(URL)) { - Map ios = GridTestUtils.getFieldValue(conn, "ios"); - - assertConnectionsCount(ios, INITIAL_NODES_CNT); - - assertEquals("Unexpected connections count.", INITIAL_NODES_CNT, ios.size()); - - stopGrid(1); - - invalidateConnectionToStoppedNode(conn); - - assertEquals("Unexpected connections count.", INITIAL_NODES_CNT - 1, ios.size()); - - startGrid(1); - - assertConnectionsCount(ios, INITIAL_NODES_CNT); - } - } - - /** - * Test total connection failover: - *
<ol>
- * <li>Check that at the beginning of test {@code INITIAL_NODES_CNT} connections are established.</li>
- * <li>Stop all nodes, invalidate dead connections (jdbc thin, won't detect that node has gone, - * until it tries to touch it) and verify, that connections count equals to zero.</li>
- * <li>Start, previously stopped nodes, and check that connections count also restored to initial value.</li>
- * </ol>
- * - * @throws Exception If failed. - */ - @Test - public void testTotalConnectionFailover() throws Exception { - try(Connection conn = DriverManager.getConnection(URL)) { - Map ios = GridTestUtils.getFieldValue(conn, "ios"); - - assertConnectionsCount(ios, INITIAL_NODES_CNT); - - for (int i = 0; i < INITIAL_NODES_CNT; i++) { - stopGrid(i); - invalidateConnectionToStoppedNode(conn); - } - - assertConnectionsCount(ios, 0); - - for (int i = 0; i < INITIAL_NODES_CNT; i++) - startGrid(i); - - assertConnectionsCount(ios, INITIAL_NODES_CNT); - } - } - - /** - * Test eager connection failover: - *
<ol>
- * <li>Check that at the beginning of test {@code INITIAL_NODES_CNT} connections are established.</li>
- * <li>Stop all nodes, invalidate dead connections (jdbc thin, won't detect that node has gone, - * until it tries to touch it) and verify, that connections count equals to zero.</li>
- * <li>Wait for some time, in order for reconnection thread to increase delay between connection attempts, - * because of reconnection failures.</li>
- * <li>Start, previously stopped nodes, and send simple query immediately. Eager reconnection is expected. - * NOTE:There's still a chance that connection would be recreated by background thread and not eager process. - * In order to decrease given possibility we've waited for some time on previous step.</li>
- * <li>Ensure that after some time all connections will be restored.</li>
- * </ol>
- * - * @throws Exception If failed. - */ - @Test - public void testEagerConnectionFailover() throws Exception { - try(Connection conn = DriverManager.getConnection(URL)) { - Map ios = GridTestUtils.getFieldValue(conn, "ios"); - - assertConnectionsCount(ios, INITIAL_NODES_CNT); - - for (int i = 0; i < INITIAL_NODES_CNT; i++) { - stopGrid(i); - invalidateConnectionToStoppedNode(conn); - } - - assertEquals("Unexpected connections count.", 0, ios.size()); - - doSleep(4 * JdbcThinConnection.RECONNECTION_DELAY); - - for (int i = 0; i < INITIAL_NODES_CNT; i++) - startGrid(i); - - conn.createStatement().execute("select 1;"); - - assertConnectionsCount(ios, INITIAL_NODES_CNT); - } - } - - /** - * Check that reconnection thread increases delay between unsuccessful connection attempts: - *
<ol>
- * <li>Specify two inet addresses one valid and one inoperative.</li>
- * <li>Wait for specific amount of time. The reconnection logic suppose to increase delays between reconnection - * attempts. The basic idea is very simple: delay is doubled on evey connection failure until connection succeeds - * or until delay exceeds predefined maximum value {@code JdbcThinConnection.RECONNECTION_MAX_DELAY}
- * <pre>
-     *   |_|_ _|_ _ _ _|_ _ _ _ _ _ _ _|
-     *   where: '|' is connection attempt;
-     *          '_' is an amount of time that reconnection tread waits, equal to JdbcThinConnection.RECONNECTION_DELAY;
-     *
-     *   so if we wait for 9 * RECONNECTION_DELAY, we expect to see exact four connection attempts:
-     *   |_|_ _|_ _ _ _|_ _^_ _ _ _ _ _|
-     *   </pre>
- * </li>
- * <li>Check that there were exact four reconnection attempts. In order to do this, we check logs, expecting to see - * four warning messages there.</li>
- * </ol>
- * - * @throws Exception If failed. - */ - @Test - public void testReconnectionDelayIncreasing() throws Exception { - Logger log = Logger.getLogger(JdbcThinConnection.class.getName()); - LogHandler hnd = new LogHandler(); - hnd.setLevel(Level.ALL); - log.setUseParentHandlers(false); - log.addHandler(hnd); - log.setLevel(Level.ALL); - - try (Connection ignored = DriverManager.getConnection( - "jdbc:ignite:thin://127.0.0.1:10800,127.0.0.1:10810?affinityAwareness=true")) { - hnd.records.clear(); - - doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY); - - assertEquals("Unexpected log records count.", 4, hnd.records.size()); - - String expRecordMsg = "Failed to connect to Ignite node " + - "[url=jdbc:ignite:thin://127.0.0.1:10800,127.0.0.1:10810]. address = [localhost/127.0.0.1:10810]."; - - for (LogRecord record: hnd.records) { - assertEquals("Unexpected log record text.", expRecordMsg, record.getMessage()); - assertEquals("Unexpected log level", Level.WARNING, record.getLevel()); - } - } - } - - /** - * Check that reconnection thread selectively increases delay between unsuccessful connection attempts: - *
<ol>
- * <li>Create {@code JdbcThinConnection} with two valid inet addresses.</li>
- * <li>Stop one node and invalidate corresponding connection. Ensure that only one connection left.</li>
- * <li>Wait for specific amount of time. The reconnection logic suppose to increase delays between reconnection - * attempts. The basic idea is very simple: delay is doubled on evey connection failure until connection succeeds - * or until delay exceeds predefined maximum value {@code JdbcThinConnection.RECONNECTION_MAX_DELAY}
- * <pre>
-     *   |_|_ _|_ _ _ _|_ _ _ _ _ _ _ _|
-     *   where: '|' is connection attempt;
-     *          '_' is an amount of time that reconnection tread waits, equal to JdbcThinConnection.RECONNECTION_DELAY;
-     *
-     *   so if we wait for 9 * RECONNECTION_DELAY, we expect to see exact four connection attempts:
-     *   |_|_ _|_ _ _ _|_ _^_ _ _ _ _ _|
-     *   </pre>
- * </li>
- * <li>Check that there were exact four reconnection attempts. In order to do this, we check logs, expecting to see - * four warning messages there.</li>
- * <li>Start previously stopped node.</li>
- * <li>Wait until next reconnection attempt.</li>
- * <li>Check that both connections are established and that there are no warning messages within logs.</li>
- * <li>One more time: stop one node and invalidate corresponding connection. - * Ensure that only one connection left.</li>
- * <li>Wait for some time.</li>
- * <li>Ensure that delay between reconnection was reset to initial value. - * In other words, we again expect four warning messages within logs.</li>
- * </ol>
- * - * @throws Exception If failed. - */ - @Test - public void testReconnectionDelaySelectiveIncreasing() throws Exception { - Logger log = Logger.getLogger(JdbcThinConnection.class.getName()); - LogHandler hnd = new LogHandler(); - hnd.setLevel(Level.ALL); - log.setUseParentHandlers(false); - log.addHandler(hnd); - log.setLevel(Level.ALL); - - try (Connection conn = DriverManager.getConnection( - "jdbc:ignite:thin://127.0.0.1:10800..10801?affinityAwareness=true")) { - // Stop one node and invalidate corresponding connection. Ensure that only one connection left. - stopGrid(0); - - invalidateConnectionToStoppedNode(conn); - - Map ios = GridTestUtils.getFieldValue(conn, "ios"); - - assertEquals("Unexpected connections count.", 1, ios.size()); - - hnd.records.clear(); - - // Wait for some specific amount of time and ensure that there were exact four reconnection attempts. - doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY); - - assertEquals("Unexpected log records count.", 4, hnd.records.size()); - - String expRecordMsg = "Failed to connect to Ignite node [url=jdbc:ignite:thin://127.0.0.1:10800..10801]." + - " address = [localhost/127.0.0.1:10800]."; - - for (LogRecord record: hnd.records) { - assertEquals("Unexpected log record text.", expRecordMsg, record.getMessage()); - assertEquals("Unexpected log level", Level.WARNING, record.getLevel()); - } - - // Start previously stopped node. - startGrid(0); - - hnd.records.clear(); - - // Waiting until next reconnection attempt. - doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY); - - // Checking that both connections are established and that there are no warning messages within logs. - assertEquals("Unexpected log records count.", 0, hnd.records.size()); - - assertEquals("Unexpected connections count.", 2, ios.size()); - - // One more time: stop one node, invalidate corresponding connection and ensure that only one connection - // left. 
- stopGrid(0); - - invalidateConnectionToStoppedNode(conn); - - assertEquals("Unexpected connections count.", 1, ios.size()); - - hnd.records.clear(); - - // Wait for some time and ensure that delay between reconnection was reset to initial value. - doSleep(9 * JdbcThinConnection.RECONNECTION_DELAY); - - assertEquals("Unexpected log records count.", 4, hnd.records.size()); - - for (LogRecord record: hnd.records) { - assertEquals("Unexpected log record text.", expRecordMsg, record.getMessage()); - assertEquals("Unexpected log level", Level.WARNING, record.getLevel()); - } - - startGrid(0); - } - } - - /** - * Assert connections count. - * - * @param ios Map that holds connections. - * @param expConnCnt Expected connections count. - */ - private void assertConnectionsCount(Map ios, int expConnCnt) - throws IgniteInterruptedCheckedException { - boolean allConnectionsEstablished = GridTestUtils.waitForCondition(() -> ios.size() == expConnCnt, - 10_000); - - assertTrue("Unexpected connections count.", allConnectionsEstablished); - } - - /** - * Invalidate connection to stopped node. Jdbc thin, won't detect that node has gone, until it tries to touch it. - * So sending simple query to randomly chosen connection(socket), sooner or later, will touch dead one, - * and thus invalidate it. - * - * @param conn Connections. - */ - private void invalidateConnectionToStoppedNode(Connection conn) { - while (true) { - try (Statement stmt = conn.createStatement()) { - stmt.execute("select 1"); - } - catch (SQLException e) { - return; - } - } - } - - /** - * Simple {@code java.util.logging.Handler} implementation in order to check log records - * generated by {@code JdbcThinConnection}. - */ - static class LogHandler extends Handler { - - /** Log records. 
*/ - private final List records = new ArrayList<>(); - - /** {@inheritDoc} */ - @Override public void publish(LogRecord record) { - records.add(record); - } - - /** {@inheritDoc} */ - @Override - public void close() { - } - - /** {@inheritDoc} */ - @Override - public void flush() { - } - - /** - * @return Records. - */ - @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") public List records() { - return records; - } - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java index 971acdf..dec052a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java @@ -99,6 +99,10 @@ import static java.sql.ResultSet.CLOSE_CURSORS_AT_COMMIT; import static java.sql.ResultSet.CONCUR_READ_ONLY; import static java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT; import static java.sql.ResultSet.TYPE_FORWARD_ONLY; +import static org.apache.ignite.internal.processors.odbc.SqlStateCode.CLIENT_CONNECTION_FAILED; +import static org.apache.ignite.internal.processors.odbc.SqlStateCode.CONNECTION_CLOSED; +import static org.apache.ignite.internal.processors.odbc.SqlStateCode.CONNECTION_FAILURE; +import static org.apache.ignite.internal.processors.odbc.SqlStateCode.INTERNAL_ERROR; /** * JDBC connection implementation. @@ -127,6 +131,12 @@ public class JdbcThinConnection implements Connection { /** Index generator. */ private static final AtomicLong IDX_GEN = new AtomicLong(); + /** Default retires count. */ + public static final int DFLT_RETRIES_CNT = 4; + + /** No retries. */ + public static final int NO_RETRIES = 0; + /** Affinity awareness enabled flag. 
*/ private final boolean affinityAwareness; @@ -277,7 +287,7 @@ public class JdbcThinConnection implements Connection { if (newVal) { if (!cmd0.isOrdered() && !cliIo.isUnorderedStreamSupported()) { throw new SQLException("Streaming without order doesn't supported by server [remoteNodeVer=" - + cliIo.igniteVersion() + ']', SqlStateCode.INTERNAL_ERROR); + + cliIo.igniteVersion() + ']', INTERNAL_ERROR); } streamState = new StreamState((SqlSetStreamingCommand)cmd, cliIo); @@ -826,7 +836,7 @@ public class JdbcThinConnection implements Connection { */ public void ensureNotClosed() throws SQLException { if (closed) - throw new SQLException("Connection is closed.", SqlStateCode.CONNECTION_CLOSED); + throw new SQLException("Connection is closed.", CONNECTION_CLOSED); } /** @@ -866,7 +876,6 @@ public class JdbcThinConnection implements Connection { */ JdbcResultWithIo sendRequest(JdbcRequest req, JdbcThinStatement stmt, @Nullable JdbcThinTcpIo stickyIo) throws SQLException { - ensureConnected(); RequestTimeoutTask reqTimeoutTask = null; @@ -874,61 +883,85 @@ public class JdbcThinConnection implements Connection { if (ownThread != null) { throw new SQLException("Concurrent access to JDBC connection is not allowed" + " [ownThread=" + ownThread.getName() - + ", curThread=" + Thread.currentThread().getName(), SqlStateCode.CONNECTION_FAILURE); + + ", curThread=" + Thread.currentThread().getName(), CONNECTION_FAILURE); } ownThread = Thread.currentThread(); } try { - JdbcThinTcpIo cliIo = null; try { - cliIo = (stickyIo == null || !stickyIo.connected()) ? cliIo(calculateNodeIds(req)) : stickyIo; + int retryAttemptsLeft = 1; - if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT) { - reqTimeoutTask = new RequestTimeoutTask( - req instanceof JdbcBulkLoadBatchRequest ? 
stmt.currentRequestId() : req.requestId(), - cliIo, - stmt.requestTimeout()); + Exception lastE = null; - qryTimeoutScheduledFut = maintenanceExecutor.scheduleAtFixedRate(reqTimeoutTask, 0, - REQUEST_TIMEOUT_PERIOD, TimeUnit.MILLISECONDS); - } + while (retryAttemptsLeft > 0) { + JdbcThinTcpIo cliIo = null; - JdbcQueryExecuteRequest qryReq = null; + ensureConnected(); - if (req instanceof JdbcQueryExecuteRequest) - qryReq = (JdbcQueryExecuteRequest)req; + try { + cliIo = (stickyIo == null || !stickyIo.connected()) ? cliIo(calculateNodeIds(req)) : stickyIo; - JdbcResponse res = cliIo.sendRequest(req, stmt); + if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT) { + reqTimeoutTask = new RequestTimeoutTask( + req instanceof JdbcBulkLoadBatchRequest ? stmt.currentRequestId() : req.requestId(), + cliIo, + stmt.requestTimeout()); - txIo = res.activeTransaction() ? cliIo : null; + qryTimeoutScheduledFut = maintenanceExecutor.scheduleAtFixedRate(reqTimeoutTask, 0, + REQUEST_TIMEOUT_PERIOD, TimeUnit.MILLISECONDS); + } - if (res.status() == IgniteQueryErrorCode.QUERY_CANCELED && stmt != null && - stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTask != null && - reqTimeoutTask.expired.get()) { + JdbcQueryExecuteRequest qryReq = null; - throw new SQLTimeoutException(QueryCancelledException.ERR_MSG, SqlStateCode.QUERY_CANCELLED, - IgniteQueryErrorCode.QUERY_CANCELED); - } - else if (res.status() != ClientListenerResponse.STATUS_SUCCESS) - throw new SQLException(res.error(), IgniteQueryErrorCode.codeToSqlState(res.status()), - res.status()); + if (req instanceof JdbcQueryExecuteRequest) + qryReq = (JdbcQueryExecuteRequest)req; - updateAffinityCache(qryReq, res); + JdbcResponse res = cliIo.sendRequest(req, stmt); - return new JdbcResultWithIo(res.response(), cliIo); - } - catch (SQLException e) { - throw e; - } - catch (Exception e) { - onDisconnect(cliIo); + txIo = res.activeTransaction() ? 
cliIo : null; - if (e instanceof SocketTimeoutException) - throw new SQLException("Connection timed out.", SqlStateCode.CONNECTION_FAILURE, e); - else - throw new SQLException("Failed to communicate with Ignite cluster.", - SqlStateCode.CONNECTION_FAILURE, e); + if (res.status() == IgniteQueryErrorCode.QUERY_CANCELED && stmt != null && + stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTask != null && + reqTimeoutTask.expired.get()) { + + throw new SQLTimeoutException(QueryCancelledException.ERR_MSG, SqlStateCode.QUERY_CANCELLED, + IgniteQueryErrorCode.QUERY_CANCELED); + } + else if (res.status() != ClientListenerResponse.STATUS_SUCCESS) + throw new SQLException(res.error(), IgniteQueryErrorCode.codeToSqlState(res.status()), + res.status()); + + updateAffinityCache(qryReq, res); + + return new JdbcResultWithIo(res.response(), cliIo); + } + catch (SQLException e) { + if (LOG.isLoggable(Level.FINE)) + LOG.log(Level.FINE, "Exception during sending an sql request.", e); + + throw e; + } + catch (Exception e) { + if (LOG.isLoggable(Level.FINE)) + LOG.log(Level.FINE, "Exception during sending an sql request.", e); + + onDisconnect(cliIo); + + if (e instanceof SocketTimeoutException) + throw new SQLException("Connection timed out.", CONNECTION_FAILURE, e); + else { + if (lastE == null) { + retryAttemptsLeft = calculateRetryAttemptsCount(stickyIo, req); + lastE = e; + } + else + retryAttemptsLeft--; + } + } + } + + throw new SQLException("Failed to communicate with Ignite cluster.", CONNECTION_FAILURE, lastE); } finally { if (stmt != null && stmt.requestTimeout() != NO_TIMEOUT && reqTimeoutTask != null) @@ -1063,7 +1096,7 @@ public class JdbcThinConnection implements Connection { } catch (IgniteCheckedException e) { throw new SQLException("Failed to calculate derived partitions for query.", - SqlStateCode.INTERNAL_ERROR); + INTERNAL_ERROR); } } @@ -1079,7 +1112,7 @@ public class JdbcThinConnection implements Connection { */ void 
sendQueryCancelRequest(JdbcQueryCancelRequest req, JdbcThinTcpIo cliIo) throws SQLException { if (connCnt.get() == 0) - throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE); + throw new SQLException("Failed to communicate with Ignite cluster.", CONNECTION_FAILURE); assert cliIo != null; @@ -1087,7 +1120,7 @@ public class JdbcThinConnection implements Connection { cliIo.sendCancelRequest(req); } catch (Exception e) { - throw new SQLException("Failed to communicate with Ignite cluster.", SqlStateCode.CONNECTION_FAILURE, e); + throw new SQLException("Failed to communicate with Ignite cluster.", CONNECTION_FAILURE, e); } } @@ -1107,7 +1140,7 @@ public class JdbcThinConnection implements Connection { if (ownThread != null) { throw new SQLException("Concurrent access to JDBC connection is not allowed" + " [ownThread=" + ownThread.getName() - + ", curThread=" + Thread.currentThread().getName(), SqlStateCode.CONNECTION_FAILURE); + + ", curThread=" + Thread.currentThread().getName(), CONNECTION_FAILURE); } ownThread = Thread.currentThread(); @@ -1123,10 +1156,10 @@ public class JdbcThinConnection implements Connection { onDisconnect(stickyIO); if (e instanceof SocketTimeoutException) - throw new SQLException("Connection timed out.", SqlStateCode.CONNECTION_FAILURE, e); + throw new SQLException("Connection timed out.", CONNECTION_FAILURE, e); else throw new SQLException("Failed to communicate with Ignite cluster.", - SqlStateCode.CONNECTION_FAILURE, e); + CONNECTION_FAILURE, e); } finally { synchronized (mux) { @@ -1303,7 +1336,7 @@ public class JdbcThinConnection implements Connection { order++; } catch (InterruptedException e) { - throw new SQLException("Streaming operation was interrupted", SqlStateCode.INTERNAL_ERROR, e); + throw new SQLException("Streaming operation was interrupted", INTERNAL_ERROR, e); } } @@ -1324,9 +1357,9 @@ public class JdbcThinConnection implements Connection { onDisconnect(streamingStickyIo); if 
(err0 instanceof SocketTimeoutException) - throw new SQLException("Connection timed out.", SqlStateCode.CONNECTION_FAILURE, err0); + throw new SQLException("Connection timed out.", CONNECTION_FAILURE, err0); throw new SQLException("Failed to communicate with Ignite cluster on JDBC streaming.", - SqlStateCode.CONNECTION_FAILURE, err0); + CONNECTION_FAILURE, err0); } } } @@ -1571,11 +1604,11 @@ public class JdbcThinConnection implements Connection { throw (SQLException)ex; else if (ex instanceof IOException) throw new SQLException("Failed to connect to Ignite cluster [url=" + connProps.getUrl() + ']', - SqlStateCode.CLIENT_CONNECTION_FAILED, ex); + CLIENT_CONNECTION_FAILED, ex); } SQLException e = new SQLException("Failed to connect to server [url=" + connProps.getUrl() + ']', - SqlStateCode.CLIENT_CONNECTION_FAILED); + CLIENT_CONNECTION_FAILED); for (Exception ex : exceptions) e.addSuppressed(ex); @@ -1614,7 +1647,7 @@ public class JdbcThinConnection implements Connection { throw new SQLException("Failed to connect to Ignite node [url=" + connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." + "Node doesn't support affinity awareness mode.", - SqlStateCode.INTERNAL_ERROR); + INTERNAL_ERROR); } if (prevIgniteEndpointVer != null && !prevIgniteEndpointVer.equals(cliIo.igniteVersion())) { @@ -1624,7 +1657,7 @@ public class JdbcThinConnection implements Connection { throw new SQLException("Failed to connect to Ignite node [url=" + connProps.getUrl() + "]. address = [" + addr + ':' + port + "]." + "Different versions of nodes are not supported in affinity awareness mode.", - SqlStateCode.INTERNAL_ERROR); + INTERNAL_ERROR); } cliIo.timeout(netTimeout); @@ -1700,6 +1733,47 @@ public class JdbcThinConnection implements Connection { } /** + * Calculates query retries count for given {@param req}. + * + * @param stickyIo sticky connection, if any. + * @param req Jdbc request. + * + * @return retries count. 
+ */ + private int calculateRetryAttemptsCount(JdbcThinTcpIo stickyIo, JdbcRequest req) { + if (!affinityAwareness) + return NO_RETRIES; + + if (stickyIo != null) + return NO_RETRIES; + + if (req.type() == JdbcRequest.META_TABLES || + req.type() == JdbcRequest.META_COLUMNS || + req.type() == JdbcRequest.META_INDEXES || + req.type() == JdbcRequest.META_PARAMS || + req.type() == JdbcRequest.META_PRIMARY_KEYS || + req.type() == JdbcRequest.META_SCHEMAS || + req.type() == JdbcRequest.CACHE_PARTITIONS) + return DFLT_RETRIES_CNT; + + if (req.type() == JdbcRequest.QRY_EXEC) { + JdbcQueryExecuteRequest qryExecReq = (JdbcQueryExecuteRequest)req; + + String trimmedQry = qryExecReq.sqlQuery().trim(); + + // Last symbol is ignored. + for (int i = 0; i < trimmedQry.length() - 1; i++) { + if (trimmedQry.charAt(i) == ';') + return NO_RETRIES; + } + + return trimmedQry.toUpperCase().startsWith("SELECT") ? DFLT_RETRIES_CNT : NO_RETRIES; + } + + return NO_RETRIES; + } + + /** * Request Timeout Task */ private class RequestTimeoutTask implements Runnable { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java index 93e8f18..d3024a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequest.java @@ -35,7 +35,7 @@ import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionCont */ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBinarylizable { /** Execute sql query request. */ - static final byte QRY_EXEC = 2; + public static final byte QRY_EXEC = 2; /** Fetch query results request. 
*/ static final byte QRY_FETCH = 3; @@ -44,28 +44,28 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin static final byte QRY_CLOSE = 4; /** Get query columns metadata request. */ - static final byte QRY_META = 5; + public static final byte QRY_META = 5; /** Batch queries. */ public static final byte BATCH_EXEC = 6; /** Get tables metadata request. */ - static final byte META_TABLES = 7; + public static final byte META_TABLES = 7; /** Get columns metadata request. */ - static final byte META_COLUMNS = 8; + public static final byte META_COLUMNS = 8; /** Get indexes metadata request. */ - static final byte META_INDEXES = 9; + public static final byte META_INDEXES = 9; /** Get SQL query parameters metadata request. */ - static final byte META_PARAMS = 10; + public static final byte META_PARAMS = 10; /** Get primary keys metadata request. */ - static final byte META_PRIMARY_KEYS = 11; + public static final byte META_PRIMARY_KEYS = 11; /** Get schemas metadata request. */ - static final byte META_SCHEMAS = 12; + public static final byte META_SCHEMAS = 12; /** Send a batch of a data from client to server. */ static final byte BULK_LOAD_BATCH = 13; @@ -77,7 +77,7 @@ public class JdbcRequest extends ClientListenerRequestNoId implements JdbcRawBin static final byte QRY_CANCEL = 15; /** Get cache partitions distributions. */ - static final byte CACHE_PARTITIONS = 16; + public static final byte CACHE_PARTITIONS = 16; /** Request Id generator. */ private static final AtomicLong REQ_ID_GENERATOR = new AtomicLong();