flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/2] flink git commit: [FLINK-7221] [jdbc] Throw exception if execution of last JDBC batch fails.
Date Mon, 07 Aug 2017 16:10:37 GMT
Repository: flink
Updated Branches:
  refs/heads/master 1ceb89a97 -> 8c9642f7d


[FLINK-7221] [jdbc] Throw exception if execution of last JDBC batch fails.

This closes #4459.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ea9dbb0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ea9dbb0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ea9dbb0

Branch: refs/heads/master
Commit: 6ea9dbb00d076754d13e05288aa13d7e946d6567
Parents: 1ceb89a
Author: Fabian Hueske <fhueske@apache.org>
Authored: Tue Aug 1 18:08:53 2017 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Aug 7 16:23:49 2017 +0200

----------------------------------------------------------------------
 .../api/java/io/jdbc/JDBCOutputFormat.java      | 51 ++++++++------
 .../flink/api/java/io/jdbc/JDBCFullTest.java    | 15 +++++
 .../api/java/io/jdbc/JDBCOutputFormatTest.java  | 71 +++++++++++++++++++-
 3 files changed, 116 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6ea9dbb0/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index 3f2ad33..4cbdbf1 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.java.io.jdbc;
 
 import org.apache.flink.api.common.io.RichOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Row;
 
@@ -33,10 +32,10 @@ import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
 /**
- * OutputFormat to write tuples into a database.
+ * OutputFormat to write Rows into a JDBC database.
  * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
  *
- * @see Tuple
+ * @see Row
  * @see DriverManager
  */
 public class JDBCOutputFormat extends RichOutputFormat<Row> {
@@ -56,7 +55,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 
 	private int batchCount = 0;
 
-	public int[] typesArray;
+	private int[] typesArray;
 
 	public JDBCOutputFormat() {
 	}
@@ -201,12 +200,18 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 			}
 			upload.addBatch();
 			batchCount++;
-			if (batchCount >= batchInterval) {
+		} catch (SQLException e) {
+			throw new RuntimeException("Preparation of JDBC statement failed.", e);
+		}
+
+		if (batchCount >= batchInterval) {
+			// execute batch
+			try {
 				upload.executeBatch();
 				batchCount = 0;
+			} catch (SQLException e) {
+				throw new RuntimeException("Execution of JDBC statement failed.", e);
 			}
-		} catch (SQLException | IllegalArgumentException e) {
-			throw new IllegalArgumentException("writeRecord() failed", e);
 		}
 	}
 
@@ -217,26 +222,32 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
 	 */
 	@Override
 	public void close() throws IOException {
-		try {
-			if (upload != null) {
+		if (upload != null) {
+			// execute last batch
+			try {
 				upload.executeBatch();
+			} catch (SQLException e) {
+				throw new RuntimeException("Execution of JDBC statement failed.", e);
+			}
+			// close the connection
+			try {
 				upload.close();
+			} catch (SQLException e) {
+				LOG.info("JDBC statement could not be closed: " + e.getMessage());
+			} finally {
+				upload = null;
 			}
-		} catch (SQLException se) {
-			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-		} finally {
-			upload = null;
-			batchCount = 0;
 		}
+		batchCount = 0;
 
-		try {
-			if (dbConn != null) {
+		if (dbConn != null) {
+			try {
 				dbConn.close();
+			} catch (SQLException se) {
+				LOG.info("JDBC connection could not be closed: " + se.getMessage());
+			} finally {
+				dbConn = null;
 			}
-		} catch (SQLException se) {
-			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-		} finally {
-			dbConn = null;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6ea9dbb0/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
index bd575c3..c1f2b25 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
 import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
 import org.apache.flink.types.Row;
 
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -31,6 +32,7 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.Statement;
 import java.sql.Types;
 
 /**
@@ -92,4 +94,17 @@ public class JDBCFullTest extends JDBCTestBase {
 		}
 	}
 
+	@After
+	public void clearOutputTable() throws Exception {
+		Class.forName(DRIVER_CLASS);
+		try (
+			Connection conn = DriverManager.getConnection(DB_URL);
+			Statement stat = conn.createStatement()) {
+			stat.execute("DELETE FROM " + OUTPUT_TABLE);
+
+			stat.close();
+			conn.close();
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6ea9dbb0/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index 3f14504..e6626a0 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -30,6 +30,8 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
 
 /**
  * Tests for the {@link JDBCOutputFormat}.
@@ -84,7 +86,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 				.finish();
 	}
 
-	@Test(expected = IllegalArgumentException.class)
+	@Test(expected = RuntimeException.class)
 	public void testIncompatibleTypes() throws IOException {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
 				.setDrivername(DRIVER_CLASS)
@@ -104,6 +106,60 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 		jdbcOutputFormat.close();
 	}
 
+	@Test(expected = RuntimeException.class)
+	public void testExceptionOnInvalidType() throws IOException {
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+			.setDrivername(DRIVER_CLASS)
+			.setDBUrl(DB_URL)
+			.setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
+			.setSqlTypes(new int[] {
+				Types.INTEGER,
+				Types.VARCHAR,
+				Types.VARCHAR,
+				Types.DOUBLE,
+				Types.INTEGER})
+			.finish();
+		jdbcOutputFormat.open(0, 1);
+
+		JDBCTestBase.TestEntry entry = TEST_DATA[0];
+		Row row = new Row(5);
+		row.setField(0, entry.id);
+		row.setField(1, entry.title);
+		row.setField(2, entry.author);
+		row.setField(3, 0L); // use incompatible type (Long instead of Double)
+		row.setField(4, entry.qty);
+		jdbcOutputFormat.writeRecord(row);
+	}
+
+	@Test(expected = RuntimeException.class)
+	public void testExceptionOnClose() throws IOException {
+
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+			.setDrivername(DRIVER_CLASS)
+			.setDBUrl(DB_URL)
+			.setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
+			.setSqlTypes(new int[] {
+				Types.INTEGER,
+				Types.VARCHAR,
+				Types.VARCHAR,
+				Types.DOUBLE,
+				Types.INTEGER})
+			.finish();
+		jdbcOutputFormat.open(0, 1);
+
+		JDBCTestBase.TestEntry entry = TEST_DATA[0];
+		Row row = new Row(5);
+		row.setField(0, entry.id);
+		row.setField(1, entry.title);
+		row.setField(2, entry.author);
+		row.setField(3, entry.price);
+		row.setField(4, entry.qty);
+		jdbcOutputFormat.writeRecord(row);
+		jdbcOutputFormat.writeRecord(row); // writing the same record twice must yield a unique
key violation.
+
+		jdbcOutputFormat.close();
+	}
+
 	@Test
 	public void testJDBCOutputFormat() throws IOException, SQLException {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
@@ -143,4 +199,17 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
 			Assert.assertEquals(TEST_DATA.length, recordCount);
 		}
 	}
+
+	@After
+	public void clearOutputTable() throws Exception {
+		Class.forName(DRIVER_CLASS);
+		try (
+			Connection conn = DriverManager.getConnection(DB_URL);
+			Statement stat = conn.createStatement()) {
+			stat.execute("DELETE FROM " + OUTPUT_TABLE);
+
+			stat.close();
+			conn.close();
+		}
+	}
 }


Mime
View raw message