flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [5/6] flink git commit: [FLINK-6781] Make statement fetch size configurable in JDBCInputFormat.
Date Fri, 02 Jun 2017 13:14:23 GMT
[FLINK-6781] Make statement fetch size configurable in JDBCInputFormat.

This closes #4036.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5605107b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5605107b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5605107b

Branch: refs/heads/master
Commit: 5605107bd59f10fc9ca7d9ac53fa8d60ecdcfdbc
Parents: c041dd8
Author: Maximilian Bode <maximilian.bode@tngtech.com>
Authored: Wed May 31 18:46:55 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Fri Jun 2 15:13:55 2017 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/jdbc/JDBCInputFormat.java | 17 ++++++++
 .../api/java/io/jdbc/JDBCInputFormatTest.java   | 43 ++++++++++++++++++++
 2 files changed, 60 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5605107b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index 835fb23..b7ac744 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.io.jdbc;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.RichInputFormat;
@@ -30,6 +31,7 @@ import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,6 +115,7 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 	private transient Connection dbConn;
 	private transient PreparedStatement statement;
 	private transient ResultSet resultSet;
+	private int fetchSize;
 
 	private boolean hasNext;
 	private Object[][] parameterValues;
@@ -141,6 +144,9 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 				dbConn = DriverManager.getConnection(dbURL, username, password);
 			}
 			statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
+			if (fetchSize > 0) {
+				statement.setFetchSize(fetchSize);
+			}
 		} catch (SQLException se) {
 			throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
 		} catch (ClassNotFoundException cnfe) {
@@ -312,6 +318,11 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 		return new DefaultInputSplitAssigner(inputSplits);
 	}
 
+	@VisibleForTesting
+	PreparedStatement getStatement() {
+		return statement;
+	}
+
 	/**
 	 * A builder used to set parameters to the output format's configuration in a fluent way.
 	 * @return builder
@@ -378,6 +389,12 @@ public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements
 			return this;
 		}
 
+		public JDBCInputFormatBuilder setFetchSize(int fetchSize) {
+			Preconditions.checkArgument(fetchSize > 0, "Illegal value %s for fetchSize, has to be positive.", fetchSize);
+			format.fetchSize = fetchSize;
+			return this;
+		}
+
 		public JDBCInputFormat finish() {
 			if (format.username == null) {
 				LOG.info("Username was not supplied separately.");

http://git-wip-us.apache.org/repos/asf/flink/blob/5605107b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index b1416ea..f7a86e5 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -30,7 +30,9 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 
 /**
  * Tests for the {@link JDBCInputFormat}.
@@ -100,6 +102,47 @@ public class JDBCInputFormatTest extends JDBCTestBase {
 				.finish();
 	}
 
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidFetchSize() {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+			.setDrivername(DRIVER_CLASS)
+			.setDBUrl(DB_URL)
+			.setQuery(SELECT_ALL_BOOKS)
+			.setRowTypeInfo(ROW_TYPE_INFO)
+			.setFetchSize(-7)
+			.finish();
+	}
+
+	@Test
+	public void testDefaultFetchSizeIsUsedIfNotConfiguredOtherwise() throws SQLException, ClassNotFoundException {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+			.setDrivername(DRIVER_CLASS)
+			.setDBUrl(DB_URL)
+			.setQuery(SELECT_ALL_BOOKS)
+			.setRowTypeInfo(ROW_TYPE_INFO)
+			.finish();
+		jdbcInputFormat.openInputFormat();
+
+		Class.forName(DRIVER_CLASS);
+		final int defaultFetchSize = DriverManager.getConnection(DB_URL).createStatement().getFetchSize();
+
+		Assert.assertEquals(defaultFetchSize, jdbcInputFormat.getStatement().getFetchSize());
+	}
+
+	@Test
+	public void testFetchSizeCanBeConfigured() throws SQLException {
+		final int desiredFetchSize = 10_000;
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+			.setDrivername(DRIVER_CLASS)
+			.setDBUrl(DB_URL)
+			.setQuery(SELECT_ALL_BOOKS)
+			.setRowTypeInfo(ROW_TYPE_INFO)
+			.setFetchSize(desiredFetchSize)
+			.finish();
+		jdbcInputFormat.openInputFormat();
+		Assert.assertEquals(desiredFetchSize, jdbcInputFormat.getStatement().getFetchSize());
+	}
+
 	@Test
 	public void testJDBCInputFormatWithoutParallelism() throws IOException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()


Mime
View raw message