From: fhueske@apache.org
To: commits@flink.apache.org
Date: Fri, 11 Aug 2017 15:30:06 -0000
Subject: [2/2] flink git commit: [FLINK-6281] [jdbc] Add JDBCAppendTableSink.

[FLINK-6281] [jdbc] Add JDBCAppendTableSink.

This closes #3712.
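For quick orientation, below is a minimal, self-contained sketch of how the new sink is wired to a DataStream of Rows. It mirrors the builder call from the added documentation and the fromCollection setup used in JDBCAppendTableSinkTest; the Derby URL and the books table are illustrative only and would have to exist for the job to run.

import java.util.Collections;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class JDBCAppendTableSinkExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// A single-column Row stream; the RowTypeInfo must match the parameter types configured below.
		RowTypeInfo rowType = new RowTypeInfo(
			new TypeInformation<?>[] {BasicTypeInfo.INT_TYPE_INFO},
			new String[] {"id"});
		DataStream<Row> rows = env.fromCollection(Collections.singletonList(Row.of(1)), rowType);

		// Same illustrative Derby settings as the documentation example; the 'books' table is assumed to exist.
		JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
			.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
			.setDBUrl("jdbc:derby:memory:ebookshop")
			.setQuery("INSERT INTO books (id) VALUES (?)")
			.setParameterTypes(BasicTypeInfo.INT_TYPE_INFO)
			.build();

		// emitDataStream attaches the sink's internal JDBCSinkFunction to the stream.
		sink.emitDataStream(rows);

		env.execute("JDBCAppendTableSink example");
	}
}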
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/43e5a81d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/43e5a81d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/43e5a81d

Branch: refs/heads/master
Commit: 43e5a81d4e95f4f7b239ab90f12dfb66e7ae8a48
Parents: 1de8acc
Author: Haohui Mai
Authored: Tue Apr 11 23:56:56 2017 -0700
Committer: Fabian Hueske
Committed: Fri Aug 11 17:29:20 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   |  33 ++++-
 flink-connectors/flink-jdbc/pom.xml             |  10 +-
 .../api/java/io/jdbc/JDBCAppendTableSink.java   | 120 ++++++++++++++++
 .../io/jdbc/JDBCAppendTableSinkBuilder.java     | 140 +++++++++++++++++++
 .../api/java/io/jdbc/JDBCOutputFormat.java      |  41 +++---
 .../api/java/io/jdbc/JDBCSinkFunction.java      |  63 +++++++++
 .../flink/api/java/io/jdbc/JDBCTypeUtil.java    | 103 ++++++++++++++
 .../java/io/jdbc/JDBCAppendTableSinkTest.java   |  90 ++++++++++++
 .../api/java/io/jdbc/JDBCOutputFormatTest.java  |  71 ++++++++--
 .../flink/api/java/io/jdbc/JDBCTestBase.java    |   4 +
 .../api/java/io/jdbc/JDBCTypeUtilTest.java      |  52 +++++++
 11 files changed, 684 insertions(+), 43 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 7af74ca..53b93e1 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -202,7 +202,38 @@ val csvTableSource = CsvTableSource
 Provided TableSinks
 -------------------
 
-**TODO**
+### JDBCAppendTableSink
+
+The JDBCAppendTableSink emits a data stream to a JDBC database. The sink only supports append-only data. It does not support retractions or upserts from Flink's perspective. However, you can customize the query using REPLACE or INSERT OVERWRITE to implement upserts inside the database.
+
+To use the JDBC sink, you have to add the JDBC connector dependency (flink-jdbc) to your project. Then you can create the sink using the JDBCAppendTableSinkBuilder:
+
+
+{% highlight java %}
+JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
+  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+  .setDBUrl("jdbc:derby:memory:ebookshop")
+  .setQuery("INSERT INTO books (id) VALUES (?)")
+  .setParameterTypes(INT_TYPE_INFO)
+  .build();
+{% endhighlight %}
+ +
+{% highlight scala %}
+val sink = JDBCAppendTableSink.builder()
+  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+  .setDBUrl("jdbc:derby:memory:ebookshop")
+  .setQuery("INSERT INTO books (id) VALUES (?)")
+  .setParameterTypes(INT_TYPE_INFO)
+  .build()
+{% endhighlight %}
+
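Once built, the sink can be registered like any other TableSink. The sketch below is illustrative rather than part of this commit's documentation: it assumes the Table API's writeToSink method and a MySQL-style REPLACE INTO statement (the MySQL driver, URL, credentials, and REPLACE syntax are assumptions and are not supported by every database, e.g. not by Derby). Because the sink provides at-least-once delivery, such an idempotent query is what turns retries after a failure into effectively exactly-once results in the database.

{% highlight java %}
// Illustrative values only: driver, URL, credentials, and REPLACE INTO are assumptions.
JDBCAppendTableSink upsertSink = JDBCAppendTableSink.builder()
  .setDrivername("com.mysql.jdbc.Driver")
  .setDBUrl("jdbc:mysql://localhost:3306/shop")
  .setUsername("flink")
  .setPassword("secret")
  .setQuery("REPLACE INTO books (id, title) VALUES (?, ?)")
  .setParameterTypes(INT_TYPE_INFO, STRING_TYPE_INFO)
  .build();

// 'result' is a Table with schema (id: INT, title: STRING); writeToSink attaches the sink to it.
result.writeToSink(upsertSink);
{% endhighlight %}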
+ +Similar to using JDBCOutputFormat, you have to explicitly specify the name of the JDBC driver, the JDBC URL, the query to be executed, and the field types of the JDBC table. You can connect the sink with other DataStreams once the sink is constructed. {% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml index 0704dc8..938ec09 100644 --- a/flink-connectors/flink-jdbc/pom.xml +++ b/flink-connectors/flink-jdbc/pom.xml @@ -36,7 +36,6 @@ under the License. jar - org.apache.flink flink-table_${scala.binary.version} @@ -49,19 +48,12 @@ under the License. org.apache.flink - flink-java + flink-streaming-java_${scala.binary.version} ${project.version} provided - org.apache.flink - flink-clients_${scala.binary.version} - ${project.version} - test - - - org.apache.derby derby 10.10.1.1 http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java new file mode 100644 index 0000000..fc2d0a8 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.sinks.AppendStreamTableSink; +import org.apache.flink.table.sinks.BatchTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.types.Row; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * An at-least-once Table sink for JDBC. + * + *

Flink's checkpointing mechanism guarantees that messages are delivered at least once to this sink (if + * checkpointing is enabled). However, one common use case is to run idempotent queries + * (e.g., REPLACE or INSERT OVERWRITE) to upsert into the database and + * achieve exactly-once semantics.

+ */ +public class JDBCAppendTableSink implements AppendStreamTableSink, BatchTableSink { + + private final JDBCOutputFormat outputFormat; + + private String[] fieldNames; + private TypeInformation[] fieldTypes; + + JDBCAppendTableSink(JDBCOutputFormat outputFormat) { + this.outputFormat = outputFormat; + } + + public static JDBCAppendTableSinkBuilder builder() { + return new JDBCAppendTableSinkBuilder(); + } + + @Override + public void emitDataStream(DataStream dataStream) { + dataStream.addSink(new JDBCSinkFunction(outputFormat)); + } + + @Override + public void emitDataSet(DataSet dataSet) { + dataSet.output(outputFormat); + } + + @Override + public TypeInformation getOutputType() { + return new RowTypeInfo(fieldTypes, fieldNames); + } + + @Override + public String[] getFieldNames() { + return fieldNames; + } + + @Override + public TypeInformation[] getFieldTypes() { + return fieldTypes; + } + + @Override + public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { + int[] types = outputFormat.getTypesArray(); + + String sinkSchema = + String.join(", ", IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.toList())); + String tableSchema = + String.join(", ", Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.toList())); + String msg = String.format("Schema of output table is incompatible with JDBCAppendTableSink schema. " + + "Table schema: [%s], sink schema: [%s]", tableSchema, sinkSchema); + + Preconditions.checkArgument(fieldTypes.length == types.length, msg); + for (int i = 0; i < types.length; ++i) { + Preconditions.checkArgument( + JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i], + msg); + } + + JDBCAppendTableSink copy; + try { + copy = new JDBCAppendTableSink(InstantiationUtil.clone(outputFormat)); + } catch (IOException | ClassNotFoundException e) { + throw new RuntimeException(e); + } + + copy.fieldNames = fieldNames; + copy.fieldTypes = fieldTypes; + return copy; + } + + @VisibleForTesting + JDBCOutputFormat getOutputFormat() { + return outputFormat; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java new file mode 100644 index 0000000..da00d74 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL; + +/** + * A builder to configure and build the JDBCAppendTableSink. + */ +public class JDBCAppendTableSinkBuilder { + private String username; + private String password; + private String driverName; + private String dbURL; + private String query; + private int batchSize = DEFAULT_BATCH_INTERVAL; + private int[] parameterTypes; + + /** + * Specify the username of the JDBC connection. + * @param username the username of the JDBC connection. + */ + public JDBCAppendTableSinkBuilder setUsername(String username) { + this.username = username; + return this; + } + + /** + * Specify the password of the JDBC connection. + * @param password the password of the JDBC connection. + */ + public JDBCAppendTableSinkBuilder setPassword(String password) { + this.password = password; + return this; + } + + /** + * Specify the name of the JDBC driver that will be used. + * @param drivername the name of the JDBC driver. + */ + public JDBCAppendTableSinkBuilder setDrivername(String drivername) { + this.driverName = drivername; + return this; + } + + /** + * Specify the URL of the JDBC database. + * @param dbURL the URL of the database, whose format is specified by the + * corresponding JDBC driver. + */ + public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) { + this.dbURL = dbURL; + return this; + } + + /** + * Specify the query that the sink will execute. Usually user can specify + * INSERT, REPLACE or UPDATE to push the data to the database. + * @param query The query to be executed by the sink. + * @see org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.JDBCOutputFormatBuilder#setQuery(String) + */ + public JDBCAppendTableSinkBuilder setQuery(String query) { + this.query = query; + return this; + } + + /** + * Specify the size of the batch. By default the sink will batch the query + * to improve the performance + * @param batchSize the size of batch + */ + public JDBCAppendTableSinkBuilder setBatchSize(int batchSize) { + this.batchSize = batchSize; + return this; + } + + /** + * Specify the type of the rows that the sink will be accepting. + * @param types the type of each field + */ + public JDBCAppendTableSinkBuilder setParameterTypes(TypeInformation... types) { + int[] ty = new int[types.length]; + for (int i = 0; i < types.length; ++i) { + ty[i] = JDBCTypeUtil.typeInformationToSqlType(types[i]); + } + this.parameterTypes = ty; + return this; + } + + /** + * Specify the type of the rows that the sink will be accepting. + * @param types the type of each field defined by {@see java.sql.Types}. + */ + public JDBCAppendTableSinkBuilder setParameterTypes(int... types) { + this.parameterTypes = types; + return this; + } + + /** + * Finalizes the configuration and checks validity. + * + * @return Configured JDBCOutputFormat + */ + public JDBCAppendTableSink build() { + Preconditions.checkNotNull(parameterTypes, + "Types of the query parameters are not specified." 
+ + " Please specify types using the setParameterTypes() method."); + + JDBCOutputFormat format = JDBCOutputFormat.buildJDBCOutputFormat() + .setUsername(username) + .setPassword(password) + .setDBUrl(dbURL) + .setQuery(query) + .setDrivername(driverName) + .setBatchInterval(batchSize) + .setSqlTypes(parameterTypes) + .finish(); + + return new JDBCAppendTableSink(format); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java index 4cbdbf1..2497712 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java @@ -40,6 +40,7 @@ import java.sql.SQLException; */ public class JDBCOutputFormat extends RichOutputFormat { private static final long serialVersionUID = 1L; + static final int DEFAULT_BATCH_INTERVAL = 5000; private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class); @@ -48,7 +49,7 @@ public class JDBCOutputFormat extends RichOutputFormat { private String drivername; private String dbURL; private String query; - private int batchInterval = 5000; + private int batchInterval = DEFAULT_BATCH_INTERVAL; private Connection dbConn; private PreparedStatement upload; @@ -206,15 +207,23 @@ public class JDBCOutputFormat extends RichOutputFormat { if (batchCount >= batchInterval) { // execute batch - try { - upload.executeBatch(); - batchCount = 0; - } catch (SQLException e) { - throw new RuntimeException("Execution of JDBC statement failed.", e); - } + flush(); } } + void flush() { + try { + upload.executeBatch(); + batchCount = 0; + } catch (SQLException e) { + throw new RuntimeException("Execution of JDBC statement failed.", e); + } + } + + int[] getTypesArray() { + return typesArray; + } + /** * Executes prepared statement and closes all resources of this instance. 
* @@ -223,12 +232,7 @@ public class JDBCOutputFormat extends RichOutputFormat { @Override public void close() throws IOException { if (upload != null) { - // execute last batch - try { - upload.executeBatch(); - } catch (SQLException e) { - throw new RuntimeException("Execution of JDBC statement failed.", e); - } + flush(); // close the connection try { upload.close(); @@ -238,7 +242,6 @@ public class JDBCOutputFormat extends RichOutputFormat { upload = null; } } - batchCount = 0; if (dbConn != null) { try { @@ -307,19 +310,19 @@ public class JDBCOutputFormat extends RichOutputFormat { */ public JDBCOutputFormat finish() { if (format.username == null) { - LOG.info("Username was not supplied separately."); + LOG.info("Username was not supplied."); } if (format.password == null) { - LOG.info("Password was not supplied separately."); + LOG.info("Password was not supplied."); } if (format.dbURL == null) { - throw new IllegalArgumentException("No dababase URL supplied."); + throw new IllegalArgumentException("No database URL supplied."); } if (format.query == null) { - throw new IllegalArgumentException("No query suplied"); + throw new IllegalArgumentException("No query supplied."); } if (format.drivername == null) { - throw new IllegalArgumentException("No driver supplied"); + throw new IllegalArgumentException("No driver supplied."); } return format; http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java new file mode 100644 index 0000000..d2fdef6 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.types.Row; + +class JDBCSinkFunction extends RichSinkFunction implements CheckpointedFunction { + final JDBCOutputFormat outputFormat; + + JDBCSinkFunction(JDBCOutputFormat outputFormat) { + this.outputFormat = outputFormat; + } + + @Override + public void invoke(Row value) throws Exception { + outputFormat.writeRecord(value); + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + outputFormat.flush(); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + RuntimeContext ctx = getRuntimeContext(); + outputFormat.setRuntimeContext(ctx); + outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks()); + } + + @Override + public void close() throws Exception { + outputFormat.close(); + super.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java new file mode 100644 index 0000000..c10631c --- /dev/null +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; + +import java.sql.Types; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_DEC_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BYTE_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO; + +class JDBCTypeUtil { + private static final Map, Integer> TYPE_MAPPING; + private static final Map SQL_TYPE_NAMES; + + static { + HashMap, Integer> m = new HashMap<>(); + m.put(STRING_TYPE_INFO, Types.VARCHAR); + m.put(BOOLEAN_TYPE_INFO, Types.BOOLEAN); + m.put(BYTE_TYPE_INFO, Types.TINYINT); + m.put(SHORT_TYPE_INFO, Types.SMALLINT); + m.put(INT_TYPE_INFO, Types.INTEGER); + m.put(LONG_TYPE_INFO, Types.BIGINT); + m.put(FLOAT_TYPE_INFO, Types.FLOAT); + m.put(DOUBLE_TYPE_INFO, Types.DOUBLE); + m.put(SqlTimeTypeInfo.DATE, Types.DATE); + m.put(SqlTimeTypeInfo.TIME, Types.TIME); + m.put(SqlTimeTypeInfo.TIMESTAMP, Types.TIMESTAMP); + m.put(BIG_DEC_TYPE_INFO, Types.DECIMAL); + m.put(BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.BINARY); + TYPE_MAPPING = Collections.unmodifiableMap(m); + + HashMap names = new HashMap<>(); + names.put(Types.VARCHAR, "VARCHAR"); + names.put(Types.BOOLEAN, "BOOLEAN"); + names.put(Types.TINYINT, "TINYINT"); + names.put(Types.SMALLINT, "SMALLINT"); + names.put(Types.INTEGER, "INTEGER"); + names.put(Types.BIGINT, "BIGINT"); + names.put(Types.FLOAT, "FLOAT"); + names.put(Types.DOUBLE, "DOUBLE"); + names.put(Types.CHAR, "CHAR"); + names.put(Types.DATE, "DATE"); + names.put(Types.TIME, "TIME"); + names.put(Types.TIMESTAMP, "TIMESTAMP"); + names.put(Types.DECIMAL, "DECIMAL"); + names.put(Types.BINARY, "BINARY"); + SQL_TYPE_NAMES = Collections.unmodifiableMap(names); + } + + private JDBCTypeUtil() { + } + + static int typeInformationToSqlType(TypeInformation type) { + + if (TYPE_MAPPING.containsKey(type)) { + return TYPE_MAPPING.get(type); + } else if (type instanceof ObjectArrayTypeInfo || type instanceof PrimitiveArrayTypeInfo) { + return Types.ARRAY; + } else { + throw new IllegalArgumentException("Unsupported type: " + type); + } + } + + static String getTypeName(int type) { + return SQL_TYPE_NAMES.get(type); + } + + static String getTypeName(TypeInformation type) { + return SQL_TYPE_NAMES.get(typeInformationToSqlType(type)); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java ---------------------------------------------------------------------- diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java new file mode 100644 index 0000000..95305c8 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +/** + * Test for JDBCAppendTableSink. 
+ */ +public class JDBCAppendTableSinkTest { + private static final String[] FIELD_NAMES = new String[]{"foo"}; + private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{ + BasicTypeInfo.STRING_TYPE_INFO + }; + private static final RowTypeInfo ROW_TYPE = new RowTypeInfo(FIELD_TYPES, FIELD_NAMES); + + @Test + public void testAppendTableSink() throws IOException { + JDBCAppendTableSink sink = JDBCAppendTableSink.builder() + .setDrivername("foo") + .setDBUrl("bar") + .setQuery("insert into %s (id) values (?)") + .setParameterTypes(FIELD_TYPES) + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream ds = env.fromCollection(Collections.singleton(Row.of("foo")), ROW_TYPE); + sink.emitDataStream(ds); + + Collection sinkIds = env.getStreamGraph().getSinkIDs(); + assertEquals(1, sinkIds.size()); + int sinkId = sinkIds.iterator().next(); + + StreamSink planSink = (StreamSink) env.getStreamGraph().getStreamNode(sinkId).getOperator(); + assertTrue(planSink.getUserFunction() instanceof JDBCSinkFunction); + + JDBCSinkFunction sinkFunction = (JDBCSinkFunction) planSink.getUserFunction(); + assertSame(sink.getOutputFormat(), sinkFunction.outputFormat); + } + + @Test(expected = IllegalArgumentException.class) + public void testTypeCompatibilityCheck() throws IOException { + + JDBCAppendTableSink sink = JDBCAppendTableSink.builder() + .setDrivername("foo") + .setDBUrl("bar") + .setQuery("INSERT INTO foobar (id) VALUES (?)") + .setParameterTypes(Types.LONG, Types.STRING, Types.INT) + .build(); + + sink.configure( + new String[] {"Hello"}, + new TypeInformation[] {Types.STRING, Types.INT, Types.LONG}); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java index e6626a0..8582387 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java @@ -21,7 +21,6 @@ package org.apache.flink.api.java.io.jdbc; import org.apache.flink.types.Row; import org.junit.After; -import org.junit.Assert; import org.junit.Test; import java.io.IOException; @@ -33,6 +32,9 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + /** * Tests for the {@link JDBCOutputFormat}. 
*/ @@ -170,13 +172,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase { jdbcOutputFormat.open(0, 1); for (JDBCTestBase.TestEntry entry : TEST_DATA) { - Row row = new Row(5); - row.setField(0, entry.id); - row.setField(1, entry.title); - row.setField(2, entry.author); - row.setField(3, entry.price); - row.setField(4, entry.qty); - jdbcOutputFormat.writeRecord(row); + jdbcOutputFormat.writeRecord(toRow(entry)); } jdbcOutputFormat.close(); @@ -188,15 +184,52 @@ public class JDBCOutputFormatTest extends JDBCTestBase { ) { int recordCount = 0; while (resultSet.next()) { - Assert.assertEquals(TEST_DATA[recordCount].id, resultSet.getObject("id")); - Assert.assertEquals(TEST_DATA[recordCount].title, resultSet.getObject("title")); - Assert.assertEquals(TEST_DATA[recordCount].author, resultSet.getObject("author")); - Assert.assertEquals(TEST_DATA[recordCount].price, resultSet.getObject("price")); - Assert.assertEquals(TEST_DATA[recordCount].qty, resultSet.getObject("qty")); + assertEquals(TEST_DATA[recordCount].id, resultSet.getObject("id")); + assertEquals(TEST_DATA[recordCount].title, resultSet.getObject("title")); + assertEquals(TEST_DATA[recordCount].author, resultSet.getObject("author")); + assertEquals(TEST_DATA[recordCount].price, resultSet.getObject("price")); + assertEquals(TEST_DATA[recordCount].qty, resultSet.getObject("qty")); recordCount++; } - Assert.assertEquals(TEST_DATA.length, recordCount); + assertEquals(TEST_DATA.length, recordCount); + } + } + + @Test + public void testFlush() throws SQLException, IOException { + jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() + .setDrivername(DRIVER_CLASS) + .setDBUrl(DB_URL) + .setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE_2)) + .setBatchInterval(3) + .finish(); + try ( + Connection dbConn = DriverManager.getConnection(DB_URL); + PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS_2) + ) { + jdbcOutputFormat.open(0, 1); + for (int i = 0; i < 2; ++i) { + jdbcOutputFormat.writeRecord(toRow(TEST_DATA[i])); + } + try (ResultSet resultSet = statement.executeQuery()) { + assertFalse(resultSet.next()); + } + jdbcOutputFormat.writeRecord(toRow(TEST_DATA[2])); + try (ResultSet resultSet = statement.executeQuery()) { + int recordCount = 0; + while (resultSet.next()) { + assertEquals(TEST_DATA[recordCount].id, resultSet.getObject("id")); + assertEquals(TEST_DATA[recordCount].title, resultSet.getObject("title")); + assertEquals(TEST_DATA[recordCount].author, resultSet.getObject("author")); + assertEquals(TEST_DATA[recordCount].price, resultSet.getObject("price")); + assertEquals(TEST_DATA[recordCount].qty, resultSet.getObject("qty")); + recordCount++; + } + assertEquals(3, recordCount); + } + } finally { + jdbcOutputFormat.close(); } } @@ -212,4 +245,14 @@ public class JDBCOutputFormatTest extends JDBCTestBase { conn.close(); } } + + private static Row toRow(TestEntry entry) { + Row row = new Row(5); + row.setField(0, entry.id); + row.setField(1, entry.title); + row.setField(2, entry.author); + row.setField(3, entry.price); + row.setField(4, entry.qty); + return row; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java 
index 7189393..1d41d37 100644 --- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java @@ -39,8 +39,10 @@ public class JDBCTestBase { public static final String DB_URL = "jdbc:derby:memory:ebookshop"; public static final String INPUT_TABLE = "books"; public static final String OUTPUT_TABLE = "newbooks"; + public static final String OUTPUT_TABLE_2 = "newbooks2"; public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE; public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE; + public static final String SELECT_ALL_NEWBOOKS_2 = "select * from " + OUTPUT_TABLE_2; public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0"; public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)"; public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?"; @@ -125,6 +127,7 @@ public class JDBCTestBase { try (Connection conn = DriverManager.getConnection(DB_URL + ";create=true")) { createTable(conn, JDBCTestBase.INPUT_TABLE); createTable(conn, OUTPUT_TABLE); + createTable(conn, OUTPUT_TABLE_2); insertDataIntoInputTable(conn); } } @@ -150,6 +153,7 @@ public class JDBCTestBase { stat.executeUpdate("DROP TABLE " + INPUT_TABLE); stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE); + stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_2); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtilTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtilTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtilTest.java new file mode 100644 index 0000000..790be78 --- /dev/null +++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtilTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.MapTypeInfo; + +import org.junit.Test; + +import java.sql.Types; + +import static org.apache.flink.api.java.io.jdbc.JDBCTypeUtil.typeInformationToSqlType; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Testing the type conversions from Flink to SQL types. 
+ */ +public class JDBCTypeUtilTest { + + @Test + public void testTypeConversions() { + assertEquals(Types.INTEGER, typeInformationToSqlType(BasicTypeInfo.INT_TYPE_INFO)); + testUnsupportedType(BasicTypeInfo.VOID_TYPE_INFO); + testUnsupportedType(new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)); + } + + private static void testUnsupportedType(TypeInformation type) { + try { + typeInformationToSqlType(type); + fail(); + } catch (IllegalArgumentException ignored) { + } + } +}
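As a companion to this test, here is a small sketch of the mapping that JDBCTypeUtil implements. The class name is hypothetical, and it must live in the org.apache.flink.api.java.io.jdbc package because the utility is package-private; the behaviour shown is taken from the JDBCTypeUtil code above.

package org.apache.flink.api.java.io.jdbc;

import java.sql.Types;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;

public class JDBCTypeMappingDemo {

	public static void main(String[] args) {
		// Basic types resolve through the static TYPE_MAPPING table.
		System.out.println(JDBCTypeUtil.typeInformationToSqlType(BasicTypeInfo.STRING_TYPE_INFO) == Types.VARCHAR); // true
		System.out.println(JDBCTypeUtil.getTypeName(BasicTypeInfo.INT_TYPE_INFO)); // INTEGER

		// byte[] is special-cased to BINARY; other primitive and object arrays fall back to ARRAY.
		System.out.println(JDBCTypeUtil.typeInformationToSqlType(
			PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO) == Types.BINARY); // true
		System.out.println(JDBCTypeUtil.typeInformationToSqlType(
			PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO) == Types.ARRAY); // true

		// Unsupported types (e.g. MapTypeInfo, as exercised in the test above) throw IllegalArgumentException.
	}
}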