flink-commits mailing list archives

From fhue...@apache.org
Subject [2/2] flink git commit: [FLINK-6281] [jdbc] Add JDBCAppendTableSink.
Date Fri, 11 Aug 2017 15:30:06 GMT
[FLINK-6281] [jdbc] Add JDBCAppendTableSink.

This closes #3712.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/43e5a81d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/43e5a81d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/43e5a81d

Branch: refs/heads/master
Commit: 43e5a81d4e95f4f7b239ab90f12dfb66e7ae8a48
Parents: 1de8acc
Author: Haohui Mai <wheat9@apache.org>
Authored: Tue Apr 11 23:56:56 2017 -0700
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Fri Aug 11 17:29:20 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   |  33 ++++-
 flink-connectors/flink-jdbc/pom.xml             |  10 +-
 .../api/java/io/jdbc/JDBCAppendTableSink.java   | 120 ++++++++++++++++
 .../io/jdbc/JDBCAppendTableSinkBuilder.java     | 140 +++++++++++++++++++
 .../api/java/io/jdbc/JDBCOutputFormat.java      |  41 +++---
 .../api/java/io/jdbc/JDBCSinkFunction.java      |  63 +++++++++
 .../flink/api/java/io/jdbc/JDBCTypeUtil.java    | 103 ++++++++++++++
 .../java/io/jdbc/JDBCAppendTableSinkTest.java   |  90 ++++++++++++
 .../api/java/io/jdbc/JDBCOutputFormatTest.java  |  71 ++++++++--
 .../flink/api/java/io/jdbc/JDBCTestBase.java    |   4 +
 .../api/java/io/jdbc/JDBCTypeUtilTest.java      |  52 +++++++
 11 files changed, 684 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 7af74ca..53b93e1 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -202,7 +202,38 @@ val csvTableSource = CsvTableSource
 Provided TableSinks
 -------------------
 
-**TODO**
+### JDBCAppendTableSink
+
+<code>JDBCAppendTableSink</code> emits a Table to a JDBC database table. The sink only supports append-only data. It does not support retractions or upserts from Flink's perspective. However, you can customize the query, for example with <code>REPLACE</code> or <code>INSERT OVERWRITE</code>, to implement upserts inside the database.
+
+To use the JDBC sink, you have to add the JDBC connector dependency (<code>flink-jdbc</code>) to your project. Then you can create the sink using <code>JDBCAppendTableSinkBuilder</code>:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
+  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+  .setDBUrl("jdbc:derby:memory:ebookshop")
+  .setQuery("INSERT INTO books (id) VALUES (?)")
+  .setParameterTypes(INT_TYPE_INFO)
+  .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val sink = JDBCAppendTableSink.builder()
+  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+  .setDBUrl("jdbc:derby:memory:ebookshop")
+  .setQuery("INSERT INTO books (id) VALUES (?)")
+  .setParameterTypes(INT_TYPE_INFO)
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
+Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify the name of the JDBC driver, the JDBC URL, the query to be executed, and the field types of the JDBC table. Once the sink is constructed, you can connect it to <code>DataStream</code>s.
 
 {% top %}
 

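For context, a minimal end-to-end sketch of how the sink plugs into the Table API. The registered table "books" and the environment setup are assumptions for illustration, not part of this commit; the calls follow the 1.3-era Java Table API and mirror the doc snippet style above (imports omitted as in the committed docs):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
      .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
      .setDBUrl("jdbc:derby:memory:ebookshop")
      .setQuery("INSERT INTO books (id) VALUES (?)")
      .setParameterTypes(BasicTypeInfo.INT_TYPE_INFO)
      .build();

    // assumes a table "books" is registered in the environment
    Table result = tEnv.sql("SELECT id FROM books");
    result.writeToSink(sink); // attach the JDBC sink to the query result
    env.execute();
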
http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/pom.xml b/flink-connectors/flink-jdbc/pom.xml
index 0704dc8..938ec09 100644
--- a/flink-connectors/flink-jdbc/pom.xml
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -36,7 +36,6 @@ under the License.
 	<packaging>jar</packaging>
 
 	<dependencies>
-
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table_${scala.binary.version}</artifactId>
@@ -49,19 +48,12 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
 
 		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
 			<groupId>org.apache.derby</groupId>
 			<artifactId>derby</artifactId>
 			<version>10.10.1.1</version>
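For reference, a downstream project picks up the sink by adding the connector dependency that the updated docs mention; a sketch of the Maven coordinates (the version property is illustrative):

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-jdbc</artifactId>
      <version>${flink.version}</version>
    </dependency>
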

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
new file mode 100644
index 0000000..fc2d0a8
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ *
+ * <p>Flink's checkpointing mechanism guarantees that messages are delivered to this
+ * sink at least once (if checkpointing is enabled). However, one common use case is to
+ * run idempotent queries (e.g., <code>REPLACE</code> or <code>INSERT OVERWRITE</code>)
+ * to upsert into the database and achieve exactly-once semantics.</p>
+ */
+public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
+
+	private final JDBCOutputFormat outputFormat;
+
+	private String[] fieldNames;
+	private TypeInformation[] fieldTypes;
+
+	JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
+		this.outputFormat = outputFormat;
+	}
+
+	public static JDBCAppendTableSinkBuilder builder() {
+		return new JDBCAppendTableSinkBuilder();
+	}
+
+	@Override
+	public void emitDataStream(DataStream<Row> dataStream) {
+		dataStream.addSink(new JDBCSinkFunction(outputFormat));
+	}
+
+	@Override
+	public void emitDataSet(DataSet<Row> dataSet) {
+		dataSet.output(outputFormat);
+	}
+
+	@Override
+	public TypeInformation<Row> getOutputType() {
+		return new RowTypeInfo(fieldTypes, fieldNames);
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return fieldNames;
+	}
+
+	@Override
+	public TypeInformation<?>[] getFieldTypes() {
+		return fieldTypes;
+	}
+
+	@Override
+	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		int[] types = outputFormat.getTypesArray();
+
+		String sinkSchema =
+			String.join(", ", IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
+		String tableSchema =
+			String.join(", ", Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
+		String msg = String.format("Schema of output table is incompatible with JDBCAppendTableSink schema. " +
+			"Table schema: [%s], sink schema: [%s]", tableSchema, sinkSchema);
+
+		Preconditions.checkArgument(fieldTypes.length == types.length, msg);
+		for (int i = 0; i < types.length; ++i) {
+			Preconditions.checkArgument(
+				JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
+				msg);
+		}
+
+		JDBCAppendTableSink copy;
+		try {
+			copy = new JDBCAppendTableSink(InstantiationUtil.clone(outputFormat));
+		} catch (IOException | ClassNotFoundException e) {
+			throw new RuntimeException(e);
+		}
+
+		copy.fieldNames = fieldNames;
+		copy.fieldTypes = fieldTypes;
+		return copy;
+	}
+
+	@VisibleForTesting
+	JDBCOutputFormat getOutputFormat() {
+		return outputFormat;
+	}
+}
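
To illustrate, configure() is what the Table API calls when a query result is emitted to the sink: it validates the table schema against the SQL types declared on the builder and returns a configured copy. A hedged example call, assuming a sink built with a single INTEGER parameter type (the field name "id" is made up):

    TableSink<Row> configured = sink.configure(
      new String[] {"id"},
      new TypeInformation<?>[] {BasicTypeInfo.INT_TYPE_INFO});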

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
new file mode 100644
index 0000000..da00d74
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
+
+/**
+ * A builder to configure and build the JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkBuilder {
+	private String username;
+	private String password;
+	private String driverName;
+	private String dbURL;
+	private String query;
+	private int batchSize = DEFAULT_BATCH_INTERVAL;
+	private int[] parameterTypes;
+
+	/**
+	 * Specify the username of the JDBC connection.
+	 * @param username the username of the JDBC connection.
+	 */
+	public JDBCAppendTableSinkBuilder setUsername(String username) {
+		this.username = username;
+		return this;
+	}
+
+	/**
+	 * Specify the password of the JDBC connection.
+	 * @param password the password of the JDBC connection.
+	 */
+	public JDBCAppendTableSinkBuilder setPassword(String password) {
+		this.password = password;
+		return this;
+	}
+
+	/**
+	 * Specify the name of the JDBC driver that will be used.
+	 * @param drivername the name of the JDBC driver.
+	 */
+	public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
+		this.driverName = drivername;
+		return this;
+	}
+
+	/**
+	 * Specify the URL of the JDBC database.
+	 * @param dbURL the URL of the database, whose format is specified by the
+	 *              corresponding JDBC driver.
+	 */
+	public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
+		this.dbURL = dbURL;
+		return this;
+	}
+
+	/**
+	 * Specify the query that the sink will execute. Usually the user specifies
+	 * an INSERT, REPLACE, or UPDATE statement to push data to the database.
+	 * @param query The query to be executed by the sink.
+	 * @see org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.JDBCOutputFormatBuilder#setQuery(String)
+	 */
+	public JDBCAppendTableSinkBuilder setQuery(String query) {
+		this.query = query;
+		return this;
+	}
+
+	/**
+	 * Specify the size of a batch. By default the sink batches queries
+	 * to improve performance.
+	 * @param batchSize the size of a batch
+	 */
+	public JDBCAppendTableSinkBuilder setBatchSize(int batchSize) {
+		this.batchSize = batchSize;
+		return this;
+	}
+
+	/**
+	 * Specify the types of the rows that the sink will accept.
+	 * @param types the type of each field
+	 */
+	public JDBCAppendTableSinkBuilder setParameterTypes(TypeInformation<?>... types) {
+		int[] ty = new int[types.length];
+		for (int i = 0; i < types.length; ++i) {
+			ty[i] = JDBCTypeUtil.typeInformationToSqlType(types[i]);
+		}
+		this.parameterTypes = ty;
+		return this;
+	}
+
+	/**
+	 * Specify the types of the rows that the sink will accept.
+	 * @param types the SQL type of each field, as defined in {@link java.sql.Types}.
+	 */
+	public JDBCAppendTableSinkBuilder setParameterTypes(int... types) {
+		this.parameterTypes = types;
+		return this;
+	}
+
+	/**
+	 * Finalizes the configuration and checks validity.
+	 *
+	 * @return Configured JDBCAppendTableSink
+	 */
+	public JDBCAppendTableSink build() {
+		Preconditions.checkNotNull(parameterTypes,
+			"Types of the query parameters are not specified." +
+			" Please specify types using the setParameterTypes() method.");
+
+		JDBCOutputFormat format = JDBCOutputFormat.buildJDBCOutputFormat()
+			.setUsername(username)
+			.setPassword(password)
+			.setDBUrl(dbURL)
+			.setQuery(query)
+			.setDrivername(driverName)
+			.setBatchInterval(batchSize)
+			.setSqlTypes(parameterTypes)
+			.finish();
+
+		return new JDBCAppendTableSink(format);
+	}
+}
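
Worth noting: both setParameterTypes() overloads populate the same parameterTypes array; the TypeInformation variant is translated through JDBCTypeUtil. A sketch of two equivalent calls declaring an (INT, VARCHAR) signature:

    builder.setParameterTypes(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    builder.setParameterTypes(java.sql.Types.INTEGER, java.sql.Types.VARCHAR);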

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index 4cbdbf1..2497712 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -40,6 +40,7 @@ import java.sql.SQLException;
  */
 public class JDBCOutputFormat extends RichOutputFormat<Row> {
 	private static final long serialVersionUID = 1L;
+	static final int DEFAULT_BATCH_INTERVAL = 5000;
 
 	private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
 
@@ -48,7 +49,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 	private String drivername;
 	private String dbURL;
 	private String query;
-	private int batchInterval = 5000;
+	private int batchInterval = DEFAULT_BATCH_INTERVAL;
 
 	private Connection dbConn;
 	private PreparedStatement upload;
@@ -206,15 +207,23 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 
 		if (batchCount >= batchInterval) {
 			// execute batch
-			try {
-				upload.executeBatch();
-				batchCount = 0;
-			} catch (SQLException e) {
-				throw new RuntimeException("Execution of JDBC statement failed.", e);
-			}
+			flush();
 		}
 	}
 
+	void flush() {
+		try {
+			upload.executeBatch();
+			batchCount = 0;
+		} catch (SQLException e) {
+			throw new RuntimeException("Execution of JDBC statement failed.", e);
+		}
+	}
+
+	int[] getTypesArray() {
+		return typesArray;
+	}
+
 	/**
 	 * Executes prepared statement and closes all resources of this instance.
 	 *
@@ -223,12 +232,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 	@Override
 	public void close() throws IOException {
 		if (upload != null) {
-			// execute last batch
-			try {
-				upload.executeBatch();
-			} catch (SQLException e) {
-				throw new RuntimeException("Execution of JDBC statement failed.", e);
-			}
+			flush();
 			// close the connection
 			try {
 				upload.close();
@@ -238,7 +242,6 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 				upload = null;
 			}
 		}
-		batchCount = 0;
 
 		if (dbConn != null) {
 			try {
@@ -307,19 +310,19 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 		 */
 		public JDBCOutputFormat finish() {
 			if (format.username == null) {
-				LOG.info("Username was not supplied separately.");
+				LOG.info("Username was not supplied.");
 			}
 			if (format.password == null) {
-				LOG.info("Password was not supplied separately.");
+				LOG.info("Password was not supplied.");
 			}
 			if (format.dbURL == null) {
-				throw new IllegalArgumentException("No dababase URL supplied.");
+				throw new IllegalArgumentException("No database URL supplied.");
 			}
 			if (format.query == null) {
-				throw new IllegalArgumentException("No query suplied");
+				throw new IllegalArgumentException("No query supplied.");
 			}
 			if (format.drivername == null) {
-				throw new IllegalArgumentException("No driver supplied");
+				throw new IllegalArgumentException("No driver supplied.");
 			}
 
 			return format;

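For context, the extracted flush() lets callers force-execute the pending batch; otherwise rows are buffered until DEFAULT_BATCH_INTERVAL (5000) records accumulate, and close() flushes the remainder. A hedged sketch of a low-latency configuration using only the builder methods that appear in this file (connection values are illustrative):

    JDBCOutputFormat format = JDBCOutputFormat.buildJDBCOutputFormat()
      .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
      .setDBUrl("jdbc:derby:memory:ebookshop")
      .setQuery("INSERT INTO books (id) VALUES (?)")
      .setSqlTypes(new int[] {java.sql.Types.INTEGER})
      .setBatchInterval(1) // execute the statement after every record
      .finish();
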
http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java
new file mode 100644
index 0000000..d2fdef6
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+
+class JDBCSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
+	final JDBCOutputFormat outputFormat;
+
+	JDBCSinkFunction(JDBCOutputFormat outputFormat) {
+		this.outputFormat = outputFormat;
+	}
+
+	@Override
+	public void invoke(Row value) throws Exception {
+		outputFormat.writeRecord(value);
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		outputFormat.flush();
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		RuntimeContext ctx = getRuntimeContext();
+		outputFormat.setRuntimeContext(ctx);
+		outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+	}
+
+	@Override
+	public void close() throws Exception {
+		outputFormat.close();
+		super.close();
+	}
+}
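
Flushing in snapshotState() means a checkpoint cannot complete while buffered rows are unwritten; after a failure, records since the last checkpoint are replayed, which is what gives the sink its at-least-once guarantee. The guarantee therefore only holds when checkpointing is enabled on the environment (the interval below is illustrative):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10000); // snapshotState() -> outputFormat.flush() on each checkpoint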

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
new file mode 100644
index 0000000..c10631c
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+
+import java.sql.Types;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_DEC_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.BYTE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+
+class JDBCTypeUtil {
+	private static final Map<TypeInformation<?>, Integer> TYPE_MAPPING;
+	private static final Map<Integer, String> SQL_TYPE_NAMES;
+
+	static {
+		HashMap<TypeInformation<?>, Integer> m = new HashMap<>();
+		m.put(STRING_TYPE_INFO, Types.VARCHAR);
+		m.put(BOOLEAN_TYPE_INFO, Types.BOOLEAN);
+		m.put(BYTE_TYPE_INFO, Types.TINYINT);
+		m.put(SHORT_TYPE_INFO, Types.SMALLINT);
+		m.put(INT_TYPE_INFO, Types.INTEGER);
+		m.put(LONG_TYPE_INFO, Types.BIGINT);
+		m.put(FLOAT_TYPE_INFO, Types.FLOAT);
+		m.put(DOUBLE_TYPE_INFO, Types.DOUBLE);
+		m.put(SqlTimeTypeInfo.DATE, Types.DATE);
+		m.put(SqlTimeTypeInfo.TIME, Types.TIME);
+		m.put(SqlTimeTypeInfo.TIMESTAMP, Types.TIMESTAMP);
+		m.put(BIG_DEC_TYPE_INFO, Types.DECIMAL);
+		m.put(BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.BINARY);
+		TYPE_MAPPING = Collections.unmodifiableMap(m);
+
+		HashMap<Integer, String> names = new HashMap<>();
+		names.put(Types.VARCHAR, "VARCHAR");
+		names.put(Types.BOOLEAN, "BOOLEAN");
+		names.put(Types.TINYINT, "TINYINT");
+		names.put(Types.SMALLINT, "SMALLINT");
+		names.put(Types.INTEGER, "INTEGER");
+		names.put(Types.BIGINT, "BIGINT");
+		names.put(Types.FLOAT, "FLOAT");
+		names.put(Types.DOUBLE, "DOUBLE");
+		names.put(Types.CHAR, "CHAR");
+		names.put(Types.DATE, "DATE");
+		names.put(Types.TIME, "TIME");
+		names.put(Types.TIMESTAMP, "TIMESTAMP");
+		names.put(Types.DECIMAL, "DECIMAL");
+		names.put(Types.BINARY, "BINARY");
+		SQL_TYPE_NAMES = Collections.unmodifiableMap(names);
+	}
+
+	private JDBCTypeUtil() {
+	}
+
+	static int typeInformationToSqlType(TypeInformation<?> type) {
+
+		if (TYPE_MAPPING.containsKey(type)) {
+			return TYPE_MAPPING.get(type);
+		} else if (type instanceof ObjectArrayTypeInfo || type instanceof PrimitiveArrayTypeInfo) {
+			return Types.ARRAY;
+		} else {
+			throw new IllegalArgumentException("Unsupported type: " + type);
+		}
+	}
+
+	static String getTypeName(int type) {
+		return SQL_TYPE_NAMES.get(type);
+	}
+
+	static String getTypeName(TypeInformation<?> type) {
+		return SQL_TYPE_NAMES.get(typeInformationToSqlType(type));
+	}
+
+}
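
A few concrete conversions implied by the mapping above (return values are java.sql.Types constants):

    typeInformationToSqlType(BasicTypeInfo.STRING_TYPE_INFO);  // Types.VARCHAR
    typeInformationToSqlType(SqlTimeTypeInfo.TIMESTAMP);       // Types.TIMESTAMP
    // non-byte arrays fall through to the instanceof checks:
    typeInformationToSqlType(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); // Types.ARRAY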

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java
new file mode 100644
index 0000000..95305c8
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkTest {
+	private static final String[] FIELD_NAMES = new String[]{"foo"};
+	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
+		BasicTypeInfo.STRING_TYPE_INFO
+	};
+	private static final RowTypeInfo ROW_TYPE = new RowTypeInfo(FIELD_TYPES, FIELD_NAMES);
+
+	@Test
+	public void testAppendTableSink() throws IOException {
+		JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
+			.setDrivername("foo")
+			.setDBUrl("bar")
+			.setQuery("insert into %s (id) values (?)")
+			.setParameterTypes(FIELD_TYPES)
+			.build();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Row> ds = env.fromCollection(Collections.singleton(Row.of("foo")), ROW_TYPE);
+		sink.emitDataStream(ds);
+
+		Collection<Integer> sinkIds = env.getStreamGraph().getSinkIDs();
+		assertEquals(1, sinkIds.size());
+		int sinkId = sinkIds.iterator().next();
+
+		StreamSink planSink = (StreamSink) env.getStreamGraph().getStreamNode(sinkId).getOperator();
+		assertTrue(planSink.getUserFunction() instanceof JDBCSinkFunction);
+
+		JDBCSinkFunction sinkFunction = (JDBCSinkFunction) planSink.getUserFunction();
+		assertSame(sink.getOutputFormat(), sinkFunction.outputFormat);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testTypeCompatibilityCheck() throws IOException {
+
+		JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
+			.setDrivername("foo")
+			.setDBUrl("bar")
+			.setQuery("INSERT INTO foobar (id) VALUES (?)")
+			.setParameterTypes(Types.LONG, Types.STRING, Types.INT)
+			.build();
+
+		sink.configure(
+			new String[] {"Hello"},
+			new TypeInformation<?>[] {Types.STRING, Types.INT, Types.LONG});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index e6626a0..8582387 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.java.io.jdbc;
 import org.apache.flink.types.Row;
 
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -33,6 +32,9 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
 /**
  * Tests for the {@link JDBCOutputFormat}.
  */
@@ -170,13 +172,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 		jdbcOutputFormat.open(0, 1);
 
 		for (JDBCTestBase.TestEntry entry : TEST_DATA) {
-			Row row = new Row(5);
-			row.setField(0, entry.id);
-			row.setField(1, entry.title);
-			row.setField(2, entry.author);
-			row.setField(3, entry.price);
-			row.setField(4, entry.qty);
-			jdbcOutputFormat.writeRecord(row);
+			jdbcOutputFormat.writeRecord(toRow(entry));
 		}
 
 		jdbcOutputFormat.close();
@@ -188,15 +184,52 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 		) {
 			int recordCount = 0;
 			while (resultSet.next()) {
-				Assert.assertEquals(TEST_DATA[recordCount].id, resultSet.getObject("id"));
-				Assert.assertEquals(TEST_DATA[recordCount].title, resultSet.getObject("title"));
-				Assert.assertEquals(TEST_DATA[recordCount].author, resultSet.getObject("author"));
-				Assert.assertEquals(TEST_DATA[recordCount].price, resultSet.getObject("price"));
-				Assert.assertEquals(TEST_DATA[recordCount].qty, resultSet.getObject("qty"));
+				assertEquals(TEST_DATA[recordCount].id, resultSet.getObject("id"));
+				assertEquals(TEST_DATA[recordCount].title, resultSet.getObject("title"));
+				assertEquals(TEST_DATA[recordCount].author, resultSet.getObject("author"));
+				assertEquals(TEST_DATA[recordCount].price, resultSet.getObject("price"));
+				assertEquals(TEST_DATA[recordCount].qty, resultSet.getObject("qty"));
 
 				recordCount++;
 			}
-			Assert.assertEquals(TEST_DATA.length, recordCount);
+			assertEquals(TEST_DATA.length, recordCount);
+		}
+	}
+
+	@Test
+	public void testFlush() throws SQLException, IOException {
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+			.setDrivername(DRIVER_CLASS)
+			.setDBUrl(DB_URL)
+			.setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE_2))
+			.setBatchInterval(3)
+			.finish();
+		try (
+			Connection dbConn = DriverManager.getConnection(DB_URL);
+			PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS_2)
+		) {
+			jdbcOutputFormat.open(0, 1);
+			for (int i = 0; i < 2; ++i) {
+				jdbcOutputFormat.writeRecord(toRow(TEST_DATA[i]));
+			}
+			try (ResultSet resultSet = statement.executeQuery()) {
+				assertFalse(resultSet.next());
+			}
+			jdbcOutputFormat.writeRecord(toRow(TEST_DATA[2]));
+			try (ResultSet resultSet = statement.executeQuery()) {
+				int recordCount = 0;
+				while (resultSet.next()) {
+					assertEquals(TEST_DATA[recordCount].id, resultSet.getObject("id"));
+					assertEquals(TEST_DATA[recordCount].title, resultSet.getObject("title"));
+					assertEquals(TEST_DATA[recordCount].author, resultSet.getObject("author"));
+					assertEquals(TEST_DATA[recordCount].price, resultSet.getObject("price"));
+					assertEquals(TEST_DATA[recordCount].qty, resultSet.getObject("qty"));
+					recordCount++;
+				}
+				assertEquals(3, recordCount);
+			}
+		} finally {
+			jdbcOutputFormat.close();
 		}
 	}
 
@@ -212,4 +245,14 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 			conn.close();
 		}
 	}
+
+	private static Row toRow(TestEntry entry) {
+		Row row = new Row(5);
+		row.setField(0, entry.id);
+		row.setField(1, entry.title);
+		row.setField(2, entry.author);
+		row.setField(3, entry.price);
+		row.setField(4, entry.qty);
+		return row;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
index 7189393..1d41d37 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -39,8 +39,10 @@ public class JDBCTestBase {
 	public static final String DB_URL = "jdbc:derby:memory:ebookshop";
 	public static final String INPUT_TABLE = "books";
 	public static final String OUTPUT_TABLE = "newbooks";
+	public static final String OUTPUT_TABLE_2 = "newbooks2";
 	public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
 	public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
+	public static final String SELECT_ALL_NEWBOOKS_2 = "select * from " + OUTPUT_TABLE_2;
 	public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
 	public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
 	public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
@@ -125,6 +127,7 @@ public class JDBCTestBase {
 		try (Connection conn = DriverManager.getConnection(DB_URL + ";create=true")) {
 			createTable(conn, JDBCTestBase.INPUT_TABLE);
 			createTable(conn, OUTPUT_TABLE);
+			createTable(conn, OUTPUT_TABLE_2);
 			insertDataIntoInputTable(conn);
 		}
 	}
@@ -150,6 +153,7 @@ public class JDBCTestBase {
 
 			stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
 			stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);
+			stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_2);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtilTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtilTest.java
new file mode 100644
index 0000000..790be78
--- /dev/null
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtilTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+
+import org.junit.Test;
+
+import java.sql.Types;
+
+import static org.apache.flink.api.java.io.jdbc.JDBCTypeUtil.typeInformationToSqlType;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Testing the type conversions from Flink to SQL types.
+ */
+public class JDBCTypeUtilTest {
+
+	@Test
+	public void testTypeConversions() {
+		assertEquals(Types.INTEGER, typeInformationToSqlType(BasicTypeInfo.INT_TYPE_INFO));
+		testUnsupportedType(BasicTypeInfo.VOID_TYPE_INFO);
+		testUnsupportedType(new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+	}
+
+	private static void testUnsupportedType(TypeInformation<?> type) {
+		try {
+			typeInformationToSqlType(type);
+			fail();
+		} catch (IllegalArgumentException ignored) {
+		}
+	}
+}

