flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [3/3] flink git commit: [FLINK-3750] [jdbcConnector] Refactor JdbcInputFormat and JdbcOutputFormat.
Date Wed, 18 May 2016 22:15:52 GMT
[FLINK-3750] [jdbcConnector] Refactor JdbcInputFormat and JdbcOutputFormat.

- New Input- and OutputFormat use Row instead of Tuple types to support null values.
- JDBCInputFormat supports parallel input by using a PreparedStatement whose parameters are bound to different values for each input split.

This closes #1941


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09b428bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09b428bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09b428bd

Branch: refs/heads/master
Commit: 09b428bd65819b946cf82ab1fdee305eb5a941f5
Parents: a24df98
Author: Flavio Pompermaier <f.pompermaier@gmail.com>
Authored: Tue Apr 26 19:10:53 2016 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed May 18 22:03:58 2016 +0200

----------------------------------------------------------------------
 flink-batch-connectors/flink-jdbc/pom.xml       |   6 +
 .../flink/api/java/io/jdbc/JDBCInputFormat.java | 392 ++++++++++---------
 .../api/java/io/jdbc/JDBCOutputFormat.java      | 255 +++++++-----
 .../split/GenericParameterValuesProvider.java   |  44 +++
 .../split/NumericBetweenParametersProvider.java |  66 ++++
 .../io/jdbc/split/ParameterValuesProvider.java  |  35 ++
 .../flink/api/java/io/jdbc/DerbyUtil.java       |  31 --
 .../flink/api/java/io/jdbc/JDBCFullTest.java    | 101 +++++
 .../api/java/io/jdbc/JDBCInputFormatTest.java   | 297 ++++++++------
 .../api/java/io/jdbc/JDBCOutputFormatTest.java  | 222 ++++-------
 .../flink/api/java/io/jdbc/JDBCTestBase.java    | 183 +++++++++
 .../api/java/io/jdbc/example/JDBCExample.java   | 102 -----
 12 files changed, 1042 insertions(+), 692 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/pom.xml b/flink-batch-connectors/flink-jdbc/pom.xml
index efa3d2f..cb169ea 100644
--- a/flink-batch-connectors/flink-jdbc/pom.xml
+++ b/flink-batch-connectors/flink-jdbc/pom.xml
@@ -38,6 +38,12 @@ under the License.
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-java</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index b764350..b4246f5 100644
--- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -19,114 +19,236 @@
 package org.apache.flink.api.java.io.jdbc;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Array;
 import java.sql.Connection;
+import java.sql.Date;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.sql.Statement;
-
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
 
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.types.NullValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * InputFormat to read data from a database and generate tuples.
+ * InputFormat to read data from a database and generate Rows.
  * The InputFormat has to be configured using the supplied InputFormatBuilder.
- * 
- * @param <OUT>
- * @see Tuple
+ * A valid RowTypeInfo must be properly configured in the builder, e.g.: <br>
+ *
+ * <pre><code>
+ * TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
+ *		BasicTypeInfo.INT_TYPE_INFO,
+ *		BasicTypeInfo.STRING_TYPE_INFO,
+ *		BasicTypeInfo.STRING_TYPE_INFO,
+ *		BasicTypeInfo.DOUBLE_TYPE_INFO,
+ *		BasicTypeInfo.INT_TYPE_INFO
+ *	};
+ *
+ * RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
+ *
+ * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ *				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+ *				.setDBUrl("jdbc:derby:memory:ebookshop")
+ *				.setQuery("select * from books")
+ *				.setRowTypeInfo(rowTypeInfo)
+ *				.finish();
+ * </code></pre>
+ *
+ * In order to query the JDBC source in parallel, you need to provide a
+ * parameterized query template (i.e. a valid {@link PreparedStatement}) and
+ * a {@link ParameterValuesProvider} which provides binding values for the
+ * query parameters. E.g.:<br>
+ *
+ * <pre><code>
+ *
+ * Serializable[][] queryParameters = new String[2][1];
+ * queryParameters[0] = new String[]{"Kumar"};
+ * queryParameters[1] = new String[]{"Tan Ah Teck"};
+ *
+ * JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+ *				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+ *				.setDBUrl("jdbc:derby:memory:ebookshop")
+ *				.setQuery("select * from books WHERE author = ?")
+ *				.setRowTypeInfo(rowTypeInfo)
+ *				.setParametersProvider(new GenericParameterValuesProvider(queryParameters))
+ *				.finish();
+ * </code></pre>
+ *
+ * @see Row
+ * @see ParameterValuesProvider
+ * @see PreparedStatement
  * @see DriverManager
  */
