From: fhueske@apache.org
To: commits@flink.apache.org
Date: Wed, 18 May 2016 22:15:52 -0000
Subject: [3/3] flink git commit: [FLINK-3750] [jdbcConnector] Refactor JdbcInputFormat and JdbcOutputFormat.

[FLINK-3750] [jdbcConnector] Refactor JdbcInputFormat and JdbcOutputFormat.

- The new Input- and OutputFormat use Row instead of Tuple types to support null values.
- JDBCInputFormat supports parallel input by binding parameter values to a PreparedStatement query template.
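For illustration, a minimal end-to-end sketch of the refactored API, assembled
from the builder calls and tests in the diff below (driver, URL, queries, and
SQL types are the illustrative values used by the tests in this commit):

    TypeInformation[] fieldTypes = new TypeInformation[] {
            BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO };
    RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // read Rows instead of Tuples, so null fields are supported
    DataSet<Row> books = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
            .setDBUrl("jdbc:derby:memory:ebookshop")
            .setQuery("select * from books")
            .setRowTypeInfo(rowTypeInfo)
            .finish());
    // write Rows back; setSqlTypes lets the format call setNull() for null fields
    books.output(JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
            .setDBUrl("jdbc:derby:memory:ebookshop")
            .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
            .setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.DOUBLE, Types.INTEGER})
            .finish());
    env.execute();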
This closes #1941

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09b428bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09b428bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09b428bd

Branch: refs/heads/master
Commit: 09b428bd65819b946cf82ab1fdee305eb5a941f5
Parents: a24df98
Author: Flavio Pompermaier
Authored: Tue Apr 26 19:10:53 2016 +0200
Committer: Fabian Hueske
Committed: Wed May 18 22:03:58 2016 +0200

----------------------------------------------------------------------
 flink-batch-connectors/flink-jdbc/pom.xml       |   6 +
 .../flink/api/java/io/jdbc/JDBCInputFormat.java | 392 ++++++++++---------
 .../api/java/io/jdbc/JDBCOutputFormat.java      | 255 +++++++-----
 .../split/GenericParameterValuesProvider.java   |  44 +++
 .../split/NumericBetweenParametersProvider.java |  66 ++++
 .../io/jdbc/split/ParameterValuesProvider.java  |  35 ++
 .../flink/api/java/io/jdbc/DerbyUtil.java       |  31 --
 .../flink/api/java/io/jdbc/JDBCFullTest.java    | 101 +++++
 .../api/java/io/jdbc/JDBCInputFormatTest.java   | 297 ++++++++------
 .../api/java/io/jdbc/JDBCOutputFormatTest.java  | 222 ++++-------
 .../flink/api/java/io/jdbc/JDBCTestBase.java    | 183 +++++++++
 .../api/java/io/jdbc/example/JDBCExample.java   | 102 -----
 12 files changed, 1042 insertions(+), 692 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/pom.xml b/flink-batch-connectors/flink-jdbc/pom.xml
index efa3d2f..cb169ea 100644
--- a/flink-batch-connectors/flink-jdbc/pom.xml
+++ b/flink-batch-connectors/flink-jdbc/pom.xml
@@ -38,6 +38,12 @@ under the License.
org.apache.flink + flink-table_2.10 + ${project.version} + provided + + + org.apache.flink flink-java ${project.version} provided http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java index b764350..b4246f5 100644 --- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java +++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java @@ -19,114 +19,236 @@ package org.apache.flink.api.java.io.jdbc; import java.io.IOException; +import java.math.BigDecimal; +import java.sql.Array; import java.sql.Connection; +import java.sql.Date; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.sql.Statement; - -import org.apache.flink.api.common.io.NonParallelInput; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Arrays; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.types.NullValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * InputFormat to read data from a database and generate tuples. + * InputFormat to read data from a database and generate Rows. * The InputFormat has to be configured using the supplied InputFormatBuilder. - * - * @param - * @see Tuple + * A valid RowTypeInfo must be properly configured in the builder, e.g.:
+ * + *

+ * TypeInformation[] fieldTypes = new TypeInformation[] {
+ *		BasicTypeInfo.INT_TYPE_INFO,
+ *		BasicTypeInfo.STRING_TYPE_INFO,
+ *		BasicTypeInfo.STRING_TYPE_INFO,
+ *		BasicTypeInfo.DOUBLE_TYPE_INFO,
+ *		BasicTypeInfo.INT_TYPE_INFO
+ *	};
+ *
+ * RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
+ *
+ * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ *				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+ *				.setDBUrl("jdbc:derby:memory:ebookshop")
+ *				.setQuery("select * from books")
+ *				.setRowTypeInfo(rowTypeInfo)
+ *				.finish();
+ * 
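 + *
 + * The configured format is then consumed through the DataSet API, e.g. (as in
 + * the JDBCFullTest added by this commit; variable names are illustrative):
 + *
 + * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 + * DataSet<Row> source = env.createInput(jdbcInputFormat);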
+ * + * In order to query the JDBC source in parallel, you need to provide a + * parameterized query template (i.e. a valid {@link PreparedStatement}) and + * a {@link ParameterValuesProvider} which provides binding values for the + * query parameters. E.g.:
+ * + *

+ *
+ * Serializable[][] queryParameters = new String[2][1];
+ * queryParameters[0] = new String[]{"Kumar"};
+ * queryParameters[1] = new String[]{"Tan Ah Teck"};
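 + * // each row of the parameter matrix is bound to its own input split, i.e. one query execution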
+ *
+ * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ *				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+ *				.setDBUrl("jdbc:derby:memory:ebookshop")
+ *				.setQuery("select * from books WHERE author = ?")
+ *				.setRowTypeInfo(rowTypeInfo)
+ *				.setParametersProvider(new GenericParameterValuesProvider(queryParameters))
+ *				.finish();
+ * 
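 + *
 + * Each row of the parameter matrix is bound to one input split. For ranges on
 + * a numeric column, a NumericBetweenParametersProvider (added by this commit)
 + * can generate the matrix instead of listing the values by hand, e.g.
 + * (fetchSize, minId and maxId are illustrative):
 + *
 + *				.setParametersProvider(new NumericBetweenParametersProvider(fetchSize, minId, maxId))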
+ * + * @see Row + * @see ParameterValuesProvider + * @see PreparedStatement * @see DriverManager */ -public class JDBCInputFormat extends RichInputFormat implements NonParallelInput { - private static final long serialVersionUID = 1L; +public class JDBCInputFormat extends RichInputFormat implements ResultTypeQueryable { + private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class); private String username; private String password; private String drivername; private String dbURL; - private String query; + private String queryTemplate; private int resultSetType; private int resultSetConcurrency; + private RowTypeInfo rowTypeInfo; private transient Connection dbConn; - private transient Statement statement; + private transient PreparedStatement statement; private transient ResultSet resultSet; - private int[] columnTypes = null; + private boolean hasNext; + private Object[][] parameterValues; public JDBCInputFormat() { } @Override + public RowTypeInfo getProducedType() { + return rowTypeInfo; + } + + @Override public void configure(Configuration parameters) { + //do nothing here } - /** - * Connects to the source database and executes the query. - * - * @param ignored - * @throws IOException - */ @Override - public void open(InputSplit ignored) throws IOException { + public void openInputFormat() { + //called once per inputFormat (on open) try { - establishConnection(); - statement = dbConn.createStatement(resultSetType, resultSetConcurrency); - resultSet = statement.executeQuery(query); + Class.forName(drivername); + if (username == null) { + dbConn = DriverManager.getConnection(dbURL); + } else { + dbConn = DriverManager.getConnection(dbURL, username, password); + } + statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency); } catch (SQLException se) { - close(); throw new IllegalArgumentException("open() failed." + se.getMessage(), se); } catch (ClassNotFoundException cnfe) { throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe); } } - private void establishConnection() throws SQLException, ClassNotFoundException { - Class.forName(drivername); - if (username == null) { - dbConn = DriverManager.getConnection(dbURL); - } else { - dbConn = DriverManager.getConnection(dbURL, username, password); + @Override + public void closeInputFormat() { + //called once per inputFormat (on close) + try { + if(statement != null) { + statement.close(); + } + } catch (SQLException se) { + LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage()); + } finally { + statement = null; + } + + try { + if(dbConn != null) { + dbConn.close(); + } + } catch (SQLException se) { + LOG.info("Inputformat couldn't be closed - " + se.getMessage()); + } finally { + dbConn = null; } + + parameterValues = null; } /** - * Closes all resources used. + * Connects to the source database and executes the query in a parallel + * fashion if + * this {@link InputFormat} is built using a parameterized query (i.e. using + * a {@link PreparedStatement}) + * and a proper {@link ParameterValuesProvider}, in a non-parallel + * fashion otherwise. * - * @throws IOException Indicates that a resource could not be closed. 
+ * @param inputSplit which is ignored if this InputFormat is executed as a + * non-parallel source, + * a "hook" to the query parameters otherwise (using its + * splitNumber) + * @throws IOException if there's an error during the execution of the query */ @Override - public void close() throws IOException { + public void open(InputSplit inputSplit) throws IOException { try { - resultSet.close(); + if (inputSplit != null && parameterValues != null) { + for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) { + Object param = parameterValues[inputSplit.getSplitNumber()][i]; + if (param instanceof String) { + statement.setString(i + 1, (String) param); + } else if (param instanceof Long) { + statement.setLong(i + 1, (Long) param); + } else if (param instanceof Integer) { + statement.setInt(i + 1, (Integer) param); + } else if (param instanceof Double) { + statement.setDouble(i + 1, (Double) param); + } else if (param instanceof Boolean) { + statement.setBoolean(i + 1, (Boolean) param); + } else if (param instanceof Float) { + statement.setFloat(i + 1, (Float) param); + } else if (param instanceof BigDecimal) { + statement.setBigDecimal(i + 1, (BigDecimal) param); + } else if (param instanceof Byte) { + statement.setByte(i + 1, (Byte) param); + } else if (param instanceof Short) { + statement.setShort(i + 1, (Short) param); + } else if (param instanceof Date) { + statement.setDate(i + 1, (Date) param); + } else if (param instanceof Time) { + statement.setTime(i + 1, (Time) param); + } else if (param instanceof Timestamp) { + statement.setTimestamp(i + 1, (Timestamp) param); + } else if (param instanceof Array) { + statement.setArray(i + 1, (Array) param); + } else { + //extends with other types if needed + throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet)." ); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()]))); + } + } + resultSet = statement.executeQuery(); + hasNext = resultSet.next(); } catch (SQLException se) { - LOG.info("Inputformat couldn't be closed - " + se.getMessage()); - } catch (NullPointerException npe) { + throw new IllegalArgumentException("open() failed." + se.getMessage(), se); } - try { - statement.close(); - } catch (SQLException se) { - LOG.info("Inputformat couldn't be closed - " + se.getMessage()); - } catch (NullPointerException npe) { + } + + /** + * Closes all resources used. + * + * @throws IOException Indicates that a resource could not be closed. 
+ */ + @Override + public void close() throws IOException { + if(resultSet == null) { + return; } try { - dbConn.close(); + resultSet.close(); } catch (SQLException se) { - LOG.info("Inputformat couldn't be closed - " + se.getMessage()); - } catch (NullPointerException npe) { + LOG.info("Inputformat ResultSet couldn't be closed - " + se.getMessage()); } } @@ -138,150 +260,35 @@ public class JDBCInputFormat extends RichInputFormat extends RichInputFormat extends RichInputFormat extends RichInputFormat extends RichInputFormat extends RichInputFormat * @see Tuple * @see DriverManager */ -public class JDBCOutputFormat extends RichOutputFormat { +public class JDBCOutputFormat extends RichOutputFormat { private static final long serialVersionUID = 1L; - - @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class); - + private String username; private String password; private String drivername; private String dbURL; private String query; private int batchInterval = 5000; - + private Connection dbConn; private PreparedStatement upload; - - private SupportedTypes[] types = null; - + private int batchCount = 0; - + + public int[] typesArray; + public JDBCOutputFormat() { } - + @Override public void configure(Configuration parameters) { } - + /** * Connects to the target database and initializes the prepared statement. * @@ -78,14 +77,12 @@ public class JDBCOutputFormat extends RichOutputFormat { establishConnection(); upload = dbConn.prepareStatement(query); } catch (SQLException sqe) { - close(); - throw new IllegalArgumentException("open() failed:\t!", sqe); + throw new IllegalArgumentException("open() failed.", sqe); } catch (ClassNotFoundException cnfe) { - close(); - throw new IllegalArgumentException("JDBC-Class not found:\t", cnfe); + throw new IllegalArgumentException("JDBC driver class not found.", cnfe); } } - + private void establishConnection() throws SQLException, ClassNotFoundException { Class.forName(drivername); if (username == null) { @@ -94,86 +91,125 @@ public class JDBCOutputFormat extends RichOutputFormat { dbConn = DriverManager.getConnection(dbURL, username, password); } } - - private enum SupportedTypes { - BOOLEAN, - BYTE, - SHORT, - INTEGER, - LONG, - STRING, - FLOAT, - DOUBLE - } - + /** * Adds a record to the prepared statement. *

* When this method is called, the output format is guaranteed to be opened. + *
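+ * To insert null values reliably, configure the SQL types of the target columns
+ * via the builder's setSqlTypes(int[]), e.g. as the tests in this commit do:
+ * setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR,
+ * Types.DOUBLE, Types.INTEGER}); writeRecord() can then call setNull(pos, type).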

+ * + * WARNING: this may fail when no column types specified (because a best effort approach is attempted in order to + * insert a null value but it's not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null)) * - * @param tuple The records to add to the output. + * @param row The records to add to the output. + * @see PreparedStatement * @throws IOException Thrown, if the records could not be added due to an I/O problem. */ @Override - public void writeRecord(OUT tuple) throws IOException { + public void writeRecord(Row row) throws IOException { + + if (typesArray != null && typesArray.length > 0 && typesArray.length == row.productArity()) { + LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array..."); + } try { - if (types == null) { - extractTypes(tuple); + + if (typesArray == null ) { + // no types provided + for (int index = 0; index < row.productArity(); index++) { + LOG.warn("Unknown column type for column %s. Best effort approach to set its value: %s.", index + 1, row.productElement(index)); + upload.setObject(index + 1, row.productElement(index)); + } + } else { + // types provided + for (int index = 0; index < row.productArity(); index++) { + + if (row.productElement(index) == null) { + upload.setNull(index + 1, typesArray[index]); + } else { + // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html + switch (typesArray[index]) { + case java.sql.Types.NULL: + upload.setNull(index + 1, typesArray[index]); + break; + case java.sql.Types.BOOLEAN: + case java.sql.Types.BIT: + upload.setBoolean(index + 1, (boolean) row.productElement(index)); + break; + case java.sql.Types.CHAR: + case java.sql.Types.NCHAR: + case java.sql.Types.VARCHAR: + case java.sql.Types.LONGVARCHAR: + case java.sql.Types.LONGNVARCHAR: + upload.setString(index + 1, (String) row.productElement(index)); + break; + case java.sql.Types.TINYINT: + upload.setByte(index + 1, (byte) row.productElement(index)); + break; + case java.sql.Types.SMALLINT: + upload.setShort(index + 1, (short) row.productElement(index)); + break; + case java.sql.Types.INTEGER: + upload.setInt(index + 1, (int) row.productElement(index)); + break; + case java.sql.Types.BIGINT: + upload.setLong(index + 1, (long) row.productElement(index)); + break; + case java.sql.Types.REAL: + upload.setFloat(index + 1, (float) row.productElement(index)); + break; + case java.sql.Types.FLOAT: + case java.sql.Types.DOUBLE: + upload.setDouble(index + 1, (double) row.productElement(index)); + break; + case java.sql.Types.DECIMAL: + case java.sql.Types.NUMERIC: + upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.productElement(index)); + break; + case java.sql.Types.DATE: + upload.setDate(index + 1, (java.sql.Date) row.productElement(index)); + break; + case java.sql.Types.TIME: + upload.setTime(index + 1, (java.sql.Time) row.productElement(index)); + break; + case java.sql.Types.TIMESTAMP: + upload.setTimestamp(index + 1, (java.sql.Timestamp) row.productElement(index)); + break; + case java.sql.Types.BINARY: + case java.sql.Types.VARBINARY: + case java.sql.Types.LONGVARBINARY: + upload.setBytes(index + 1, (byte[]) row.productElement(index)); + break; + default: + upload.setObject(index + 1, row.productElement(index)); + LOG.warn("Unmanaged sql type (%s) for column %s. 
Best effort approach to set its value: %s.", + typesArray[index], index + 1, row.productElement(index)); + // case java.sql.Types.SQLXML + // case java.sql.Types.ARRAY: + // case java.sql.Types.JAVA_OBJECT: + // case java.sql.Types.BLOB: + // case java.sql.Types.CLOB: + // case java.sql.Types.NCLOB: + // case java.sql.Types.DATALINK: + // case java.sql.Types.DISTINCT: + // case java.sql.Types.OTHER: + // case java.sql.Types.REF: + // case java.sql.Types.ROWID: + // case java.sql.Types.STRUC + } + } + } } - addValues(tuple); upload.addBatch(); batchCount++; if (batchCount >= batchInterval) { upload.executeBatch(); batchCount = 0; } - } catch (SQLException sqe) { - close(); - throw new IllegalArgumentException("writeRecord() failed", sqe); - } catch (IllegalArgumentException iae) { - close(); - throw new IllegalArgumentException("writeRecord() failed", iae); - } - } - - private void extractTypes(OUT tuple) { - types = new SupportedTypes[tuple.getArity()]; - for (int x = 0; x < tuple.getArity(); x++) { - types[x] = SupportedTypes.valueOf(tuple.getField(x).getClass().getSimpleName().toUpperCase()); + } catch (SQLException | IllegalArgumentException e) { + throw new IllegalArgumentException("writeRecord() failed", e); } } - - private void addValues(OUT tuple) throws SQLException { - for (int index = 0; index < tuple.getArity(); index++) { - switch (types[index]) { - case BOOLEAN: - upload.setBoolean(index + 1, (Boolean) tuple.getField(index)); - break; - case BYTE: - upload.setByte(index + 1, (Byte) tuple.getField(index)); - break; - case SHORT: - upload.setShort(index + 1, (Short) tuple.getField(index)); - break; - case INTEGER: - upload.setInt(index + 1, (Integer) tuple.getField(index)); - break; - case LONG: - upload.setLong(index + 1, (Long) tuple.getField(index)); - break; - case STRING: - upload.setString(index + 1, (String) tuple.getField(index)); - break; - case FLOAT: - upload.setFloat(index + 1, (Float) tuple.getField(index)); - break; - case DOUBLE: - upload.setDouble(index + 1, (Double) tuple.getField(index)); - break; - } - } - } - + /** * Executes prepared statement and closes all resources of this instance. 
* @@ -182,70 +218,78 @@ public class JDBCOutputFormat extends RichOutputFormat { @Override public void close() throws IOException { try { - upload.executeBatch(); - batchCount = 0; - } catch (SQLException se) { - throw new IllegalArgumentException("close() failed", se); - } catch (NullPointerException se) { - } - try { - upload.close(); + if (upload != null) { + upload.executeBatch(); + upload.close(); + } } catch (SQLException se) { LOG.info("Inputformat couldn't be closed - " + se.getMessage()); - } catch (NullPointerException npe) { + } finally { + upload = null; + batchCount = 0; } + try { - dbConn.close(); + if (dbConn != null) { + dbConn.close(); + } } catch (SQLException se) { LOG.info("Inputformat couldn't be closed - " + se.getMessage()); - } catch (NullPointerException npe) { + } finally { + dbConn = null; } } - + public static JDBCOutputFormatBuilder buildJDBCOutputFormat() { return new JDBCOutputFormatBuilder(); } - + public static class JDBCOutputFormatBuilder { private final JDBCOutputFormat format; - + protected JDBCOutputFormatBuilder() { this.format = new JDBCOutputFormat(); } - + public JDBCOutputFormatBuilder setUsername(String username) { format.username = username; return this; } - + public JDBCOutputFormatBuilder setPassword(String password) { format.password = password; return this; } - + public JDBCOutputFormatBuilder setDrivername(String drivername) { format.drivername = drivername; return this; } - + public JDBCOutputFormatBuilder setDBUrl(String dbURL) { format.dbURL = dbURL; return this; } - + public JDBCOutputFormatBuilder setQuery(String query) { format.query = query; return this; } - + public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) { format.batchInterval = batchInterval; return this; } - + + public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) { + format.typesArray = typesArray; + return this; + } + /** - Finalizes the configuration and checks validity. - @return Configured JDBCOutputFormat + * Finalizes the configuration and checks validity. + * + * @return Configured JDBCOutputFormat */ public JDBCOutputFormat finish() { if (format.username == null) { @@ -263,8 +307,9 @@ public class JDBCOutputFormat extends RichOutputFormat { if (format.drivername == null) { throw new IllegalArgumentException("No driver supplied"); } + return format; } } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java new file mode 100644 index 0000000..6c70a8c --- /dev/null +++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.io.jdbc.split; + +import java.io.Serializable; + +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; + +/** + * + * This splits generator actually does nothing but wrapping the query parameters + * computed by the user before creating the {@link JDBCInputFormat} instance. + * + * */ +public class GenericParameterValuesProvider implements ParameterValuesProvider { + + private final Serializable[][] parameters; + + public GenericParameterValuesProvider(Serializable[][] parameters) { + this.parameters = parameters; + } + + @Override + public Serializable[][] getParameterValues(){ + //do nothing...precomputed externally + return parameters; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java new file mode 100644 index 0000000..306663e --- /dev/null +++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc.split;
+
+import java.io.Serializable;
+
+/**
+ * This query generator assumes that the query to parameterize contains a BETWEEN constraint on a numeric column.
+ * It generates one query (i.e. one split) per range of at most fetchSize consecutive values (only the last
+ * range may be smaller), covering the interval from the min value up to the max.
+ *
+ * For example, if there is a table BOOKS with a numeric PK id, using a query like:
+ *

+ *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
+ * 
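+ *
+ * For instance (with illustrative values), new NumericBetweenParametersProvider(1000, 0, 2500)
+ * generates the parameter pairs {0, 999}, {1000, 1999}, {2000, 2500}.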
+ * + * you can use this class to automatically generate the parameters of the BETWEEN clause, + * based on the passed constructor parameters. + * + * */ +public class NumericBetweenParametersProvider implements ParameterValuesProvider { + + private long fetchSize; + private final long min; + private final long max; + + public NumericBetweenParametersProvider(long fetchSize, long min, long max) { + this.fetchSize = fetchSize; + this.min = min; + this.max = max; + } + + @Override + public Serializable[][] getParameterValues(){ + double maxElemCount = (max - min) + 1; + int size = new Double(Math.ceil(maxElemCount / fetchSize)).intValue(); + Serializable[][] parameters = new Serializable[size][2]; + int count = 0; + for (long i = min; i < max; i += fetchSize, count++) { + long currentLimit = i + fetchSize - 1; + parameters[count] = new Long[]{i,currentLimit}; + if (currentLimit + 1 + fetchSize > max) { + parameters[count + 1] = new Long[]{currentLimit + 1, max}; + break; + } + } + return parameters; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java new file mode 100644 index 0000000..6c632f8 --- /dev/null +++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc.split;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+
+/**
+ * This interface is used by the {@link JDBCInputFormat} to compute the list of parallel queries to run (i.e. the splits).
+ * Each query will be parameterized with one row of the matrix provided by the {@link ParameterValuesProvider} implementation.
+ */
+public interface ParameterValuesProvider {
+
+	/** Returns the array of parameter values used to query a table in parallel. */
+	public Serializable[][] getParameterValues();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyUtil.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyUtil.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyUtil.java
deleted file mode 100644
index 523b8b5..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyUtil.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.io.jdbc; - -import java.io.OutputStream; - -@SuppressWarnings("unused") -/** - * Utility class to disable derby logging - */ -public class DerbyUtil { - public static final OutputStream DEV_NULL = new OutputStream() { - public void write(int b) { - } - }; -} http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java new file mode 100644 index 0000000..da9469b --- /dev/null +++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
+import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.table.Row;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JDBCFullTest extends JDBCTestBase {
+
+	@Test
+	public void testJdbcInOut() throws Exception {
+		//run without parallelism
+		runTest(false);
+
+		//cleanup
+		JDBCTestBase.tearDownClass();
+		JDBCTestBase.prepareTestDb();
+
+		//run exploiting parallelism
+		runTest(true);
+	}
+
+	private void runTest(boolean exploitParallelism) {
+		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(JDBCTestBase.DRIVER_CLASS)
+				.setDBUrl(JDBCTestBase.DB_URL)
+				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS)
+				.setRowTypeInfo(rowTypeInfo);
+
+		if (exploitParallelism) {
+			final int fetchSize = 1;
+			final Long min = new Long(JDBCTestBase.testData[0][0].toString());
+			final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0].toString());
+			//use a "splittable" query to exploit parallelism
+			inputBuilder = inputBuilder
+					.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
+					.setParametersProvider(new NumericBetweenParametersProvider(fetchSize, min, max));
+		}
+		DataSet<Row> source = environment.createInput(inputBuilder.finish());
+
+		//NOTE: in this case (with the Derby driver) setSqlTypes could be skipped, but
+		//some databases don't handle null values correctly when no column type is specified
+		//in PreparedStatement.setObject (see its javadoc for more details)
+		source.output(JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername(JDBCTestBase.DRIVER_CLASS)
+				.setDBUrl(JDBCTestBase.DB_URL)
+				.setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
+				.setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.DOUBLE, Types.INTEGER})
+				.finish());
+		try {
+			environment.execute();
+		} catch (Exception e) {
+			Assert.fail("JDBC full test failed. " + e.getMessage());
+		}
+
+		try (
+			Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
+			PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
+			ResultSet resultSet = statement.executeQuery()
+		) {
+			int count = 0;
+			while (resultSet.next()) {
+				count++;
+			}
+			Assert.assertEquals(JDBCTestBase.testData.length, count);
+		} catch (SQLException e) {
+			Assert.fail("JDBC full test failed. 
" + e.getMessage()); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java index 3fb0278..efae076 100644 --- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java +++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java @@ -19,180 +19,229 @@ package org.apache.flink.api.java.io.jdbc; import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; +import java.io.Serializable; import java.sql.ResultSet; - -import org.junit.Assert; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider; +import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider; +import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider; +import org.apache.flink.api.table.Row; +import org.apache.flink.core.io.InputSplit; import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.Assert; import org.junit.Test; -public class JDBCInputFormatTest { - JDBCInputFormat jdbcInputFormat; +public class JDBCInputFormatTest extends JDBCTestBase { - static Connection conn; - - static final Object[][] dbData = { - {1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11}, - {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22}, - {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33}, - {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44}, - {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}}; - - @BeforeClass - public static void setUpClass() { - try { - prepareDerbyDatabase(); - } catch (Exception e) { - Assert.fail(); - } - } + private JDBCInputFormat jdbcInputFormat; - private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException { - System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL"); - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - conn = DriverManager.getConnection(dbURL); - createTable(); - insertDataToSQLTable(); - conn.close(); - } - - private static void createTable() throws SQLException { - StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); - sqlQueryBuilder.append("qty INT DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); - - Statement stat = conn.createStatement(); - stat.executeUpdate(sqlQueryBuilder.toString()); - stat.close(); - } - - private static void insertDataToSQLTable() throws SQLException { - StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); - sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 
11),"); - sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),"); - sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),"); - sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),"); - sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)"); - - Statement stat = conn.createStatement(); - stat.execute(sqlQueryBuilder.toString()); - stat.close(); - } - - @AfterClass - public static void tearDownClass() { - cleanUpDerbyDatabases(); - } - - private static void cleanUpDerbyDatabases() { - try { - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - - conn = DriverManager.getConnection(dbURL); - Statement stat = conn.createStatement(); - stat.executeUpdate("DROP TABLE books"); - stat.close(); - conn.close(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); + @After + public void tearDown() throws IOException { + if (jdbcInputFormat != null) { + jdbcInputFormat.close(); } + jdbcInputFormat = null; } - @After - public void tearDown() { - jdbcInputFormat = null; + @Test(expected = IllegalArgumentException.class) + public void testUntypedRowInfo() throws IOException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername("org.apache.derby.jdbc.idontexist") + .setDBUrl(DB_URL) + .setQuery(SELECT_ALL_BOOKS) + .finish(); + jdbcInputFormat.openInputFormat(); } @Test(expected = IllegalArgumentException.class) public void testInvalidDriver() throws IOException { jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() .setDrivername("org.apache.derby.jdbc.idontexist") - .setDBUrl("jdbc:derby:memory:ebookshop") - .setQuery("select * from books") + .setDBUrl(DB_URL) + .setQuery(SELECT_ALL_BOOKS) + .setRowTypeInfo(rowTypeInfo) .finish(); - jdbcInputFormat.open(null); + jdbcInputFormat.openInputFormat(); } @Test(expected = IllegalArgumentException.class) public void testInvalidURL() throws IOException { jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setDrivername(DRIVER_CLASS) .setDBUrl("jdbc:der:iamanerror:mory:ebookshop") - .setQuery("select * from books") + .setQuery(SELECT_ALL_BOOKS) + .setRowTypeInfo(rowTypeInfo) .finish(); - jdbcInputFormat.open(null); + jdbcInputFormat.openInputFormat(); } @Test(expected = IllegalArgumentException.class) public void testInvalidQuery() throws IOException { jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - .setDBUrl("jdbc:derby:memory:ebookshop") + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) .setQuery("iamnotsql") + .setRowTypeInfo(rowTypeInfo) .finish(); - jdbcInputFormat.open(null); + jdbcInputFormat.openInputFormat(); } @Test(expected = IllegalArgumentException.class) public void testIncompleteConfiguration() throws IOException { jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - .setQuery("select * from books") - .finish(); - } - - @Test(expected = IOException.class) - public void testIncompatibleTuple() throws IOException { - jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - .setDBUrl("jdbc:derby:memory:ebookshop") - .setQuery("select * from books") + .setDrivername(DRIVER_CLASS) + .setQuery(SELECT_ALL_BOOKS) + .setRowTypeInfo(rowTypeInfo) .finish(); - jdbcInputFormat.open(null); - 
jdbcInputFormat.nextRecord(new Tuple2()); } @Test - public void testJDBCInputFormat() throws IOException { + public void testJDBCInputFormatWithoutParallelism() throws IOException, InstantiationException, IllegalAccessException { jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - .setDBUrl("jdbc:derby:memory:ebookshop") - .setQuery("select * from books") + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(SELECT_ALL_BOOKS) + .setRowTypeInfo(rowTypeInfo) .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) .finish(); + //this query does not exploit parallelism + Assert.assertEquals(1, jdbcInputFormat.createInputSplits(1).length); + jdbcInputFormat.openInputFormat(); jdbcInputFormat.open(null); - Tuple5 tuple = new Tuple5(); + Row row = new Row(5); int recordCount = 0; while (!jdbcInputFormat.reachedEnd()) { - jdbcInputFormat.nextRecord(tuple); - Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass()); - Assert.assertEquals("Field 1 should be String", String.class, tuple.getField(1).getClass()); - Assert.assertEquals("Field 2 should be String", String.class, tuple.getField(2).getClass()); - Assert.assertEquals("Field 3 should be float", Double.class, tuple.getField(3).getClass()); - Assert.assertEquals("Field 4 should be int", Integer.class, tuple.getField(4).getClass()); + Row next = jdbcInputFormat.nextRecord(row); + if (next == null) { + break; + } + + if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());} + if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());} + if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());} + if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());} + if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());} for (int x = 0; x < 5; x++) { - Assert.assertEquals(dbData[recordCount][x], tuple.getField(x)); + if(testData[recordCount][x]!=null) { + Assert.assertEquals(testData[recordCount][x], next.productElement(x)); + } } recordCount++; } - Assert.assertEquals(5, recordCount); + jdbcInputFormat.close(); + jdbcInputFormat.closeInputFormat(); + Assert.assertEquals(testData.length, recordCount); + } + + @Test + public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException { + final int fetchSize = 1; + final Long min = new Long(JDBCTestBase.testData[0][0] + ""); + final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0] + ""); + ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max); + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID) + .setRowTypeInfo(rowTypeInfo) + .setParametersProvider(pramProvider) + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) + .finish(); + + jdbcInputFormat.openInputFormat(); + InputSplit[] splits = jdbcInputFormat.createInputSplits(1); + //this query exploit parallelism (1 split for every id) + Assert.assertEquals(testData.length, splits.length); + int recordCount = 0; + Row row = new 
Row(5); + for (int i = 0; i < splits.length; i++) { + jdbcInputFormat.open(splits[i]); + while (!jdbcInputFormat.reachedEnd()) { + Row next = jdbcInputFormat.nextRecord(row); + if (next == null) { + break; + } + if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());} + if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());} + if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());} + if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());} + if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());} + + for (int x = 0; x < 5; x++) { + if(testData[recordCount][x]!=null) { + Assert.assertEquals(testData[recordCount][x], next.productElement(x)); + } + } + recordCount++; + } + jdbcInputFormat.close(); + } + jdbcInputFormat.closeInputFormat(); + Assert.assertEquals(testData.length, recordCount); + } + + @Test + public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException, InstantiationException, IllegalAccessException { + Serializable[][] queryParameters = new String[2][1]; + queryParameters[0] = new String[]{"Kumar"}; + queryParameters[1] = new String[]{"Tan Ah Teck"}; + ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters); + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR) + .setRowTypeInfo(rowTypeInfo) + .setParametersProvider(paramProvider) + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) + .finish(); + jdbcInputFormat.openInputFormat(); + InputSplit[] splits = jdbcInputFormat.createInputSplits(1); + //this query exploit parallelism (1 split for every queryParameters row) + Assert.assertEquals(queryParameters.length, splits.length); + int recordCount = 0; + Row row = new Row(5); + for (int i = 0; i < splits.length; i++) { + jdbcInputFormat.open(splits[i]); + while (!jdbcInputFormat.reachedEnd()) { + Row next = jdbcInputFormat.nextRecord(row); + if (next == null) { + break; + } + if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());} + if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());} + if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());} + if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());} + if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());} + + recordCount++; + } + jdbcInputFormat.close(); + } + Assert.assertEquals(3, recordCount); + jdbcInputFormat.closeInputFormat(); + } + + @Test + public void testEmptyResults() throws IOException, InstantiationException, IllegalAccessException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(SELECT_EMPTY) + .setRowTypeInfo(rowTypeInfo) + .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) + .finish(); + 
jdbcInputFormat.openInputFormat(); + jdbcInputFormat.open(null); + Row row = new Row(5); + int recordsCnt = 0; + while (!jdbcInputFormat.reachedEnd()) { + Assert.assertNull(jdbcInputFormat.nextRecord(row)); + recordsCnt++; + } + jdbcInputFormat.close(); + jdbcInputFormat.closeInputFormat(); + Assert.assertEquals(0, recordsCnt); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java index 1031b9a..086a84c 100644 --- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java +++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java @@ -21,105 +21,26 @@ package org.apache.flink.api.java.io.jdbc; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.PreparedStatement; import java.sql.ResultSet; - -import org.junit.Assert; +import java.sql.SQLException; import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.table.Row; import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.Assert; import org.junit.Test; -public class JDBCOutputFormatTest { - private JDBCInputFormat jdbcInputFormat; - private JDBCOutputFormat jdbcOutputFormat; - - private static Connection conn; - - static final Object[][] dbData = { - {1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11}, - {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22}, - {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33}, - {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44}, - {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}}; - - @BeforeClass - public static void setUpClass() throws SQLException { - try { - System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL"); - prepareDerbyDatabase(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - Assert.fail(); - } - } - - private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException { - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - conn = DriverManager.getConnection(dbURL); - createTable("books"); - createTable("newbooks"); - insertDataToSQLTables(); - conn.close(); - } - - private static void createTable(String tableName) throws SQLException { - StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE "); - sqlQueryBuilder.append(tableName); - sqlQueryBuilder.append(" ("); - sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); - sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); - sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); - sqlQueryBuilder.append("qty INT DEFAULT NULL,"); - sqlQueryBuilder.append("PRIMARY KEY (id))"); - - Statement stat = conn.createStatement(); - stat.executeUpdate(sqlQueryBuilder.toString()); - stat.close(); - } - - private static void insertDataToSQLTables() throws SQLException { - 
StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); - sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),"); - sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),"); - sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),"); - sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),"); - sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)"); - - Statement stat = conn.createStatement(); - stat.execute(sqlQueryBuilder.toString()); - stat.close(); - } - - @AfterClass - public static void tearDownClass() { - cleanUpDerbyDatabases(); - } +public class JDBCOutputFormatTest extends JDBCTestBase { - private static void cleanUpDerbyDatabases() { - try { - String dbURL = "jdbc:derby:memory:ebookshop;create=true"; - Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); - - conn = DriverManager.getConnection(dbURL); - Statement stat = conn.createStatement(); - stat.executeUpdate("DROP TABLE books"); - stat.executeUpdate("DROP TABLE newbooks"); - stat.close(); - conn.close(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(); - } - } + private JDBCOutputFormat jdbcOutputFormat; + private Tuple5 tuple5 = new Tuple5<>(); @After - public void tearDown() { + public void tearDown() throws IOException { + if (jdbcOutputFormat != null) { + jdbcOutputFormat.close(); + } jdbcOutputFormat = null; } @@ -127,8 +48,8 @@ public class JDBCOutputFormatTest { public void testInvalidDriver() throws IOException { jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() .setDrivername("org.apache.derby.jdbc.idontexist") - .setDBUrl("jdbc:derby:memory:ebookshop") - .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)") + .setDBUrl(DB_URL) + .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE)) .finish(); jdbcOutputFormat.open(0, 1); } @@ -136,9 +57,9 @@ public class JDBCOutputFormatTest { @Test(expected = IllegalArgumentException.class) public void testInvalidURL() throws IOException { jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setDrivername(DRIVER_CLASS) .setDBUrl("jdbc:der:iamanerror:mory:ebookshop") - .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)") + .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE)) .finish(); jdbcOutputFormat.open(0, 1); } @@ -146,8 +67,8 @@ public class JDBCOutputFormatTest { @Test(expected = IllegalArgumentException.class) public void testInvalidQuery() throws IOException { jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - .setDBUrl("jdbc:derby:memory:ebookshop") + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) .setQuery("iamnotsql") .finish(); jdbcOutputFormat.open(0, 1); @@ -156,8 +77,8 @@ public class JDBCOutputFormatTest { @Test(expected = IllegalArgumentException.class) public void testIncompleteConfiguration() throws IOException { jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)") + .setDrivername(DRIVER_CLASS) + .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE)) .finish(); } @@ -165,79 +86,84 @@ public class JDBCOutputFormatTest { @Test(expected = IllegalArgumentException.class) public void testIncompatibleTypes() throws 
IOException { jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() - .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") - .setDBUrl("jdbc:derby:memory:ebookshop") - .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)") + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE)) .finish(); jdbcOutputFormat.open(0, 1); - Tuple5 tuple5 = new Tuple5(); tuple5.setField(4, 0); tuple5.setField("hello", 1); tuple5.setField("world", 2); tuple5.setField(0.99, 3); tuple5.setField("imthewrongtype", 4); - jdbcOutputFormat.writeRecord(tuple5); + Row row = new Row(tuple5.getArity()); + for (int i = 0; i < tuple5.getArity(); i++) { + row.setField(i, tuple5.getField(i)); + } + jdbcOutputFormat.writeRecord(row); jdbcOutputFormat.close(); } @Test - public void testJDBCOutputFormat() throws IOException { - String sourceTable = "books"; - String targetTable = "newbooks"; - String driverPath = "org.apache.derby.jdbc.EmbeddedDriver"; - String dbUrl = "jdbc:derby:memory:ebookshop"; + public void testJDBCOutputFormat() throws IOException, InstantiationException, IllegalAccessException { jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() - .setDBUrl(dbUrl) - .setDrivername(driverPath) - .setQuery("insert into " + targetTable + " (id, title, author, price, qty) values (?,?,?,?,?)") + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE)) .finish(); jdbcOutputFormat.open(0, 1); - jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername(driverPath) - .setDBUrl(dbUrl) - .setQuery("select * from " + sourceTable) - .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) - .finish(); - jdbcInputFormat.open(null); - - Tuple5 tuple = new Tuple5(); - while (!jdbcInputFormat.reachedEnd()) { - jdbcInputFormat.nextRecord(tuple); - jdbcOutputFormat.writeRecord(tuple); + for (int i = 0; i < testData.length; i++) { + Row row = new Row(testData[i].length); + for (int j = 0; j < testData[i].length; j++) { + row.setField(j, testData[i][j]); + } + jdbcOutputFormat.writeRecord(row); } jdbcOutputFormat.close(); - jdbcInputFormat.close(); - jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() - .setDrivername(driverPath) - .setDBUrl(dbUrl) - .setQuery("select * from " + targetTable) - .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE) - .finish(); - jdbcInputFormat.open(null); - - int recordCount = 0; - while (!jdbcInputFormat.reachedEnd()) { - jdbcInputFormat.nextRecord(tuple); - Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass()); - Assert.assertEquals("Field 1 should be String", String.class, tuple.getField(1).getClass()); - Assert.assertEquals("Field 2 should be String", String.class, tuple.getField(2).getClass()); - Assert.assertEquals("Field 3 should be float", Double.class, tuple.getField(3).getClass()); - Assert.assertEquals("Field 4 should be int", Integer.class, tuple.getField(4).getClass()); - - for (int x = 0; x < 5; x++) { - Assert.assertEquals(dbData[recordCount][x], tuple.getField(x)); + try ( + Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL); + PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS); + ResultSet resultSet = statement.executeQuery() + ) { + int recordCount = 0; + while (resultSet.next()) { + Row row = new Row(tuple5.getArity()); + for (int i = 0; i < tuple5.getArity(); i++) { + row.setField(i, resultSet.getObject(i + 1)); + } + if 
(row.productElement(0) != null) { + Assert.assertEquals("Field 0 should be int", Integer.class, row.productElement(0).getClass()); + } + if (row.productElement(1) != null) { + Assert.assertEquals("Field 1 should be String", String.class, row.productElement(1).getClass()); + } + if (row.productElement(2) != null) { + Assert.assertEquals("Field 2 should be String", String.class, row.productElement(2).getClass()); + } + if (row.productElement(3) != null) { + Assert.assertEquals("Field 3 should be float", Double.class, row.productElement(3).getClass()); + } + if (row.productElement(4) != null) { + Assert.assertEquals("Field 4 should be int", Integer.class, row.productElement(4).getClass()); + } + + for (int x = 0; x < tuple5.getArity(); x++) { + if (JDBCTestBase.testData[recordCount][x] != null) { + Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.productElement(x)); + } + } + + recordCount++; } - - recordCount++; + Assert.assertEquals(JDBCTestBase.testData.length, recordCount); + } catch (SQLException e) { + Assert.fail("JDBC OutputFormat test failed. " + e.getMessage()); } - Assert.assertEquals(5, recordCount); - - jdbcInputFormat.close(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java new file mode 100644 index 0000000..1c44afe --- /dev/null +++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

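
Note: the Tuple5-to-Row copy in testIncompatibleTypes and testJDBCOutputFormat above is the migration pattern for existing Tuple-based users. A sketch only, using calls that appear in this diff; that a null field ends up as SQL NULL is the stated point of moving to Row, assumed rather than verified here:

    JDBCOutputFormat jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername(JDBCTestBase.DRIVER_CLASS)
            .setDBUrl(JDBCTestBase.DB_URL)
            .setQuery(String.format(JDBCTestBase.INSERT_TEMPLATE, JDBCTestBase.OUTPUT_TABLE))
            .finish();
    jdbcOutputFormat.open(0, 1);                 // (taskNumber, numTasks)

    Tuple5<Integer, String, String, Double, Integer> tuple =
            new Tuple5<>(1010, "A Teaspoon of Java 1.8", "Kevin Jones", null, 1010);
    Row row = new Row(tuple.getArity());
    for (int i = 0; i < tuple.getArity(); i++) {
        row.setField(i, tuple.getField(i));      // the null price survives the copy
    }
    jdbcOutputFormat.writeRecord(row);           // a Tuple-based record could not carry the null
    jdbcOutputFormat.close();
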
http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
new file mode 100644
index 0000000..1c44afe
--- /dev/null
+++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+/**
+ * Base test class for JDBC Input and Output formats.
+ */
+public class JDBCTestBase {
+
+	public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
+	public static final String DB_URL = "jdbc:derby:memory:ebookshop";
+	public static final String INPUT_TABLE = "books";
+	public static final String OUTPUT_TABLE = "newbooks";
+	public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
+	public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
+	public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
+	public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
+	public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
+	public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?";
+
+	protected static Connection conn;
+
+	public static final Object[][] testData = {
+			{1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11},
+			{1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
+			{1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
+			{1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
+			{1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55},
+			{1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 66.66, 66},
+			{1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 77.77, 77},
+			{1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 88.88, 88},
+			{1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99},
+			{1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010}};
+
+	public static final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
+		BasicTypeInfo.INT_TYPE_INFO,
+		BasicTypeInfo.STRING_TYPE_INFO,
+		BasicTypeInfo.STRING_TYPE_INFO,
+		BasicTypeInfo.DOUBLE_TYPE_INFO,
+		BasicTypeInfo.INT_TYPE_INFO
+	};
+
+	public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
+
+	public static String getCreateQuery(String tableName) {
+		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE ");
+		sqlQueryBuilder.append(tableName).append(" (");
+		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+		sqlQueryBuilder.append("PRIMARY KEY (id))");
+		return sqlQueryBuilder.toString();
+	}
+
+	public static String getInsertQuery() {
+		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
+		for (int i = 0; i < JDBCTestBase.testData.length; i++) {
+			sqlQueryBuilder.append("(")
+					.append(JDBCTestBase.testData[i][0]).append(",'")
+					.append(JDBCTestBase.testData[i][1]).append("','")
+					.append(JDBCTestBase.testData[i][2]).append("',")
+					.append(JDBCTestBase.testData[i][3]).append(",")
+					.append(JDBCTestBase.testData[i][4]).append(")");
+			if (i < JDBCTestBase.testData.length - 1) {
+				sqlQueryBuilder.append(",");
+			}
+		}
+		String insertQuery = sqlQueryBuilder.toString();
+		return insertQuery;
+	}
+
+	public static final OutputStream DEV_NULL = new OutputStream() {
+		@Override
+		public void write(int b) {
+		}
+	};
+
+	public static void prepareTestDb() throws Exception {
+		System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+		Class.forName(DRIVER_CLASS);
+		Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
+
+		//create input table
+		Statement stat = conn.createStatement();
+		stat.executeUpdate(getCreateQuery(INPUT_TABLE));
+		stat.close();
+
+		//create output table
+		stat = conn.createStatement();
+		stat.executeUpdate(getCreateQuery(OUTPUT_TABLE));
+		stat.close();
+
+		//prepare input data
+		stat = conn.createStatement();
+		stat.execute(JDBCTestBase.getInsertQuery());
+		stat.close();
+
+		conn.close();
+	}
+
+	@BeforeClass
+	public static void setUpClass() throws SQLException {
+		try {
+			System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+			prepareDerbyDatabase();
+		} catch (ClassNotFoundException e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+
+	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
+		Class.forName(DRIVER_CLASS);
+		conn = DriverManager.getConnection(DB_URL + ";create=true");
+		createTable(INPUT_TABLE);
+		createTable(OUTPUT_TABLE);
+		insertDataIntoInputTable();
+		conn.close();
+	}
+
+	private static void createTable(String tableName) throws SQLException {
+		Statement stat = conn.createStatement();
+		stat.executeUpdate(getCreateQuery(tableName));
+		stat.close();
+	}
+
+	private static void insertDataIntoInputTable() throws SQLException {
+		Statement stat = conn.createStatement();
+		stat.execute(JDBCTestBase.getInsertQuery());
+		stat.close();
+	}
+
+	@AfterClass
+	public static void tearDownClass() {
+		cleanUpDerbyDatabases();
+	}
+
+	private static void cleanUpDerbyDatabases() {
+		try {
+			Class.forName(DRIVER_CLASS);
+			conn = DriverManager.getConnection(DB_URL + ";create=true");
+			Statement stat = conn.createStatement();
+			stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
+			stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);
+			stat.close();
+			conn.close();
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+
+}
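
Note: SELECT_ALL_BOOKS_SPLIT_BY_ID above is the query shape that enables the parallel reads this commit adds. The following is a hypothetical sketch of wiring it up; NumericBetweenParametersProvider, setRowTypeInfo(...) and setParametersProvider(...) are assumed names from the new split package and builder, so check them against the committed sources:

    // assumed constructor: (fetchSize, min, max) over the key range 1001..1010
    ParameterValuesProvider paramsProvider = new NumericBetweenParametersProvider(5, 1001, 1010);
    JDBCInputFormat splitReader = JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername(JDBCTestBase.DRIVER_CLASS)
            .setDBUrl(JDBCTestBase.DB_URL)
            .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)  // "... WHERE id BETWEEN ? AND ?"
            .setRowTypeInfo(JDBCTestBase.rowTypeInfo)
            .setParametersProvider(paramsProvider)
            .finish();
    // Each generated input split then binds one (lower, upper) id pair into the
    // PreparedStatement, so splits can be read in parallel.
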
http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
deleted file mode 100644
index 840a314..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc.example;
-
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-
-public class JDBCExample {
-
-	public static void main(String[] args) throws Exception {
-		prepareTestDb();
-
-		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple5> source
-				= environment.createInput(JDBCInputFormat.buildJDBCInputFormat()
-						.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-						.setDBUrl("jdbc:derby:memory:ebookshop")
-						.setQuery("select * from books")
-						.finish(),
-						new TupleTypeInfo(Tuple5.class, INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, DOUBLE_TYPE_INFO, INT_TYPE_INFO)
-				);
-
-		source.output(JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
-				.finish());
-		environment.execute();
-	}
-
-	private static void prepareTestDb() throws Exception {
-		System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
-		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-		Connection conn = DriverManager.getConnection(dbURL);
-
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-
-		sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-
-		sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
-		stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-
-		conn.close();
-	}
-}
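
Note: the Tuple5-based example above is deleted without a Row-based successor in this patch. For orientation, a sketch of what the equivalent pipeline could look like on the refactored formats; RowJDBCExample is a hypothetical name, and env.createInput(format, rowTypeInfo) is assumed to accept the RowTypeInfo defined in JDBCTestBase:

    package org.apache.flink.api.java.io.jdbc.example;

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
    import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
    import org.apache.flink.api.java.io.jdbc.JDBCTestBase;
    import org.apache.flink.api.table.Row;

    public class RowJDBCExample {

        public static void main(String[] args) throws Exception {
            JDBCTestBase.prepareTestDb();

            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // read books as Rows; null columns are preserved instead of failing
            DataSet<Row> books = env.createInput(
                    JDBCInputFormat.buildJDBCInputFormat()
                            .setDrivername(JDBCTestBase.DRIVER_CLASS)
                            .setDBUrl(JDBCTestBase.DB_URL)
                            .setQuery(JDBCTestBase.SELECT_ALL_BOOKS)
                            .finish(),
                    JDBCTestBase.rowTypeInfo);

            // copy them into the newbooks table via the PreparedStatement template
            books.output(JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername(JDBCTestBase.DRIVER_CLASS)
                    .setDBUrl(JDBCTestBase.DB_URL)
                    .setQuery(String.format(JDBCTestBase.INSERT_TEMPLATE, JDBCTestBase.OUTPUT_TABLE))
                    .finish());
            env.execute();
        }
    }
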