flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [2/2] flink git commit: [FLINK-7556] Allow Integer.MIN_VALUE for fetch size in JDBCInputFormat
Date Wed, 30 Aug 2017 14:53:07 GMT
[FLINK-7556] Allow Integer.MIN_VALUE for fetch size in JDBCInputFormat

Allow Integer.MIN_VALUE to be accepted as a parameter for setFetchSize for MySQL Driver.

The combination of a forward-only, read-only result set, with a fetch size of Integer.MIN_VALUE
serves as a signal to the driver to stream result sets row-by-row. After this, any result
sets created with the statement will be retrieved row-by-row.

This closes #4617.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e753db84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e753db84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e753db84

Branch: refs/heads/master
Commit: e753db8411debfc573ffc330355a0f24c0afbfb5
Parents: 1b7f8bd
Author: Nycholas de Oliveira e Oliveira <nycholas@gmail.com>
Authored: Tue Aug 29 14:21:03 2017 -0300
Committer: zentol <chesnay@apache.org>
Committed: Wed Aug 30 16:51:32 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/api/java/io/jdbc/JDBCInputFormat.java   |  5 +++--
 .../flink/api/java/io/jdbc/JDBCInputFormatTest.java      | 11 +++++++++++
 2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e753db84/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index b7ac744..7d08814 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -144,7 +144,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit>
implements
 				dbConn = DriverManager.getConnection(dbURL, username, password);
 			}
 			statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
-			if (fetchSize > 0) {
+			if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
 				statement.setFetchSize(fetchSize);
 			}
 		} catch (SQLException se) {
@@ -390,7 +390,8 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit>
implements
 		}
 
 		public JDBCInputFormatBuilder setFetchSize(int fetchSize) {
-			Preconditions.checkArgument(fetchSize > 0, "Illegal value %s for fetchSize, has to
be positive.", fetchSize);
+			Preconditions.checkArgument(fetchSize == Integer.MIN_VALUE || fetchSize > 0,
+				"Illegal value %s for fetchSize, has to be positive or Integer.MIN_VALUE.", fetchSize);
 			format.fetchSize = fetchSize;
 			return this;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e753db84/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index f7a86e5..10e8c66 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -114,6 +114,17 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 	}
 
 	@Test
+	public void testValidFetchSizeIntegerMin() {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+			.setDrivername(DRIVER_CLASS)
+			.setDBUrl(DB_URL)
+			.setQuery(SELECT_ALL_BOOKS)
+			.setRowTypeInfo(ROW_TYPE_INFO)
+			.setFetchSize(Integer.MIN_VALUE)
+			.finish();
+	}
+
+	@Test
 	public void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise() throws SQLException, ClassNotFoundException
{
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
 			.setDrivername(DRIVER_CLASS)


Mime
View raw message