-public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, InputSplit> implements NonParallelInput {
-	private static final long serialVersionUID = 1L;
+public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable {
 
+	private static final long serialVersionUID = 1L;
 	private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
 
 	private String username;
 	private String password;
 	private String drivername;
 	private String dbURL;
-	private String query;
+	private String queryTemplate;
 	private int resultSetType;
 	private int resultSetConcurrency;
+	private RowTypeInfo rowTypeInfo;
 
 	private transient Connection dbConn;
-	private transient Statement statement;
+	private transient PreparedStatement statement;
 	private transient ResultSet resultSet;
 
-	private int[] columnTypes = null;
+	private boolean hasNext;
+	private Object[][] parameterValues;
 
 	public JDBCInputFormat() {
 	}
 
 	@Override
+	public RowTypeInfo getProducedType() {
+		return rowTypeInfo;
+	}
+
+	@Override
 	public void configure(Configuration parameters) {
+		//do nothing here
 	}
 
-	/**
-	 * Connects to the source database and executes the query.
-	 *
-	 * @param ignored
-	 * @throws IOException
-	 */
 	@Override
-	public void open(InputSplit ignored) throws IOException {
+	public void openInputFormat() {
+		//called once per inputFormat (on open)
 		try {
-			establishConnection();
-			statement = dbConn.createStatement(resultSetType, resultSetConcurrency);
-			resultSet = statement.executeQuery(query);
+			Class.forName(drivername);
+			if (username == null) {
+				dbConn = DriverManager.getConnection(dbURL);
+			} else {
+				dbConn = DriverManager.getConnection(dbURL, username, password);
+			}
+			statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
 		} catch (SQLException se) {
-			close();
 			throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
 		} catch (ClassNotFoundException cnfe) {
 			throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
 		}
 	}
 
-	private void establishConnection() throws SQLException, ClassNotFoundException {
-		Class.forName(drivername);
-		if (username == null) {
-			dbConn = DriverManager.getConnection(dbURL);
-		} else {
-			dbConn = DriverManager.getConnection(dbURL, username, password);
+	@Override
+	public void closeInputFormat() {
+		//called once per inputFormat (on close)
+		try {
+			if(statement != null) {
+				statement.close();
+			}
+		} catch (SQLException se) {
+			LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage());
+		} finally {
+			statement = null;
+		}
+
+		try {
+			if(dbConn != null) {
+				dbConn.close();
+			}
+		} catch (SQLException se) {
+			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
+		} finally {
+			dbConn = null;
 		}
+
+		parameterValues = null;
 	}
 
 	/**
-	 * Closes all resources used.
+	 * Connects to the source database and executes the query in a <b>parallel
+	 * fashion</b> if
+	 * this {@link InputFormat} is built using a parameterized query (i.e. using
+	 * a {@link PreparedStatement})
+	 * and a proper {@link ParameterValuesProvider}, in a <b>non-parallel
+	 * fashion</b> otherwise.
 	 *
-	 * @throws IOException Indicates that a resource could not be closed.
+	 * @param inputSplit which is ignored if this InputFormat is executed as a
+	 *        non-parallel source,
+	 *        a "hook" to the query parameters otherwise (using its
+	 *        <i>splitNumber</i>)
+	 * @throws IOException if there's an error during the execution of the query
 	 */
 	@Override
-	public void close() throws IOException {
+	public void open(InputSplit inputSplit) throws IOException {
 		try {
-			resultSet.close();
+			if (inputSplit != null && parameterValues != null) {
+				for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
+					Object param = parameterValues[inputSplit.getSplitNumber()][i];
+					if (param instanceof String) {
+						statement.setString(i + 1, (String) param);
+					} else if (param instanceof Long) {
+						statement.setLong(i + 1, (Long) param);
+					} else if (param instanceof Integer) {
+						statement.setInt(i + 1, (Integer) param);
+					} else if (param instanceof Double) {
+						statement.setDouble(i + 1, (Double) param);
+					} else if (param instanceof Boolean) {
+						statement.setBoolean(i + 1, (Boolean) param);
+					} else if (param instanceof Float) {
+						statement.setFloat(i + 1, (Float) param);
+					} else if (param instanceof BigDecimal) {
+						statement.setBigDecimal(i + 1, (BigDecimal) param);
+					} else if (param instanceof Byte) {
+						statement.setByte(i + 1, (Byte) param);
+					} else if (param instanceof Short) {
+						statement.setShort(i + 1, (Short) param);
+					} else if (param instanceof Date) {
+						statement.setDate(i + 1, (Date) param);
+					} else if (param instanceof Time) {
+						statement.setTime(i + 1, (Time) param);
+					} else if (param instanceof Timestamp) {
+						statement.setTimestamp(i + 1, (Timestamp) param);
+					} else if (param instanceof Array) {
+						statement.setArray(i + 1, (Array) param);
+					} else {
+						//extend with other types if needed
+						throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet)." );
+					}
+				}
+				if (LOG.isDebugEnabled()) {
+					LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
+				}
+			}
+			resultSet = statement.executeQuery();
+			hasNext = resultSet.next();
 		} catch (SQLException se) {
-			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-		} catch (NullPointerException npe) {
+			throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
 		}
-		try {
-			statement.close();
-		} catch (SQLException se) {
-			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-		} catch (NullPointerException npe) {
+	}
+
+	/**
+	 * Closes all resources used.
+	 *
+	 * @throws IOException Indicates that a resource could not be closed.
+	 */
+	@Override
+	public void close() throws IOException {
+		if(resultSet == null) {
+			return;
 		}
 		try {
-			dbConn.close();
+			resultSet.close();
 		} catch (SQLException se) {
-			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-		} catch (NullPointerException npe) {
+			LOG.info("Inputformat ResultSet couldn't be closed - " + se.getMessage());
 		}
 	}
 
@@ -138,150 +260,35 @@ public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, Inp
 	 */
 	@Override
 	public boolean reachedEnd() throws IOException {
-		try {
-			if (resultSet.isLast()) {
-				close();
-				return true;
-			}
-			return false;
-		} catch (SQLException se) {
-			throw new IOException("Couldn't evaluate reachedEnd() - " + se.getMessage(), se);
-		}
+		return !hasNext;
 	}
 
 	/**
 	 * Stores the next resultSet row in a tuple
 	 *
-	 * @param tuple
-	 * @return tuple containing next row
+	 * @param row row to be reused.
+	 * @return row containing next {@link Row}
 	 * @throws java.io.IOException
 	 */
 	@Override
-	public OUT nextRecord(OUT tuple) throws IOException {
+	public Row nextRecord(Row row) throws IOException {
 		try {
-			resultSet.next();
-			if (columnTypes == null) {
-				extractTypes(tuple);
+			if (!hasNext) {
+				return null;
 			}
-			addValue(tuple);
-			return tuple;
+			for (int pos = 0; pos < row.productArity(); pos++) {
+				row.setField(pos, resultSet.getObject(pos + 1));
+			}
+			//update hasNext after we've read the record
+			hasNext = resultSet.next();
+			return row;
 		} catch (SQLException se) {
-			close();
 			throw new IOException("Couldn't read data - " + se.getMessage(), se);
 		} catch (NullPointerException npe) {
-			close();
 			throw new IOException("Couldn't access resultSet", npe);
 		}
 	}
 
-	private void extractTypes(OUT tuple) throws SQLException, IOException {
-		ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-		columnTypes = new int[resultSetMetaData.getColumnCount()];
-		if (tuple.getArity() != columnTypes.length) {
-			close();
-			throw new IOException("Tuple size does not match columncount");
-		}
-		for (int pos = 0; pos < columnTypes.length; pos++) {
-			columnTypes[pos] = resultSetMetaData.getColumnType(pos + 1);
-		}
-	}
-
-	/**
-	 * Enters data value from the current resultSet into a Record.
-	 *
-	 * @param reuse Target Record.
-	 */
-	private void addValue(OUT reuse) throws IOException, SQLException {
-		for (int pos = 0; pos < columnTypes.length; pos++) {
-			try {
-				switch (columnTypes[pos]) {
-					case java.sql.Types.NULL:
-						reuse.setField(NullValue.getInstance(), pos);
-						break;
-					case java.sql.Types.BOOLEAN:
-						reuse.setField(resultSet.getBoolean(pos + 1), pos);
-						break;
-					case java.sql.Types.BIT:
-						reuse.setField(resultSet.getBoolean(pos + 1), pos);
-						break;
-					case java.sql.Types.CHAR:
-						reuse.setField(resultSet.getString(pos + 1), pos);
-						break;
-					case java.sql.Types.NCHAR:
-						reuse.setField(resultSet.getString(pos + 1), pos);
-						break;
-					case java.sql.Types.VARCHAR:
-						reuse.setField(resultSet.getString(pos + 1), pos);
-						break;
-					case java.sql.Types.LONGVARCHAR:
-						reuse.setField(resultSet.getString(pos + 1), pos);
-						break;
-					case java.sql.Types.LONGNVARCHAR:
-						reuse.setField(resultSet.getString(pos + 1), pos);
-						break;
-					case java.sql.Types.TINYINT:
-						reuse.setField(resultSet.getShort(pos + 1), pos);
-						break;
-					case java.sql.Types.SMALLINT:
-						reuse.setField(resultSet.getShort(pos + 1), pos);
-						break;
-					case java.sql.Types.BIGINT:
-						reuse.setField(resultSet.getLong(pos + 1), pos);
-						break;
-					case java.sql.Types.INTEGER:
-						reuse.setField(resultSet.getInt(pos + 1), pos);
-						break;
-					case java.sql.Types.FLOAT:
-						reuse.setField(resultSet.getDouble(pos + 1), pos);
-						break;
-					case java.sql.Types.REAL:
-						reuse.setField(resultSet.getFloat(pos + 1), pos);
-						break;
-					case java.sql.Types.DOUBLE:
-						reuse.setField(resultSet.getDouble(pos + 1), pos);
-						break;
-					case java.sql.Types.DECIMAL:
-						reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
-						break;
-					case java.sql.Types.NUMERIC:
-						reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
-						break;
-					case java.sql.Types.DATE:
-						reuse.setField(resultSet.getDate(pos + 1).toString(), pos);
-						break;
-					case java.sql.Types.TIME:
-						reuse.setField(resultSet.getTime(pos + 1).getTime(), pos);
-						break;
-					case java.sql.Types.TIMESTAMP:
-						reuse.setField(resultSet.getTimestamp(pos + 1).toString(), pos);
-						break;
-					case java.sql.Types.SQLXML:
-						reuse.setField(resultSet.getSQLXML(pos + 1).toString(), pos);
-						break;
-					default:
-						throw new SQLException("Unsupported sql-type [" + columnTypes[pos] + "] on column [" + pos + "]");
-
-						// case java.sql.Types.BINARY:
-						// case java.sql.Types.VARBINARY:
-						// case java.sql.Types.LONGVARBINARY:
-						// case java.sql.Types.ARRAY:
-						// case java.sql.Types.JAVA_OBJECT:
-						// case java.sql.Types.BLOB:
-						// case java.sql.Types.CLOB:
-						// case java.sql.Types.NCLOB:
-						// case java.sql.Types.DATALINK:
-						// case java.sql.Types.DISTINCT:
-						// case java.sql.Types.OTHER:
-						// case java.sql.Types.REF:
-						// case java.sql.Types.ROWID:
-						// case java.sql.Types.STRUCT:
-				}
-			} catch (NullPointerException npe) {
-				throw new IOException("Encountered null value for column " + pos + ". Decimal, Numeric, Date, Time, Timestamp and SQLXML columns may not contain NULL values.");
-			}
-		}
-	}
-
 	@Override
 	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
 		return cachedStatistics;
@@ -289,17 +296,20 @@ public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, Inp
 
 	@Override
 	public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
-		GenericInputSplit[] split = {
-			new GenericInputSplit(0, 1)
-		};
-		return split;
+		if (parameterValues == null) {
+			return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
+		}
+		GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length];
+		for (int i = 0; i < ret.length; i++) {
+			ret[i] = new GenericInputSplit(i, ret.length);
+		}
+		return ret;
 	}
 
 	@Override
 	public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
 		return new DefaultInputSplitAssigner(inputSplits);
 	}
-	
 
 	/**
 	 * A builder used to set parameters to the output format's configuration in a fluent way.
@@ -314,6 +324,7 @@ public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, Inp
 
 		public JDBCInputFormatBuilder() {
 			this.format = new JDBCInputFormat();
+			//using TYPE_FORWARD_ONLY for high performance reads
 			this.format.resultSetType = ResultSet.TYPE_FORWARD_ONLY;
 			this.format.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
 		}
@@ -339,7 +350,7 @@ public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, Inp
 		}
 
 		public JDBCInputFormatBuilder setQuery(String query) {
-			format.query = query;
+			format.queryTemplate = query;
 			return this;
 		}
 
@@ -353,6 +364,16 @@ public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, Inp
 			return this;
 		}
 
+		public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) {
+			format.parameterValues = parameterValuesProvider.getParameterValues();
+			return this;
+		}
+
+		public JDBCInputFormatBuilder setRowTypeInfo(RowTypeInfo rowTypeInfo) {
+			format.rowTypeInfo = rowTypeInfo;
+			return this;
+		}
+
 		public JDBCInputFormat finish() {
 			if (format.username == null) {
 				LOG.info("Username was not supplied separately.");
@@ -363,14 +384,21 @@ public class JDBCInputFormat<OUT extends Tuple> extends RichInputFormat<OUT, Inp
 			if (format.dbURL == null) {
 				throw new IllegalArgumentException("No database URL supplied");
 			}
-			if (format.query == null) {
+			if (format.queryTemplate == null) {
 				throw new IllegalArgumentException("No query supplied");
 			}
 			if (format.drivername == null) {
 				throw new IllegalArgumentException("No driver supplied");
 			}
+			if (format.rowTypeInfo == null) {
+				throw new IllegalArgumentException("No " + RowTypeInfo.class.getSimpleName() + " supplied");
+			}
+			if (format.parameterValues == null) {
+				LOG.debug("No input splitting configured (data will be read with parallelism 1).");
+			}
 			return format;
 		}
+
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index 614c5b7..5464a94 100644
--- a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -24,47 +24,46 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.table.Row;
 import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * OutputFormat to write tuples into a database.
  * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
  * 
- * @param <OUT>
  * @see Tuple
  * @see DriverManager
  */
-public class JDBCOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
+public class JDBCOutputFormat extends RichOutputFormat<Row> {
 	private static final long serialVersionUID = 1L;
-
-	@SuppressWarnings("unused")
+	
 	private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
-
+	
 	private String username;
 	private String password;
 	private String drivername;
 	private String dbURL;
 	private String query;
 	private int batchInterval = 5000;
-
+	
 	private Connection dbConn;
 	private PreparedStatement upload;
-
-	private SupportedTypes[] types = null;
-
+	
 	private int batchCount = 0;
-
+	
+	public int[] typesArray;
+	
 	public JDBCOutputFormat() {
 	}
-
+	
 	@Override
 	public void configure(Configuration parameters) {
 	}
-
+	
 	/**
 	 * Connects to the target database and initializes the prepared statement.
 	 *
@@ -78,14 +77,12 @@ public class JDBCOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
 			establishConnection();
 			upload = dbConn.prepareStatement(query);
 		} catch (SQLException sqe) {
-			close();
-			throw new IllegalArgumentException("open() failed:\t!", sqe);
+			throw new IllegalArgumentException("open() failed.", sqe);
 		} catch (ClassNotFoundException cnfe) {
-			close();
-			throw new IllegalArgumentException("JDBC-Class not found:\t", cnfe);
+			throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
 		}
 	}
-
+	
 	private void establishConnection() throws SQLException, ClassNotFoundException {
 		Class.forName(drivername);
 		if (username == null) {
@@ -94,86 +91,125 @@ public class JDBCOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
 			dbConn = DriverManager.getConnection(dbURL, username, password);
 		}
 	}
-
-	private enum SupportedTypes {
-		BOOLEAN,
-		BYTE,
-		SHORT,
-		INTEGER,
-		LONG,
-		STRING,
-		FLOAT,
-		DOUBLE
-	}
-
+	
 	/**
 	 * Adds a record to the prepared statement.
 	 * <p>
 	 * When this method is called, the output format is guaranteed to be opened.
+	 * </p>
+	 * 
+	 * WARNING: this may fail when no column types are specified, because null values are then inserted on a
+	 * best-effort basis and it is not guaranteed that the JDBC driver handles PreparedStatement.setObject(pos, null).
 	 *
-	 * @param tuple The records to add to the output.
+	 * @param row The records to add to the output.
+	 * @see PreparedStatement
 	 * @throws IOException Thrown, if the records could not be added due to an I/O problem.
 	 */
 	@Override
-	public void writeRecord(OUT tuple) throws IOException {
+	public void writeRecord(Row row) throws IOException {
+
+		if (typesArray != null && typesArray.length > 0 && typesArray.length == row.productArity()) {
+			LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
+		} 
 		try {
-			if (types == null) {
-				extractTypes(tuple);
+
+			if (typesArray == null ) {
+				// no types provided
+				for (int index = 0; index < row.productArity(); index++) {
+					LOG.warn("Unknown column type for column %s. Best effort approach to set its value: %s.", index + 1, row.productElement(index));
+					upload.setObject(index + 1, row.productElement(index));
+				}
+			} else {
+				// types provided
+				for (int index = 0; index < row.productArity(); index++) {
+
+					if (row.productElement(index) == null) {
+						upload.setNull(index + 1, typesArray[index]);
+					} else {
+						// casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
+						switch (typesArray[index]) {
+							case java.sql.Types.NULL:
+								upload.setNull(index + 1, typesArray[index]);
+								break;
+							case java.sql.Types.BOOLEAN:
+							case java.sql.Types.BIT:
+								upload.setBoolean(index + 1, (boolean) row.productElement(index));
+								break;
+							case java.sql.Types.CHAR:
+							case java.sql.Types.NCHAR:
+							case java.sql.Types.VARCHAR:
+							case java.sql.Types.LONGVARCHAR:
+							case java.sql.Types.LONGNVARCHAR:
+								upload.setString(index + 1, (String) row.productElement(index));
+								break;
+							case java.sql.Types.TINYINT:
+								upload.setByte(index + 1, (byte) row.productElement(index));
+								break;
+							case java.sql.Types.SMALLINT:
+								upload.setShort(index + 1, (short) row.productElement(index));
+								break;
+							case java.sql.Types.INTEGER:
+								upload.setInt(index + 1, (int) row.productElement(index));
+								break;
+							case java.sql.Types.BIGINT:
+								upload.setLong(index + 1, (long) row.productElement(index));
+								break;
+							case java.sql.Types.REAL:
+								upload.setFloat(index + 1, (float) row.productElement(index));
+								break;
+							case java.sql.Types.FLOAT:
+							case java.sql.Types.DOUBLE:
+								upload.setDouble(index + 1, (double) row.productElement(index));
+								break;
+							case java.sql.Types.DECIMAL:
+							case java.sql.Types.NUMERIC:
+								upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.productElement(index));
+								break;
+							case java.sql.Types.DATE:
+								upload.setDate(index + 1, (java.sql.Date) row.productElement(index));
+								break;
+							case java.sql.Types.TIME:
+								upload.setTime(index + 1, (java.sql.Time) row.productElement(index));
+								break;
+							case java.sql.Types.TIMESTAMP:
+								upload.setTimestamp(index + 1, (java.sql.Timestamp) row.productElement(index));
+								break;
+							case java.sql.Types.BINARY:
+							case java.sql.Types.VARBINARY:
+							case java.sql.Types.LONGVARBINARY:
+								upload.setBytes(index + 1, (byte[]) row.productElement(index));
+								break;
+							default:
+								upload.setObject(index + 1, row.productElement(index));
+								LOG.warn("Unmanaged sql type (%s) for column %s. Best effort approach to set its value: %s.",
+									typesArray[index], index + 1, row.productElement(index));
+								// case java.sql.Types.SQLXML
+								// case java.sql.Types.ARRAY:
+								// case java.sql.Types.JAVA_OBJECT:
+								// case java.sql.Types.BLOB:
+								// case java.sql.Types.CLOB:
+								// case java.sql.Types.NCLOB:
+								// case java.sql.Types.DATALINK:
+								// case java.sql.Types.DISTINCT:
+								// case java.sql.Types.OTHER:
+								// case java.sql.Types.REF:
+								// case java.sql.Types.ROWID:
+								// case java.sql.Types.STRUCT:
+						}
+					}
+				}
 			}
-			addValues(tuple);
 			upload.addBatch();
 			batchCount++;
 			if (batchCount >= batchInterval) {
 				upload.executeBatch();
 				batchCount = 0;
 			}
-		} catch (SQLException sqe) {
-			close();
-			throw new IllegalArgumentException("writeRecord() failed", sqe);
-		} catch (IllegalArgumentException iae) {
-			close();
-			throw new IllegalArgumentException("writeRecord() failed", iae);
-		}
-	}
-
-	private void extractTypes(OUT tuple) {
-		types = new SupportedTypes[tuple.getArity()];
-		for (int x = 0; x < tuple.getArity(); x++) {
-			types[x] = SupportedTypes.valueOf(tuple.getField(x).getClass().getSimpleName().toUpperCase());
+		} catch (SQLException | IllegalArgumentException e) {
+			throw new IllegalArgumentException("writeRecord() failed", e);
 		}
 	}
-
-	private void addValues(OUT tuple) throws SQLException {
-		for (int index = 0; index < tuple.getArity(); index++) {
-			switch (types[index]) {
-				case BOOLEAN:
-					upload.setBoolean(index + 1, (Boolean) tuple.getField(index));
-					break;
-				case BYTE:
-					upload.setByte(index + 1, (Byte) tuple.getField(index));
-					break;
-				case SHORT:
-					upload.setShort(index + 1, (Short) tuple.getField(index));
-					break;
-				case INTEGER:
-					upload.setInt(index + 1, (Integer) tuple.getField(index));
-					break;
-				case LONG:
-					upload.setLong(index + 1, (Long) tuple.getField(index));
-					break;
-				case STRING:
-					upload.setString(index + 1, (String) tuple.getField(index));
-					break;
-				case FLOAT:
-					upload.setFloat(index + 1, (Float) tuple.getField(index));
-					break;
-				case DOUBLE:
-					upload.setDouble(index + 1, (Double) tuple.getField(index));
-					break;
-			}
-		}
-	}
-
+	
 	/**
 	 * Executes prepared statement and closes all resources of this instance.
 	 *
@@ -182,70 +218,78 @@ public class JDBCOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
 	@Override
 	public void close() throws IOException {
 		try {
-			upload.executeBatch();
-			batchCount = 0;
-		} catch (SQLException se) {
-			throw new IllegalArgumentException("close() failed", se);
-		} catch (NullPointerException se) {
-		}
-		try {
-			upload.close();
+			if (upload != null) {
+				upload.executeBatch();
+				upload.close();
+			}
 		} catch (SQLException se) {
 			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-		} catch (NullPointerException npe) {
+		} finally {
+			upload = null;
+			batchCount = 0;
 		}
+		
 		try {
-			dbConn.close();
+			if (dbConn != null) {
+				dbConn.close();
+			}
 		} catch (SQLException se) {
 			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
-		} catch (NullPointerException npe) {
+		} finally {
+			dbConn = null;
 		}
 	}
-
+	
 	public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
 		return new JDBCOutputFormatBuilder();
 	}
-
+	
 	public static class JDBCOutputFormatBuilder {
 		private final JDBCOutputFormat format;
-
+		
 		protected JDBCOutputFormatBuilder() {
 			this.format = new JDBCOutputFormat();
 		}
-
+		
 		public JDBCOutputFormatBuilder setUsername(String username) {
 			format.username = username;
 			return this;
 		}
-
+		
 		public JDBCOutputFormatBuilder setPassword(String password) {
 			format.password = password;
 			return this;
 		}
-
+		
 		public JDBCOutputFormatBuilder setDrivername(String drivername) {
 			format.drivername = drivername;
 			return this;
 		}
-
+		
 		public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
 			format.dbURL = dbURL;
 			return this;
 		}
-
+		
 		public JDBCOutputFormatBuilder setQuery(String query) {
 			format.query = query;
 			return this;
 		}
-
+		
 		public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
 			format.batchInterval = batchInterval;
 			return this;
 		}
-
+		
+		public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) {
+			format.typesArray = typesArray;
+			return this;
+		}
+		
 		/**
-		Finalizes the configuration and checks validity.
-		@return Configured JDBCOutputFormat
+		 * Finalizes the configuration and checks validity.
+		 * 
+		 * @return Configured JDBCOutputFormat
 		 */
 		public JDBCOutputFormat finish() {
 			if (format.username == null) {
@@ -263,8 +307,9 @@ public class JDBCOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
 			if (format.drivername == null) {
 				throw new IllegalArgumentException("No driver supplied");
 			}
+			
 			return format;
 		}
 	}
-
+	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
new file mode 100644
index 0000000..6c70a8c
--- /dev/null
+++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc.split;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+
+/**
+ * {@link ParameterValuesProvider} that performs no computation of its own: it simply
+ * wraps the query parameters computed by the user before creating the
+ * {@link JDBCInputFormat} instance and hands them back unchanged.
+ * The array passed to the constructor is stored and returned by reference (not copied).
+ */
+public class GenericParameterValuesProvider implements ParameterValuesProvider {
+
+	private final Serializable[][] parameters;
+	/** Stores the given parameter matrix as-is; no defensive copy is made. */
+	public GenericParameterValuesProvider(Serializable[][] parameters) {
+		this.parameters = parameters;
+	}
+
+	@Override
+	public Serializable[][] getParameterValues(){
+		// nothing to compute: the matrix was precomputed by the caller and is returned as-is (same array instance)
+		return parameters;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
new file mode 100644
index 0000000..306663e
--- /dev/null
+++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc.split;
+
+import java.io.Serializable;
+
+/** 
+ * 
+ * This query generator assumes that the query to parameterize contains a BETWEEN constraint on a numeric column.
+ * Each generated parameter pair covers a range of fetchSize values (except the last range, which may be smaller),
+ * and together the ranges go from the min value up to the max.
+ * 
+ * For example, if there's a table <CODE>BOOKS</CODE> with a numeric PK <CODE>id</CODE>, using a query like:
+ * <PRE>
+ *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
+ * </PRE>
+ *
+ * you can use this class to automatically generate the parameters of the BETWEEN clause,
+ * based on the passed constructor parameters.
+ * 
+ * */
+public class NumericBetweenParametersProvider implements ParameterValuesProvider {
+
+	private final long fetchSize;
+	private final long min;
+	private final long max;
+	/** @param fetchSize maximum number of values per range (must be positive); min and max are the inclusive overall bounds */
+	public NumericBetweenParametersProvider(long fetchSize, long min, long max) {
+		this.fetchSize = fetchSize;
+		this.min = min;
+		this.max = max;
+	}
+
+	/**
+	 * Returns one {start, end} pair (both inclusive) for every batch of at most fetchSize
+	 * consecutive values between min and max; the end of the last pair is clamped to max.
+	 */
+	@Override
+	public Serializable[][] getParameterValues(){
+		double maxElemCount = (max - min) + 1;
+		int size = new Double(Math.ceil(maxElemCount / fetchSize)).intValue();
+		Serializable[][] parameters = new Serializable[size][2];
+		int count = 0;
+		// "<=" (not "<") so that min == max, or a last batch starting exactly at max, is still emitted
+		for (long start = min; start <= max; start += fetchSize, count++) {
+			parameters[count] = new Long[]{start, Math.min(start + fetchSize - 1, max)};
+		}
+		return parameters;
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
new file mode 100644
index 0000000..6c632f8
--- /dev/null
+++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc.split;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+
+/**
+ * This interface is used by the {@link JDBCInputFormat} to compute the list of parallel queries to run (i.e. splits).
+ * Each query is parameterized using one row of the matrix provided by the {@link ParameterValuesProvider}
+ * implementation.
+ *
+ */
+public interface ParameterValuesProvider {
+
+	/** Returns the parameter matrix used to query a table in parallel: one row of parameter values per query. */
+	public Serializable[][] getParameterValues();
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyUtil.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyUtil.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyUtil.java
deleted file mode 100644
index 523b8b5..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/DerbyUtil.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io.jdbc;
-
-import java.io.OutputStream;
-
-@SuppressWarnings("unused")
-/**
- * Utility class to disable derby logging
- */
-public class DerbyUtil {
-	public static final OutputStream DEV_NULL = new OutputStream() {
-		public void write(int b) {
-		}
-	};
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
new file mode 100644
index 0000000..da9469b
--- /dev/null
+++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCFullTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
+import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.table.Row;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JDBCFullTest extends JDBCTestBase {
+	/** Copies the books table into newbooks through the JDBC input/output formats, without and then with split parallelism. */
+	@Test
+	public void testJdbcInOut() throws Exception {
+		//run without parallelism
+		runTest(false);
+
+		//cleanup
+		JDBCTestBase.tearDownClass();
+		JDBCTestBase.prepareTestDb();
+		
+		//run exploiting parallelism
+		runTest(true);
+		
+	}
+	/** Runs the books -> newbooks job and checks that newbooks ends up with testData.length rows. */
+	private void runTest(boolean exploitParallelism) {
+		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(JDBCTestBase.DRIVER_CLASS)
+				.setDBUrl(JDBCTestBase.DB_URL)
+				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS)
+				.setRowTypeInfo(rowTypeInfo);
+
+		if(exploitParallelism) {
+			final int fetchSize = 1;
+			final Long min = Long.valueOf(JDBCTestBase.testData[0][0].toString());
+			final Long max = Long.valueOf(JDBCTestBase.testData[JDBCTestBase.testData.length - 1][0].toString());
+			//use a "splittable" query to exploit parallelism
+			inputBuilder = inputBuilder
+					.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
+					.setParametersProvider(new NumericBetweenParametersProvider(fetchSize, min, max));
+		}
+		DataSet<Row> source = environment.createInput(inputBuilder.finish());
+
+		//NOTE: in this case (with Derby driver) setSqlTypes could be skipped, but
+		//some databases don't handle null values correctly when no column type is specified
+		//in PreparedStatement.setObject (see its javadoc for more details)
+		source.output(JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername(JDBCTestBase.DRIVER_CLASS)
+				.setDBUrl(JDBCTestBase.DB_URL)
+				.setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
+				.setSqlTypes(new int[]{Types.INTEGER, Types.VARCHAR, Types.VARCHAR,Types.DOUBLE,Types.INTEGER})
+				.finish());
+		try {
+			environment.execute();
+		} catch (Exception e) {
+			Assert.fail("JDBC full test failed. " + e.getMessage());
+		}
+
+		try (
+			Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
+			PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
+			ResultSet resultSet = statement.executeQuery()
+		) {
+			int count = 0;
+			while (resultSet.next()) {
+				count++;
+			}
+			Assert.assertEquals(JDBCTestBase.testData.length, count);
+		} catch (SQLException e) {
+			Assert.fail("JDBC full test failed. " + e.getMessage());
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
index 3fb0278..efae076 100644
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
+++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -19,180 +19,229 @@
 package org.apache.flink.api.java.io.jdbc;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.io.Serializable;
 import java.sql.ResultSet;
 
-
-import org.junit.Assert;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider;
+import org.apache.flink.api.java.io.jdbc.split.NumericBetweenParametersProvider;
+import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.core.io.InputSplit;
 import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Assert;
 import org.junit.Test;
 
-public class JDBCInputFormatTest {
-	JDBCInputFormat jdbcInputFormat;
+public class JDBCInputFormatTest extends JDBCTestBase {
 
-	static Connection conn;
-
-	static final Object[][] dbData = {
-		{1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
-		{1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
-		{1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
-		{1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
-		{1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
-
-	@BeforeClass
-	public static void setUpClass() {
-		try {
-			prepareDerbyDatabase();
-		} catch (Exception e) {
-			Assert.fail();
-		}
-	}
+	private JDBCInputFormat jdbcInputFormat;
 
-	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
-		System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
-		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-		conn = DriverManager.getConnection(dbURL);
-		createTable();
-		insertDataToSQLTable();
-		conn.close();
-	}
-
-	private static void createTable() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	private static void insertDataToSQLTable() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
-		Statement stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	@AfterClass
-	public static void tearDownClass() {
-		cleanUpDerbyDatabases();
-	}
-
-	private static void cleanUpDerbyDatabases() {
-		try {
-			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-
-			conn = DriverManager.getConnection(dbURL);
-			Statement stat = conn.createStatement();
-			stat.executeUpdate("DROP TABLE books");
-			stat.close();
-			conn.close();
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail();
+	@After
+	public void tearDown() throws IOException {
+		if (jdbcInputFormat != null) {
+			jdbcInputFormat.close();
 		}
+		jdbcInputFormat = null;
 	}
 
-	@After
-	public void tearDown() {
-		jdbcInputFormat = null;
+	@Test(expected = IllegalArgumentException.class)
+	public void testUntypedRowInfo() throws IOException {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername("org.apache.derby.jdbc.idontexist")
+				.setDBUrl(DB_URL)
+				.setQuery(SELECT_ALL_BOOKS)
+				.finish();
+		jdbcInputFormat.openInputFormat();
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void testInvalidDriver() throws IOException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
 				.setDrivername("org.apache.derby.jdbc.idontexist")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("select * from books")
+				.setDBUrl(DB_URL)
+				.setQuery(SELECT_ALL_BOOKS)
+				.setRowTypeInfo(rowTypeInfo)
 				.finish();
-		jdbcInputFormat.open(null);
+		jdbcInputFormat.openInputFormat();
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void testInvalidURL() throws IOException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+				.setDrivername(DRIVER_CLASS)
 				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
-				.setQuery("select * from books")
+				.setQuery(SELECT_ALL_BOOKS)
+				.setRowTypeInfo(rowTypeInfo)
 				.finish();
-		jdbcInputFormat.open(null);
+		jdbcInputFormat.openInputFormat();
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void testInvalidQuery() throws IOException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
 				.setQuery("iamnotsql")
+				.setRowTypeInfo(rowTypeInfo)
 				.finish();
-		jdbcInputFormat.open(null);
+		jdbcInputFormat.openInputFormat();
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void testIncompleteConfiguration() throws IOException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setQuery("select * from books")
-				.finish();
-	}
-
-	@Test(expected = IOException.class)
-	public void testIncompatibleTuple() throws IOException {
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("select * from books")
+				.setDrivername(DRIVER_CLASS)
+				.setQuery(SELECT_ALL_BOOKS)
+				.setRowTypeInfo(rowTypeInfo)
 				.finish();
-		jdbcInputFormat.open(null);
-		jdbcInputFormat.nextRecord(new Tuple2());
 	}
 
 	@Test
-	public void testJDBCInputFormat() throws IOException {
+	public void testJDBCInputFormatWithoutParallelism() throws IOException, InstantiationException, IllegalAccessException {
 		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("select * from books")
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
+				.setQuery(SELECT_ALL_BOOKS)
+				.setRowTypeInfo(rowTypeInfo)
 				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
 				.finish();
+		//this query does not exploit parallelism
+		Assert.assertEquals(1, jdbcInputFormat.createInputSplits(1).length);
+		jdbcInputFormat.openInputFormat();
 		jdbcInputFormat.open(null);
-		Tuple5 tuple = new Tuple5();
+		Row row =  new Row(5);
 		int recordCount = 0;
 		while (!jdbcInputFormat.reachedEnd()) {
-			jdbcInputFormat.nextRecord(tuple);
-			Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass());
-			Assert.assertEquals("Field 1 should be String", String.class, tuple.getField(1).getClass());
-			Assert.assertEquals("Field 2 should be String", String.class, tuple.getField(2).getClass());
-			Assert.assertEquals("Field 3 should be float", Double.class, tuple.getField(3).getClass());
-			Assert.assertEquals("Field 4 should be int", Integer.class, tuple.getField(4).getClass());
+			Row next = jdbcInputFormat.nextRecord(row);
+			if (next == null) {
+				break;
+			}
+			
+			if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());}
+			if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());}
+			if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());}
+			if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());}
+			if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());}
 
 			for (int x = 0; x < 5; x++) {
-				Assert.assertEquals(dbData[recordCount][x], tuple.getField(x));
+				if(testData[recordCount][x]!=null) {
+					Assert.assertEquals(testData[recordCount][x], next.productElement(x));
+				}
 			}
 			recordCount++;
 		}
-		Assert.assertEquals(5, recordCount);
+		jdbcInputFormat.close();
+		jdbcInputFormat.closeInputFormat();
+		Assert.assertEquals(testData.length, recordCount);
+	}
+	
+	@Test
+	public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException, InstantiationException, IllegalAccessException {
+		final int fetchSize = 1;
+		final Long min = new Long(JDBCTestBase.testData[0][0] + "");
+		final Long max = new Long(JDBCTestBase.testData[JDBCTestBase.testData.length - fetchSize][0] + "");
+		ParameterValuesProvider pramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
+				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
+				.setRowTypeInfo(rowTypeInfo)
+				.setParametersProvider(pramProvider)
+				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+				.finish();
+
+		jdbcInputFormat.openInputFormat();
+		InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
+		//this query exploits parallelism (1 split for every id)
+		Assert.assertEquals(testData.length, splits.length);
+		int recordCount = 0;
+		Row row =  new Row(5);
+		for (int i = 0; i < splits.length; i++) {
+			jdbcInputFormat.open(splits[i]);
+			while (!jdbcInputFormat.reachedEnd()) {
+				Row next = jdbcInputFormat.nextRecord(row);
+				if (next == null) {
+					break;
+				}
+				if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());}
+				if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());}
+				if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());}
+				if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());}
+				if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());}
+
+				for (int x = 0; x < 5; x++) {
+					if(testData[recordCount][x]!=null) {
+						Assert.assertEquals(testData[recordCount][x], next.productElement(x));
+					}
+				}
+				recordCount++;
+			}
+			jdbcInputFormat.close();
+		}
+		jdbcInputFormat.closeInputFormat();
+		Assert.assertEquals(testData.length, recordCount);
+	}
+	
+	@Test
+	public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException, InstantiationException, IllegalAccessException {
+		Serializable[][] queryParameters = new String[2][1];
+		queryParameters[0] = new String[]{"Kumar"};
+		queryParameters[1] = new String[]{"Tan Ah Teck"};
+		ParameterValuesProvider paramProvider = new GenericParameterValuesProvider(queryParameters);
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
+				.setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
+				.setRowTypeInfo(rowTypeInfo)
+				.setParametersProvider(paramProvider)
+				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+				.finish();
+		jdbcInputFormat.openInputFormat();
+		InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
+		//this query exploits parallelism (1 split for every queryParameters row)
+		Assert.assertEquals(queryParameters.length, splits.length);
+		int recordCount = 0;
+		Row row =  new Row(5);
+		for (int i = 0; i < splits.length; i++) {
+			jdbcInputFormat.open(splits[i]);
+			while (!jdbcInputFormat.reachedEnd()) {
+				Row next = jdbcInputFormat.nextRecord(row);
+				if (next == null) {
+					break;
+				}
+				if(next.productElement(0)!=null) { Assert.assertEquals("Field 0 should be int", Integer.class, next.productElement(0).getClass());}
+				if(next.productElement(1)!=null) { Assert.assertEquals("Field 1 should be String", String.class, next.productElement(1).getClass());}
+				if(next.productElement(2)!=null) { Assert.assertEquals("Field 2 should be String", String.class, next.productElement(2).getClass());}
+				if(next.productElement(3)!=null) { Assert.assertEquals("Field 3 should be float", Double.class, next.productElement(3).getClass());}
+				if(next.productElement(4)!=null) { Assert.assertEquals("Field 4 should be int", Integer.class, next.productElement(4).getClass());}
+
+				recordCount++;
+			}
+			jdbcInputFormat.close();
+		}
+		Assert.assertEquals(3, recordCount);
+		jdbcInputFormat.closeInputFormat();
+	}
+	
+	@Test
+	public void testEmptyResults() throws IOException, InstantiationException, IllegalAccessException {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
+				.setQuery(SELECT_EMPTY)
+				.setRowTypeInfo(rowTypeInfo)
+				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
+				.finish();
+		jdbcInputFormat.openInputFormat();
+		jdbcInputFormat.open(null);
+		Row row = new Row(5);
+		int recordsCnt = 0;
+		while (!jdbcInputFormat.reachedEnd()) {
+			Assert.assertNull(jdbcInputFormat.nextRecord(row));
+			recordsCnt++;
+		}
+		jdbcInputFormat.close();
+		jdbcInputFormat.closeInputFormat();
+		Assert.assertEquals(0, recordsCnt);
 	}
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index 1031b9a..086a84c 100644
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -21,105 +21,26 @@ package org.apache.flink.api.java.io.jdbc;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-
-import org.junit.Assert;
+import java.sql.SQLException;
 
 import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.table.Row;
 import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Assert;
 import org.junit.Test;
 
-public class JDBCOutputFormatTest {
-	private JDBCInputFormat jdbcInputFormat;
-	private JDBCOutputFormat jdbcOutputFormat;
-
-	private static Connection conn;
-
-	static final Object[][] dbData = {
-		{1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
-		{1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
-		{1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
-		{1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
-		{1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
-
-	@BeforeClass
-	public static void setUpClass() throws SQLException {
-		try {
-			System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
-			prepareDerbyDatabase();
-		} catch (ClassNotFoundException e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
-
-	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
-		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-		conn = DriverManager.getConnection(dbURL);
-		createTable("books");
-		createTable("newbooks");
-		insertDataToSQLTables();
-		conn.close();
-	}
-
-	private static void createTable(String tableName) throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE ");
-		sqlQueryBuilder.append(tableName);
-		sqlQueryBuilder.append(" (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	private static void insertDataToSQLTables() throws SQLException {
-		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
-		Statement stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-	}
-
-	@AfterClass
-	public static void tearDownClass() {
-		cleanUpDerbyDatabases();
-	}
+public class JDBCOutputFormatTest extends JDBCTestBase {
 
-	private static void cleanUpDerbyDatabases() {
-		try {
-			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-
-			conn = DriverManager.getConnection(dbURL);
-			Statement stat = conn.createStatement();
-			stat.executeUpdate("DROP TABLE books");
-			stat.executeUpdate("DROP TABLE newbooks");
-			stat.close();
-			conn.close();
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail();
-		}
-	}
+	private JDBCOutputFormat jdbcOutputFormat;
+	private Tuple5<Integer, String, String, Double, String> tuple5 = new Tuple5<>();
 
 	@After
-	public void tearDown() {
+	public void tearDown() throws IOException {
+		if (jdbcOutputFormat != null) {
+			jdbcOutputFormat.close();
+		}
 		jdbcOutputFormat = null;
 	}
 
@@ -127,8 +48,8 @@ public class JDBCOutputFormatTest {
 	public void testInvalidDriver() throws IOException {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
 				.setDrivername("org.apache.derby.jdbc.idontexist")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
+				.setDBUrl(DB_URL)
+				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
 				.finish();
 		jdbcOutputFormat.open(0, 1);
 	}
@@ -136,9 +57,9 @@ public class JDBCOutputFormatTest {
 	@Test(expected = IllegalArgumentException.class)
 	public void testInvalidURL() throws IOException {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+				.setDrivername(DRIVER_CLASS)
 				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
-				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
+				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
 				.finish();
 		jdbcOutputFormat.open(0, 1);
 	}
@@ -146,8 +67,8 @@ public class JDBCOutputFormatTest {
 	@Test(expected = IllegalArgumentException.class)
 	public void testInvalidQuery() throws IOException {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
 				.setQuery("iamnotsql")
 				.finish();
 		jdbcOutputFormat.open(0, 1);
@@ -156,8 +77,8 @@ public class JDBCOutputFormatTest {
 	@Test(expected = IllegalArgumentException.class)
 	public void testIncompleteConfiguration() throws IOException {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
+				.setDrivername(DRIVER_CLASS)
+				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
 				.finish();
 	}
 
@@ -165,79 +86,84 @@ public class JDBCOutputFormatTest {
 	@Test(expected = IllegalArgumentException.class)
 	public void testIncompatibleTypes() throws IOException {
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
+				.setQuery(String.format(INSERT_TEMPLATE, INPUT_TABLE))
 				.finish();
 		jdbcOutputFormat.open(0, 1);
 
-		Tuple5 tuple5 = new Tuple5();
 		tuple5.setField(4, 0);
 		tuple5.setField("hello", 1);
 		tuple5.setField("world", 2);
 		tuple5.setField(0.99, 3);
 		tuple5.setField("imthewrongtype", 4);
 
-		jdbcOutputFormat.writeRecord(tuple5);
+		Row row = new Row(tuple5.getArity());
+		for (int i = 0; i < tuple5.getArity(); i++) {
+			row.setField(i, tuple5.getField(i));
+		}
+		jdbcOutputFormat.writeRecord(row);
 		jdbcOutputFormat.close();
 	}
 
 	@Test
-	public void testJDBCOutputFormat() throws IOException {
-		String sourceTable = "books";
-		String targetTable = "newbooks";
-		String driverPath = "org.apache.derby.jdbc.EmbeddedDriver";
-		String dbUrl = "jdbc:derby:memory:ebookshop";
+	public void testJDBCOutputFormat() throws IOException, InstantiationException, IllegalAccessException {
 
 		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDBUrl(dbUrl)
-				.setDrivername(driverPath)
-				.setQuery("insert into " + targetTable + " (id, title, author, price, qty) values (?,?,?,?,?)")
+				.setDrivername(DRIVER_CLASS)
+				.setDBUrl(DB_URL)
+				.setQuery(String.format(INSERT_TEMPLATE, OUTPUT_TABLE))
 				.finish();
 		jdbcOutputFormat.open(0, 1);
 
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(driverPath)
-				.setDBUrl(dbUrl)
-				.setQuery("select * from " + sourceTable)
-				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
-				.finish();
-		jdbcInputFormat.open(null);
-
-		Tuple5 tuple = new Tuple5();
-		while (!jdbcInputFormat.reachedEnd()) {
-			jdbcInputFormat.nextRecord(tuple);
-			jdbcOutputFormat.writeRecord(tuple);
+		for (int i = 0; i < testData.length; i++) {
+			Row row = new Row(testData[i].length);
+			for (int j = 0; j < testData[i].length; j++) {
+				row.setField(j, testData[i][j]);
+			}
+			jdbcOutputFormat.writeRecord(row);
 		}
 
 		jdbcOutputFormat.close();
-		jdbcInputFormat.close();
 
-		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
-				.setDrivername(driverPath)
-				.setDBUrl(dbUrl)
-				.setQuery("select * from " + targetTable)
-				.setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
-				.finish();
-		jdbcInputFormat.open(null);
-
-		int recordCount = 0;
-		while (!jdbcInputFormat.reachedEnd()) {
-			jdbcInputFormat.nextRecord(tuple);
-			Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass());
-			Assert.assertEquals("Field 1 should be String", String.class, tuple.getField(1).getClass());
-			Assert.assertEquals("Field 2 should be String", String.class, tuple.getField(2).getClass());
-			Assert.assertEquals("Field 3 should be float", Double.class, tuple.getField(3).getClass());
-			Assert.assertEquals("Field 4 should be int", Integer.class, tuple.getField(4).getClass());
-
-			for (int x = 0; x < 5; x++) {
-				Assert.assertEquals(dbData[recordCount][x], tuple.getField(x));
+		try (
+			Connection dbConn = DriverManager.getConnection(JDBCTestBase.DB_URL);
+			PreparedStatement statement = dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS);
+			ResultSet resultSet = statement.executeQuery()
+		) {
+			int recordCount = 0;
+			while (resultSet.next()) {
+				Row row = new Row(tuple5.getArity());
+				for (int i = 0; i < tuple5.getArity(); i++) {
+					row.setField(i, resultSet.getObject(i + 1));
+				}
+				if (row.productElement(0) != null) {
+					Assert.assertEquals("Field 0 should be int", Integer.class, row.productElement(0).getClass());
+				}
+				if (row.productElement(1) != null) {
+					Assert.assertEquals("Field 1 should be String", String.class, row.productElement(1).getClass());
+				}
+				if (row.productElement(2) != null) {
+					Assert.assertEquals("Field 2 should be String", String.class, row.productElement(2).getClass());
+				}
+				if (row.productElement(3) != null) {
+					Assert.assertEquals("Field 3 should be float", Double.class, row.productElement(3).getClass());
+				}
+				if (row.productElement(4) != null) {
+					Assert.assertEquals("Field 4 should be int", Integer.class, row.productElement(4).getClass());
+				}
+
+				for (int x = 0; x < tuple5.getArity(); x++) {
+					if (JDBCTestBase.testData[recordCount][x] != null) {
+						Assert.assertEquals(JDBCTestBase.testData[recordCount][x], row.productElement(x));
+					}
+				}
+
+				recordCount++;
 			}
-
-			recordCount++;
+			Assert.assertEquals(JDBCTestBase.testData.length, recordCount);
+		} catch (SQLException e) {
+			Assert.fail("JDBC OutputFormat test failed. " + e.getMessage());
 		}
-		Assert.assertEquals(5, recordCount);
-
-		jdbcInputFormat.close();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
new file mode 100644
index 0000000..1c44afe
--- /dev/null
+++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+/**
+ * Base test class for JDBC Input and Output formats
+ */
+public class JDBCTestBase {
+	
+	public static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
+	public static final String DB_URL = "jdbc:derby:memory:ebookshop";
+	public static final String INPUT_TABLE = "books";
+	public static final String OUTPUT_TABLE = "newbooks";
+	public static final String SELECT_ALL_BOOKS = "select * from " + INPUT_TABLE;
+	public static final String SELECT_ALL_NEWBOOKS = "select * from " + OUTPUT_TABLE;
+	public static final String SELECT_EMPTY = "select * from books WHERE QTY < 0";
+	public static final String INSERT_TEMPLATE = "insert into %s (id, title, author, price, qty) values (?,?,?,?,?)";
+	public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
+	public static final String SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR = JDBCTestBase.SELECT_ALL_BOOKS + " WHERE author = ?";
+	
+	protected static Connection conn;
+
+	public static final Object[][] testData = {
+			{1001, ("Java public for dummies"), ("Tan Ah Teck"), 11.11, 11},
+			{1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
+			{1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
+			{1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
+			{1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55},
+			{1006, ("A Teaspoon of Java 1.4"), ("Kevin Jones"), 66.66, 66},
+			{1007, ("A Teaspoon of Java 1.5"), ("Kevin Jones"), 77.77, 77},
+			{1008, ("A Teaspoon of Java 1.6"), ("Kevin Jones"), 88.88, 88},
+			{1009, ("A Teaspoon of Java 1.7"), ("Kevin Jones"), 99.99, 99},
+			{1010, ("A Teaspoon of Java 1.8"), ("Kevin Jones"), null, 1010}};
+
+	public static final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
+		BasicTypeInfo.INT_TYPE_INFO,
+		BasicTypeInfo.STRING_TYPE_INFO,
+		BasicTypeInfo.STRING_TYPE_INFO,
+		BasicTypeInfo.DOUBLE_TYPE_INFO,
+		BasicTypeInfo.INT_TYPE_INFO
+	};
+	
+	public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
+
+	public static String getCreateQuery(String tableName) {
+		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE ");
+		sqlQueryBuilder.append(tableName).append(" (");
+		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+		sqlQueryBuilder.append("PRIMARY KEY (id))");
+		return sqlQueryBuilder.toString();
+	}
+	
+	public static String getInsertQuery() {
+		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
+		for (int i = 0; i < JDBCTestBase.testData.length; i++) {
+			sqlQueryBuilder.append("(")
+			.append(JDBCTestBase.testData[i][0]).append(",'")
+			.append(JDBCTestBase.testData[i][1]).append("','")
+			.append(JDBCTestBase.testData[i][2]).append("',")
+			.append(JDBCTestBase.testData[i][3]).append(",")
+			.append(JDBCTestBase.testData[i][4]).append(")");
+			if (i < JDBCTestBase.testData.length - 1) {
+				sqlQueryBuilder.append(",");
+			}
+		}
+		String insertQuery = sqlQueryBuilder.toString();
+		return insertQuery;
+	}
+	
+	public static final OutputStream DEV_NULL = new OutputStream() {
+		@Override
+		public void write(int b) {
+		}
+	};
+
+	public static void prepareTestDb() throws Exception {
+		System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+		Class.forName(DRIVER_CLASS);
+		Connection conn = DriverManager.getConnection(DB_URL + ";create=true");
+
+		//create input table
+		Statement stat = conn.createStatement();
+		stat.executeUpdate(getCreateQuery(INPUT_TABLE));
+		stat.close();
+
+		//create output table
+		stat = conn.createStatement();
+		stat.executeUpdate(getCreateQuery(OUTPUT_TABLE));
+		stat.close();
+
+		//prepare input data
+		stat = conn.createStatement();
+		stat.execute(JDBCTestBase.getInsertQuery());
+		stat.close();
+
+		conn.close();
+	}
+
+	@BeforeClass
+	public static void setUpClass() throws SQLException {
+		try {
+			System.setProperty("derby.stream.error.field", JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+			prepareDerbyDatabase();
+		} catch (ClassNotFoundException e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+
+	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
+		Class.forName(DRIVER_CLASS);
+		conn = DriverManager.getConnection(DB_URL + ";create=true");
+		createTable(INPUT_TABLE);
+		createTable(OUTPUT_TABLE);
+		insertDataIntoInputTable();
+		conn.close();
+	}
+	
+	private static void createTable(String tableName) throws SQLException {
+		Statement stat = conn.createStatement();
+		stat.executeUpdate(getCreateQuery(tableName));
+		stat.close();
+	}
+	
+	private static void insertDataIntoInputTable() throws SQLException {
+		Statement stat = conn.createStatement();
+		stat.execute(JDBCTestBase.getInsertQuery());
+		stat.close();
+	}
+
+	@AfterClass
+	public static void tearDownClass() {
+		cleanUpDerbyDatabases();
+	}
+
+	private static void cleanUpDerbyDatabases() {
+		try {
+			Class.forName(DRIVER_CLASS);
+			conn = DriverManager.getConnection(DB_URL + ";create=true");
+			Statement stat = conn.createStatement();
+			stat.executeUpdate("DROP TABLE "+INPUT_TABLE);
+			stat.executeUpdate("DROP TABLE "+OUTPUT_TABLE);
+			stat.close();
+			conn.close();
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09b428bd/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
deleted file mode 100644
index 840a314..0000000
--- a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io.jdbc.example;
-
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.Statement;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
-import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-
-public class JDBCExample {
-
-	public static void main(String[] args) throws Exception {
-		prepareTestDb();
-
-		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Tuple5> source
-				= environment.createInput(JDBCInputFormat.buildJDBCInputFormat()
-						.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-						.setDBUrl("jdbc:derby:memory:ebookshop")
-						.setQuery("select * from books")
-						.finish(),
-						new TupleTypeInfo(Tuple5.class, INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, DOUBLE_TYPE_INFO, INT_TYPE_INFO)
-				);
-
-		source.output(JDBCOutputFormat.buildJDBCOutputFormat()
-				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
-				.setDBUrl("jdbc:derby:memory:ebookshop")
-				.setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
-				.finish());
-		environment.execute();
-	}
-
-	private static void prepareTestDb() throws Exception {
-		System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.io.jdbc.DerbyUtil.DEV_NULL");
-		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
-		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
-		Connection conn = DriverManager.getConnection(dbURL);
-
-		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		Statement stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-
-		sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
-		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
-		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
-		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
-		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
-		sqlQueryBuilder.append("PRIMARY KEY (id))");
-
-		stat = conn.createStatement();
-		stat.executeUpdate(sqlQueryBuilder.toString());
-		stat.close();
-
-		sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
-		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
-		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
-		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
-		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
-		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
-
-		stat = conn.createStatement();
-		stat.execute(sqlQueryBuilder.toString());
-		stat.close();
-
-		conn.close();
-	}
-}


Mime
View raw message