flink-commits mailing list archives

From rmetz...@apache.org
Subject [67/92] [abbrv] prefix all projects in addons and quickstarts with flink-
Date Tue, 22 Jul 2014 10:41:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/pom.xml b/flink-addons/flink-jdbc/pom.xml
new file mode 100644
index 0000000..409da03
--- /dev/null
+++ b/flink-addons/flink-jdbc/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+	
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	
+	<modelVersion>4.0.0</modelVersion>
+	
+	<parent>
+		<artifactId>flink-addons</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>0.6-incubating-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-jdbc</artifactId>
+	<name>flink-jdbc</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+				
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.derby</groupId>
+			<artifactId>derby</artifactId>
+			<version>10.10.1.1</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
new file mode 100644
index 0000000..ac8bc07
--- /dev/null
+++ b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.types.NullValue;
+
+/**
+ * InputFormat to read data from a database and generate tuples.
+ * The InputFormat has to be configured using the supplied InputFormatBuilder.
+ * 
+ * @param <OUT> the type of Tuple produced for each row
+ * @see Tuple
+ * @see DriverManager
+ */
+public class JDBCInputFormat<OUT extends Tuple> implements InputFormat<OUT, InputSplit> {
+	private static final long serialVersionUID = 1L;
+
+	@SuppressWarnings("unused")
+	private static final Log LOG = LogFactory.getLog(JDBCInputFormat.class);
+
+	private String username;
+	private String password;
+	private String drivername;
+	private String dbURL;
+	private String query;
+
+	private transient Connection dbConn;
+	private transient Statement statement;
+	private transient ResultSet resultSet;
+
+	private int[] columnTypes = null;
+
+	public JDBCInputFormat() {
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+	}
+
+	/**
+	 * Connects to the source database and executes the query.
+	 *
+	 * @param ignored
+	 * @throws IOException
+	 */
+	@Override
+	public void open(InputSplit ignored) throws IOException {
+		try {
+			establishConnection();
+			statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+			resultSet = statement.executeQuery(query);
+		} catch (SQLException se) {
+			close();
+			throw new IllegalArgumentException("open() failed: " + se.getMessage(), se);
+		} catch (ClassNotFoundException cnfe) {
+			throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
+		}
+	}
+
+	private void establishConnection() throws SQLException, ClassNotFoundException {
+		Class.forName(drivername);
+		if (username == null) {
+			dbConn = DriverManager.getConnection(dbURL);
+		} else {
+			dbConn = DriverManager.getConnection(dbURL, username, password);
+		}
+	}
+
+	/**
+	 * Closes all resources used.
+	 *
+	 * @throws IOException Indicates that a resource could not be closed.
+	 */
+	@Override
+	public void close() throws IOException {
+		try {
+			resultSet.close();
+		} catch (SQLException se) {
+			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
+		} catch (NullPointerException npe) {
+		}
+		try {
+			statement.close();
+		} catch (SQLException se) {
+			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
+		} catch (NullPointerException npe) {
+		}
+		try {
+			dbConn.close();
+		} catch (SQLException se) {
+			LOG.info("Inputformat couldn't be closed - " + se.getMessage());
+		} catch (NullPointerException npe) {
+		}
+	}
+
+	/**
+	 * Checks whether all data has been read.
+	 *
+	 * @return boolean value indicating whether all data has been read.
+	 * @throws IOException
+	 */
+	@Override
+	public boolean reachedEnd() throws IOException {
+		try {
+			if (resultSet.isLast()) {
+				close();
+				return true;
+			}
+			return false;
+		} catch (SQLException se) {
+			throw new IOException("Couldn't evaluate reachedEnd() - " + se.getMessage(), se);
+		}
+	}
+
+	/**
+	 * Stores the next resultSet row in a tuple
+	 *
+	 * @param tuple
+	 * @return tuple containing next row
+	 * @throws java.io.IOException
+	 */
+	@Override
+	public OUT nextRecord(OUT tuple) throws IOException {
+		try {
+			resultSet.next();
+			if (columnTypes == null) {
+				extractTypes(tuple);
+			}
+			addValue(tuple);
+			return tuple;
+		} catch (SQLException se) {
+			close();
+			throw new IOException("Couldn't read data - " + se.getMessage(), se);
+		} catch (NullPointerException npe) {
+			close();
+			throw new IOException("Couldn't access resultSet", npe);
+		}
+	}
+
+	private void extractTypes(OUT tuple) throws SQLException, IOException {
+		ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+		columnTypes = new int[resultSetMetaData.getColumnCount()];
+		if (tuple.getArity() != columnTypes.length) {
+			close();
+			throw new IOException("Tuple size does not match column count");
+		}
+		for (int pos = 0; pos < columnTypes.length; pos++) {
+			columnTypes[pos] = resultSetMetaData.getColumnType(pos + 1);
+		}
+	}
+
+	/**
+	 * Copies the values of the current resultSet row into the given reuse tuple,
+	 * converting each column according to its SQL type.
+	 *
+	 * @param reuse Target tuple to fill.
+	 * @throws SQLException Thrown, if a column could not be read or has an unsupported type.
+	 */
+	private void addValue(OUT reuse) throws SQLException {
+		for (int pos = 0; pos < columnTypes.length; pos++) {
+			switch (columnTypes[pos]) {
+				case java.sql.Types.NULL:
+					reuse.setField(NullValue.getInstance(), pos);
+					break;
+				case java.sql.Types.BOOLEAN:
+					reuse.setField(resultSet.getBoolean(pos + 1), pos);
+					break;
+				case java.sql.Types.BIT:
+					reuse.setField(resultSet.getBoolean(pos + 1), pos);
+					break;
+				case java.sql.Types.CHAR:
+					reuse.setField(resultSet.getString(pos + 1), pos);
+					break;
+				case java.sql.Types.NCHAR:
+					reuse.setField(resultSet.getString(pos + 1), pos);
+					break;
+				case java.sql.Types.VARCHAR:
+					reuse.setField(resultSet.getString(pos + 1), pos);
+					break;
+				case java.sql.Types.LONGVARCHAR:
+					reuse.setField(resultSet.getString(pos + 1), pos);
+					break;
+				case java.sql.Types.LONGNVARCHAR:
+					reuse.setField(resultSet.getString(pos + 1), pos);
+					break;
+				case java.sql.Types.TINYINT:
+					reuse.setField(resultSet.getShort(pos + 1), pos);
+					break;
+				case java.sql.Types.SMALLINT:
+					reuse.setField(resultSet.getShort(pos + 1), pos);
+					break;
+				case java.sql.Types.BIGINT:
+					reuse.setField(resultSet.getLong(pos + 1), pos);
+					break;
+				case java.sql.Types.INTEGER:
+					reuse.setField(resultSet.getInt(pos + 1), pos);
+					break;
+				case java.sql.Types.FLOAT:
+					reuse.setField(resultSet.getDouble(pos + 1), pos);
+					break;
+				case java.sql.Types.REAL:
+					reuse.setField(resultSet.getFloat(pos + 1), pos);
+					break;
+				case java.sql.Types.DOUBLE:
+					reuse.setField(resultSet.getDouble(pos + 1), pos);
+					break;
+				case java.sql.Types.DECIMAL:
+					reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
+					break;
+				case java.sql.Types.NUMERIC:
+					reuse.setField(resultSet.getBigDecimal(pos + 1).doubleValue(), pos);
+					break;
+				case java.sql.Types.DATE:
+					reuse.setField(resultSet.getDate(pos + 1).toString(), pos);
+					break;
+				case java.sql.Types.TIME:
+					reuse.setField(resultSet.getTime(pos + 1).getTime(), pos);
+					break;
+				case java.sql.Types.TIMESTAMP:
+					reuse.setField(resultSet.getTimestamp(pos + 1).toString(), pos);
+					break;
+				case java.sql.Types.SQLXML:
+					reuse.setField(resultSet.getSQLXML(pos + 1).toString(), pos);
+					break;
+				default:
+					throw new SQLException("Unsupported sql-type [" + columnTypes[pos] + "] on column [" + pos + "]");
+
+				// case java.sql.Types.BINARY:
+				// case java.sql.Types.VARBINARY:
+				// case java.sql.Types.LONGVARBINARY:
+				// case java.sql.Types.ARRAY:
+				// case java.sql.Types.JAVA_OBJECT:
+				// case java.sql.Types.BLOB:
+				// case java.sql.Types.CLOB:
+				// case java.sql.Types.NCLOB:
+				// case java.sql.Types.DATALINK:
+				// case java.sql.Types.DISTINCT:
+				// case java.sql.Types.OTHER:
+				// case java.sql.Types.REF:
+				// case java.sql.Types.ROWID:
+				// case java.sql.Types.STRUCT:
+			}
+		}
+	}
+
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+		return cachedStatistics;
+	}
+
+	@Override
+	public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
+		GenericInputSplit[] split = {
+			new GenericInputSplit(0, 1)
+		};
+		return split;
+	}
+
+	@Override
+	public Class<? extends InputSplit> getInputSplitType() {
+		return GenericInputSplit.class;
+	}
+
+	/**
+	 * Creates a builder that is used to set the input format's parameters in a fluent way.
+	 * @return A new JDBCInputFormatBuilder.
+	 */
+	public static JDBCInputFormatBuilder buildJDBCInputFormat() {
+		return new JDBCInputFormatBuilder();
+	}
+
+	public static class JDBCInputFormatBuilder {
+		private final JDBCInputFormat format;
+
+		public JDBCInputFormatBuilder() {
+			this.format = new JDBCInputFormat();
+		}
+
+		public JDBCInputFormatBuilder setUsername(String username) {
+			format.username = username;
+			return this;
+		}
+
+		public JDBCInputFormatBuilder setPassword(String password) {
+			format.password = password;
+			return this;
+		}
+
+		public JDBCInputFormatBuilder setDrivername(String drivername) {
+			format.drivername = drivername;
+			return this;
+		}
+
+		public JDBCInputFormatBuilder setDBUrl(String dbURL) {
+			format.dbURL = dbURL;
+			return this;
+		}
+
+		public JDBCInputFormatBuilder setQuery(String query) {
+			format.query = query;
+			return this;
+		}
+
+		public JDBCInputFormat finish() {
+			if (format.username == null) {
+				LOG.info("Username was not supplied separately.");
+			}
+			if (format.password == null) {
+				LOG.info("Password was not supplied separately.");
+			}
+			if (format.dbURL == null) {
+				throw new IllegalArgumentException("No database URL supplied.");
+			}
+			if (format.query == null) {
+				throw new IllegalArgumentException("No query supplied.");
+			}
+			if (format.drivername == null) {
+				throw new IllegalArgumentException("No driver supplied");
+			}
+			return format;
+		}
+	}
+
+}
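
For quick reference, a condensed usage sketch of the builder above, assembled from the JDBCExample that this commit adds further down (the Derby driver, in-memory URL, query and column types are taken from that example and stand in for any JDBC source; print() is just a stand-in sink):

    import static org.apache.flink.api.java.typeutils.BasicTypeInfo.DOUBLE_TYPE_INFO;
    import static org.apache.flink.api.java.typeutils.BasicTypeInfo.INT_TYPE_INFO;
    import static org.apache.flink.api.java.typeutils.BasicTypeInfo.STRING_TYPE_INFO;

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
    import org.apache.flink.api.java.tuple.Tuple5;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;

    public class JDBCInputSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // Configure the format through the fluent builder and pass it to createInput
            // together with the type information of the expected columns.
            DataSet<Tuple5> books = env.createInput(
                    JDBCInputFormat.buildJDBCInputFormat()
                            .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                            .setDBUrl("jdbc:derby:memory:ebookshop")
                            .setQuery("select * from books")
                            .finish(),
                    new TupleTypeInfo(Tuple5.class, INT_TYPE_INFO, STRING_TYPE_INFO,
                            STRING_TYPE_INFO, DOUBLE_TYPE_INFO, INT_TYPE_INFO));
            books.print();
            env.execute();
        }
    }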

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
new file mode 100644
index 0000000..3a75480
--- /dev/null
+++ b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * OutputFormat to write tuples into a database.
+ * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
+ * 
+ * @param <OUT> the type of Tuple to be written
+ * @see Tuple
+ * @see DriverManager
+ */
+public class JDBCOutputFormat<OUT extends Tuple> implements OutputFormat<OUT> {
+	private static final long serialVersionUID = 1L;
+
+	@SuppressWarnings("unused")
+	private static final Log LOG = LogFactory.getLog(JDBCOutputFormat.class);
+
+	private String username;
+	private String password;
+	private String drivername;
+	private String dbURL;
+	private String query;
+	private int batchInterval = 5000;
+
+	private Connection dbConn;
+	private PreparedStatement upload;
+
+	private SupportedTypes[] types = null;
+
+	private int batchCount = 0;
+
+	public JDBCOutputFormat() {
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+	}
+
+	/**
+	 * Connects to the target database and initializes the prepared statement.
+	 *
+	 * @param taskNumber The number of the parallel instance.
+	 * @throws IOException Thrown, if the output could not be opened due to an
+	 * I/O problem.
+	 */
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		try {
+			establishConnection();
+			upload = dbConn.prepareStatement(query);
+		} catch (SQLException sqe) {
+			close();
+			throw new IllegalArgumentException("open() failed.", sqe);
+		} catch (ClassNotFoundException cnfe) {
+			close();
+			throw new IllegalArgumentException("JDBC-Class not found:\t", cnfe);
+		}
+	}
+
+	private void establishConnection() throws SQLException, ClassNotFoundException {
+		Class.forName(drivername);
+		if (username == null) {
+			dbConn = DriverManager.getConnection(dbURL);
+		} else {
+			dbConn = DriverManager.getConnection(dbURL, username, password);
+		}
+	}
+
+	private enum SupportedTypes {
+		BOOLEAN,
+		BYTE,
+		SHORT,
+		INTEGER,
+		LONG,
+		STRING,
+		FLOAT,
+		DOUBLE
+	}
+
+	/**
+	 * Adds a record to the prepared statement.
+	 * <p>
+	 * When this method is called, the output format is guaranteed to be opened.
+	 *
+	 * @param tuple The record to add to the output.
+	 * @throws IOException Thrown, if the record could not be added due to an I/O problem.
+	 */
+	@Override
+	public void writeRecord(OUT tuple) throws IOException {
+		try {
+			if (query.split("\\?,").length != tuple.getArity()) {
+				close();
+				throw new IOException("Tuple size does not match column count");
+			}
+			if (types == null) {
+				extractTypes(tuple);
+			}
+			addValues(tuple);
+			upload.addBatch();
+			batchCount++;
+			if (batchCount >= batchInterval) {
+				upload.executeBatch();
+				batchCount = 0;
+			}
+		} catch (SQLException sqe) {
+			close();
+			throw new IllegalArgumentException("writeRecord() failed", sqe);
+		} catch (IllegalArgumentException iae) {
+			close();
+			throw new IllegalArgumentException("writeRecord() failed", iae);
+		}
+	}
+
+	private void extractTypes(OUT tuple) {
+		types = new SupportedTypes[tuple.getArity()];
+		for (int x = 0; x < tuple.getArity(); x++) {
+			types[x] = SupportedTypes.valueOf(tuple.getField(x).getClass().getSimpleName().toUpperCase());
+		}
+	}
+
+	private void addValues(OUT tuple) throws SQLException {
+		for (int index = 0; index < tuple.getArity(); index++) {
+			switch (types[index]) {
+				case BOOLEAN:
+					upload.setBoolean(index + 1, (Boolean) tuple.getField(index));
+					break;
+				case BYTE:
+					upload.setByte(index + 1, (Byte) tuple.getField(index));
+					break;
+				case SHORT:
+					upload.setShort(index + 1, (Short) tuple.getField(index));
+					break;
+				case INTEGER:
+					upload.setInt(index + 1, (Integer) tuple.getField(index));
+					break;
+				case LONG:
+					upload.setLong(index + 1, (Long) tuple.getField(index));
+					break;
+				case STRING:
+					upload.setString(index + 1, (String) tuple.getField(index));
+					break;
+				case FLOAT:
+					upload.setFloat(index + 1, (Float) tuple.getField(index));
+					break;
+				case DOUBLE:
+					upload.setDouble(index + 1, (Double) tuple.getField(index));
+					break;
+			}
+		}
+	}
+
+	/**
+	 * Executes prepared statement and closes all resources of this instance.
+	 *
+	 * @throws IOException Thrown, if the output could not be closed properly.
+	 */
+	@Override
+	public void close() throws IOException {
+		try {
+			upload.executeBatch();
+			batchCount = 0;
+		} catch (SQLException se) {
+			throw new IllegalArgumentException("close() failed", se);
+		} catch (NullPointerException se) {
+		}
+		try {
+			upload.close();
+		} catch (SQLException se) {
+			LOG.info("Outputformat couldn't be closed - " + se.getMessage());
+		} catch (NullPointerException npe) {
+		}
+		try {
+			dbConn.close();
+		} catch (SQLException se) {
+			LOG.info("Outputformat couldn't be closed - " + se.getMessage());
+		} catch (NullPointerException npe) {
+		}
+	}
+
+	public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
+		return new JDBCOutputFormatBuilder();
+	}
+
+	public static class JDBCOutputFormatBuilder {
+		private final JDBCOutputFormat format;
+
+		protected JDBCOutputFormatBuilder() {
+			this.format = new JDBCOutputFormat();
+		}
+
+		public JDBCOutputFormatBuilder setUsername(String username) {
+			format.username = username;
+			return this;
+		}
+
+		public JDBCOutputFormatBuilder setPassword(String password) {
+			format.password = password;
+			return this;
+		}
+
+		public JDBCOutputFormatBuilder setDrivername(String drivername) {
+			format.drivername = drivername;
+			return this;
+		}
+
+		public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
+			format.dbURL = dbURL;
+			return this;
+		}
+
+		public JDBCOutputFormatBuilder setQuery(String query) {
+			format.query = query;
+			return this;
+		}
+
+		public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
+			format.batchInterval = batchInterval;
+			return this;
+		}
+
+		/**
+		 * Finalizes the configuration and checks validity.
+		 * @return Configured JDBCOutputFormat.
+		 */
+		public JDBCOutputFormat finish() {
+			if (format.username == null) {
+				LOG.info("Username was not supplied separately.");
+			}
+			if (format.password == null) {
+				LOG.info("Password was not supplied separately.");
+			}
+			if (format.dbURL == null) {
+				throw new IllegalArgumentException("No database URL supplied.");
+			}
+			if (format.query == null) {
+				throw new IllegalArgumentException("No query supplied.");
+			}
+			if (format.drivername == null) {
+				throw new IllegalArgumentException("No driver supplied");
+			}
+			return format;
+		}
+	}
+
+}
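
The output side is wired up the same way; a minimal sketch of the builder usage, again condensed from the JDBCExample in this commit (the insert query matches the example's newbooks table, and the batch interval value is purely illustrative, the default being 5000):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple5;

    public class JDBCOutputSketch {
        // Writes a DataSet of Tuple5 rows (id, title, author, price, qty) into newbooks.
        public static void writeBooks(DataSet<Tuple5> books) {
            books.output(JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                    .setDBUrl("jdbc:derby:memory:ebookshop")
                    .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
                    .setBatchInterval(1000) // flush the prepared statement every 1000 rows
                    .finish());
        }
    }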

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
new file mode 100644
index 0000000..7d0c5e8
--- /dev/null
+++ b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.example;
+
+import static org.apache.flink.api.java.typeutils.BasicTypeInfo.DOUBLE_TYPE_INFO;
+import static org.apache.flink.api.java.typeutils.BasicTypeInfo.INT_TYPE_INFO;
+import static org.apache.flink.api.java.typeutils.BasicTypeInfo.STRING_TYPE_INFO;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+public class JDBCExample {
+
+	public static void main(String[] args) throws Exception {
+		prepareTestDb();
+
+		ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5> source
+				= environment.createInput(JDBCInputFormat.buildJDBCInputFormat()
+						.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+						.setDBUrl("jdbc:derby:memory:ebookshop")
+						.setQuery("select * from books")
+						.finish(),
+						new TupleTypeInfo(Tuple5.class, INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, DOUBLE_TYPE_INFO, INT_TYPE_INFO)
+				);
+
+		source.output(JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+				.setDBUrl("jdbc:derby:memory:ebookshop")
+				.setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
+				.finish());
+		environment.execute();
+	}
+
+	private static void prepareTestDb() throws Exception {
+		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+		Connection conn = DriverManager.getConnection(dbURL);
+
+		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
+		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+		sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+		Statement stat = conn.createStatement();
+		stat.executeUpdate(sqlQueryBuilder.toString());
+		stat.close();
+
+		sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
+		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+		sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+		stat = conn.createStatement();
+		stat.executeUpdate(sqlQueryBuilder.toString());
+		stat.close();
+
+		sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
+		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
+		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
+		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
+		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
+		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
+
+		stat = conn.createStatement();
+		stat.execute(sqlQueryBuilder.toString());
+		stat.close();
+
+		conn.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
new file mode 100644
index 0000000..f2930f2
--- /dev/null
+++ b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormat.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.record.io.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.java.record.io.GenericInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.ShortValue;
+import org.apache.flink.types.StringValue;
+
+/**
+ * InputFormat to read data from a database and generate Records.
+ * The InputFormat has to be configured with the query, and either all
+ * connection parameters or a complete database URL (see {@link Configuration}).
+ * The position of a value inside a Record is determined by the column
+ * order of the returned table.
+ * 
+ * @see Configuration
+ * @see Record
+ * @see DriverManager
+ */
+public class JDBCInputFormat extends GenericInputFormat implements NonParallelInput {
+
+	private static final long serialVersionUID = 1L;
+	
+	@SuppressWarnings("unused")
+	private static final Log LOG = LogFactory.getLog(JDBCInputFormat.class);
+	
+
+	public final String DRIVER_KEY = "driver";
+	public final String USERNAME_KEY = "username";
+	public final String PASSWORD_KEY = "password";
+	public final String URL_KEY = "url";
+	public final String QUERY_KEY = "query";
+
+
+	private String username;
+	private String password;
+	private String driverName;
+	private String dbURL;
+	private String query;
+
+	
+	private transient Connection dbConn;
+	private transient Statement statement;
+	private transient ResultSet resultSet;
+
+
+	/**
+	 * Creates a non-configured JDBCInputFormat. This format has to be
+	 * configured using configure(configuration).
+	 */
+	public JDBCInputFormat() {}
+
+	/**
+	 * Creates a JDBCInputFormat and configures it.
+	 * 
+	 * @param driverName
+	 *        JDBC-Drivername
+	 * @param dbURL
+	 *        Formatted URL containing all connection parameters.
+	 * @param username
+	 *        Login username.
+	 * @param password
+	 *        Login password.
+	 * @param query
+	 *        Query to execute.
+	 */
+	public JDBCInputFormat(String driverName, String dbURL, String username, String password, String query) {
+		this.driverName = driverName;
+		this.query = query;
+		this.dbURL = dbURL;
+		this.username = username;
+		this.password = password;
+	}
+
+	/**
+	 * Creates a JDBCInputFormat and configures it.
+	 * 
+	 * @param driverName
+	 *        JDBC-Drivername
+	 * @param dbURL
+	 *        Formatted URL containing all connection parameters.
+	 * @param query
+	 *        Query to execute.
+	 */
+	public JDBCInputFormat(String driverName, String dbURL, String query) {
+		this(driverName, dbURL, "", "", query);
+	}
+
+	/**
+	 * Creates a JDBCInputFormat and configures it.
+	 * 
+	 * @param parameters
+	 *        Configuration with all connection parameters.
+	 * @param query
+	 *        Query to execute.
+	 */
+	public JDBCInputFormat(Configuration parameters, String query) {
+		this.driverName = parameters.getString(DRIVER_KEY, "");
+		this.username = parameters.getString(USERNAME_KEY, "");
+		this.password = parameters.getString(PASSWORD_KEY, "");
+		this.dbURL = parameters.getString(URL_KEY, "");
+		this.query = query;
+	}
+
+	
+	/**
+	 * Configures this JDBCInputFormat. This includes setting the connection
+	 * parameters (if necessary), establishing the connection and executing the
+	 * query.
+	 * 
+	 * @param parameters
+	 *        Configuration containing all or no parameters.
+	 */
+	@Override
+	public void configure(Configuration parameters) {
+		boolean needConfigure = isFieldNullOrEmpty(this.query) || isFieldNullOrEmpty(this.dbURL);
+		if (needConfigure) {
+			this.driverName = parameters.getString(DRIVER_KEY, null);
+			this.username = parameters.getString(USERNAME_KEY, null);
+			this.password = parameters.getString(PASSWORD_KEY, null);
+			this.query = parameters.getString(QUERY_KEY, null);
+			this.dbURL = parameters.getString(URL_KEY, null);
+		}
+
+		try {
+			prepareQueryExecution();
+		} catch (SQLException e) {
+			throw new IllegalArgumentException("Configure failed.", e);
+		}
+	}
+
+	/**
+	 * Enters data value from the current resultSet into a Record.
+	 * 
+	 * @param pos
+	 *        Record position to be set.
+	 * @param type
+	 *        SQL type of the resultSet value.
+	 * @param record
+	 *        Target Record.
+	 */
+	private void retrieveTypeAndFillRecord(int pos, int type, Record record) throws SQLException,
+			NotTransformableSQLFieldException {
+		switch (type) {
+		case java.sql.Types.NULL:
+			record.setField(pos, NullValue.getInstance());
+			break;
+		case java.sql.Types.BOOLEAN:
+			record.setField(pos, new BooleanValue(resultSet.getBoolean(pos + 1)));
+			break;
+		case java.sql.Types.BIT:
+			record.setField(pos, new BooleanValue(resultSet.getBoolean(pos + 1)));
+			break;
+		case java.sql.Types.CHAR:
+			record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
+			break;
+		case java.sql.Types.NCHAR:
+			record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
+			break;
+		case java.sql.Types.VARCHAR:
+			record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
+			break;
+		case java.sql.Types.LONGVARCHAR:
+			record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
+			break;
+		case java.sql.Types.LONGNVARCHAR:
+			record.setField(pos, new StringValue(resultSet.getString(pos + 1)));
+			break;
+		case java.sql.Types.TINYINT:
+			record.setField(pos, new ShortValue(resultSet.getShort(pos + 1)));
+			break;
+		case java.sql.Types.SMALLINT:
+			record.setField(pos, new ShortValue(resultSet.getShort(pos + 1)));
+			break;
+		case java.sql.Types.BIGINT:
+			record.setField(pos, new LongValue(resultSet.getLong(pos + 1)));
+			break;
+		case java.sql.Types.INTEGER:
+			record.setField(pos, new IntValue(resultSet.getInt(pos + 1)));
+			break;
+		case java.sql.Types.FLOAT:
+			record.setField(pos, new DoubleValue(resultSet.getDouble(pos + 1)));
+			break;
+		case java.sql.Types.REAL:
+			record.setField(pos, new FloatValue(resultSet.getFloat(pos + 1)));
+			break;
+		case java.sql.Types.DOUBLE:
+			record.setField(pos, new DoubleValue(resultSet.getDouble(pos + 1)));
+			break;
+		case java.sql.Types.DECIMAL:
+			record.setField(pos, new DoubleValue(resultSet.getBigDecimal(pos + 1).doubleValue()));
+			break;
+		case java.sql.Types.NUMERIC:
+			record.setField(pos, new DoubleValue(resultSet.getBigDecimal(pos + 1).doubleValue()));
+			break;
+		case java.sql.Types.DATE:
+			record.setField(pos, new StringValue(resultSet.getDate(pos + 1).toString()));
+			break;
+		case java.sql.Types.TIME:
+			record.setField(pos, new LongValue(resultSet.getTime(pos + 1).getTime()));
+			break;
+		case java.sql.Types.TIMESTAMP:
+			record.setField(pos, new StringValue(resultSet.getTimestamp(pos + 1).toString()));
+			break;
+		case java.sql.Types.SQLXML:
+			record.setField(pos, new StringValue(resultSet.getSQLXML(pos + 1).toString()));
+			break;
+		default:
+			throw new NotTransformableSQLFieldException("Unknown sql-type [" + type + "] on column [" + pos + "]");
+
+			// case java.sql.Types.BINARY:
+			// case java.sql.Types.VARBINARY:
+			// case java.sql.Types.LONGVARBINARY:
+			// case java.sql.Types.ARRAY:
+			// case java.sql.Types.JAVA_OBJECT:
+			// case java.sql.Types.BLOB:
+			// case java.sql.Types.CLOB:
+			// case java.sql.Types.NCLOB:
+			// case java.sql.Types.DATALINK:
+			// case java.sql.Types.DISTINCT:
+			// case java.sql.Types.OTHER:
+			// case java.sql.Types.REF:
+			// case java.sql.Types.ROWID:
+			// case java.sql.Types.STRUCT:
+		}
+	}
+
+	private boolean isFieldNullOrEmpty(String field) {
+		return (field == null || field.length() == 0);
+	}
+
+	private void prepareQueryExecution() throws SQLException {
+		setClassForDBType();
+		prepareCredentialsAndExecute();
+	}
+
+	/**
+	 * Loads the configured JDBC driver class.
+	 * 
+	 * @throws IllegalArgumentException
+	 *         Thrown, if the driver class could not be found on the
+	 *         classpath (wrapping the underlying ClassNotFoundException).
+	 */
+	private void setClassForDBType() {
+		try {
+			Class.forName(driverName);
+		} catch (ClassNotFoundException cnfe) {
+			throw new IllegalArgumentException("JDBC-Class not found:\t" + cnfe.getLocalizedMessage());
+		}
+	}
+
+	private void prepareCredentialsAndExecute() throws SQLException {
+		if (isFieldNullOrEmpty(username)) {
+			prepareConnection(dbURL);
+		} else {
+			prepareConnection();
+		}
+		executeQuery();
+	}
+
+	/**
+	 * Establishes a connection to a database.
+	 * 
+	 * @param dbURL
+	 *        Assembled URL containing all connection parameters.
+	 * @throws SQLException
+	 *         Thrown, if the connection could not be established.
+	 */
+	private void prepareConnection(String dbURL) throws SQLException {
+		dbConn = DriverManager.getConnection(dbURL);
+	}
+
+	/**
+	 * Establishes a connection using the configured database URL, login
+	 * username and password.
+	 * 
+	 * @throws SQLException
+	 *         Thrown, if the connection could not be established.
+	 */
+	private void prepareConnection() throws SQLException {
+		dbConn = DriverManager.getConnection(dbURL, username, password);
+	}
+
+	private void executeQuery() throws SQLException {
+		statement = dbConn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+		resultSet = statement.executeQuery(this.query);
+	}
+
+	/**
+	 * Checks whether all data has been read.
+	 * 
+	 * @return boolean value indicating whether all data has been read.
+	 */
+	@Override
+	public boolean reachedEnd() {
+		try {
+			if (resultSet.isLast()) {
+				resultSet.close();
+				statement.close();
+				dbConn.close();
+				return true;
+			} else {
+				return false;
+			}
+		} catch (SQLException e) {
+			throw new IllegalArgumentException("Couldn't evaluate reachedEnd():\t" + e.getMessage());
+		} catch (NullPointerException e) {
+			throw new IllegalArgumentException("Couldn't access resultSet:\t" + e.getMessage());
+		}
+	}
+
+	/**
+	 * Stores the next resultSet row in a Record.
+	 * 
+	 * @param record
+	 *        target Record
+	 * @return the Record filled with the values of the next resultSet row
+	 */
+	@Override
+	public Record nextRecord(Record record) {
+		try {
+			resultSet.next();
+			ResultSetMetaData rsmd = resultSet.getMetaData();
+			int column_count = rsmd.getColumnCount();
+			record.setNumFields(column_count);
+
+			for (int pos = 0; pos < column_count; pos++) {
+				int type = rsmd.getColumnType(pos + 1);
+				retrieveTypeAndFillRecord(pos, type, record);
+			}
+			return record;
+		} catch (SQLException e) {
+			throw new IllegalArgumentException("Couldn't read data:\t" + e.getMessage());
+		} catch (NotTransformableSQLFieldException e) {
+			throw new IllegalArgumentException("Couldn't read data because of unknown column sql-type:\t"
+				+ e.getMessage());
+		} catch (NullPointerException e) {
+			throw new IllegalArgumentException("Couldn't access resultSet:\t" + e.getMessage());
+		}
+	}
+	
+	public static class NotTransformableSQLFieldException extends Exception {
+
+		private static final long serialVersionUID = 1L;
+
+		public NotTransformableSQLFieldException(String message) {
+			super(message);
+		}
+	}
+
+}
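
Besides the parameterized constructors shown above, this Record-API format can also be driven entirely through a Configuration, since configure() falls back to the driver/url/query entries when no query or URL was set on the instance. A minimal sketch of that path; it assumes GenericDataSource exposes getParameters() the same way GenericDataSink does for configureOutputFormat in the output format below, and the key strings mirror the (non-static) constants declared above:

    import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
    import org.apache.flink.api.java.record.operators.GenericDataSource;
    import org.apache.flink.configuration.Configuration;

    public class RecordJDBCSourceSketch {
        public static GenericDataSource<JDBCInputFormat> createBooksSource() {
            GenericDataSource<JDBCInputFormat> source =
                    new GenericDataSource<JDBCInputFormat>(new JDBCInputFormat(), "Data Source");
            // configure() reads these keys when the format instance carries no query/URL itself.
            Configuration params = source.getParameters(); // assumed hook, analogous to GenericDataSink
            params.setString("driver", "org.apache.derby.jdbc.EmbeddedDriver"); // DRIVER_KEY
            params.setString("url", "jdbc:derby:memory:ebookshop");             // URL_KEY
            params.setString("query", "select * from books");                   // QUERY_KEY
            return source;
        }
    }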

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
new file mode 100644
index 0000000..a99b38e
--- /dev/null
+++ b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.record.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.record.operators.GenericDataSink;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.CharValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.ShortValue;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
+
+public class JDBCOutputFormat implements OutputFormat<Record> {
+	private static final long serialVersionUID = 1L;
+
+	private static final int DEFAULT_BATCH_INTERVAL = 5000;
+	
+	public static final String DRIVER_KEY = "driver";
+	public static final String USERNAME_KEY = "username";
+	public static final String PASSWORD_KEY = "password";
+	public static final String URL_KEY = "url";
+	public static final String QUERY_KEY = "query";
+	public static final String FIELD_COUNT_KEY = "fields";
+	public static final String FIELD_TYPE_KEY = "type";
+	public static final String BATCH_INTERVAL = "batchInt";
+
+	private Connection dbConn;
+	private PreparedStatement upload;
+
+	private String username;
+	private String password;
+	private String driverName;
+	private String dbURL;
+
+	private String query;
+	private int fieldCount;
+	private Class<? extends Value>[] fieldClasses;
+	
+	/**
+	 * Variable indicating the current number of insert sets in a batch.
+	 */
+	private int batchCount = 0;
+	
+	/**
+	 * Commit interval of batches.
+	 * A high batch interval means faster inserts but more memory (reduce it if OutOfMemoryErrors occur);
+	 * a low batch interval means slower inserts but less memory.
+	 */
+	private int batchInterval = DEFAULT_BATCH_INTERVAL;
+	
+
+	/**
+	 * Configures this JDBCOutputFormat.
+	 * 
+	 * @param parameters
+	 *        Configuration containing all parameters.
+	 */
+	@Override
+	public void configure(Configuration parameters) {
+		this.driverName = parameters.getString(DRIVER_KEY, null);
+		this.username = parameters.getString(USERNAME_KEY, null);
+		this.password = parameters.getString(PASSWORD_KEY, null);
+		this.dbURL = parameters.getString(URL_KEY, null);
+		this.query = parameters.getString(QUERY_KEY, null);
+		this.fieldCount = parameters.getInteger(FIELD_COUNT_KEY, 0);
+		this.batchInterval = parameters.getInteger(BATCH_INTERVAL, DEFAULT_BATCH_INTERVAL);
+
+		@SuppressWarnings("unchecked")
+		Class<Value>[] classes = new Class[this.fieldCount];
+		this.fieldClasses = classes;
+
+		for (int i = 0; i < this.fieldCount; i++) {
+			@SuppressWarnings("unchecked")
+			Class<? extends Value> clazz = (Class<? extends Value>) parameters.getClass(FIELD_TYPE_KEY + i, null);
+			if (clazz == null) {
+				throw new IllegalArgumentException("Invalid configuration for JDBCOutputFormat: "
+						+ "No type class for parameter " + i);
+			}
+			this.fieldClasses[i] = clazz;
+		}
+	}
+
+	/**
+	 * Connects to the target database and initializes the prepared statement.
+	 *
+	 * @param taskNumber The number of the parallel instance.
+	 * @throws IOException Thrown, if the output could not be opened due to an
+	 * I/O problem.
+	 */
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		try {
+			establishConnection();
+			upload = dbConn.prepareStatement(query);
+		} catch (SQLException sqe) {
+			throw new IllegalArgumentException("open() failed.", sqe);
+		} catch (ClassNotFoundException cnfe) {
+			throw new IllegalArgumentException("JDBC-Class not found:\t", cnfe);
+		}
+	}
+
+	private void establishConnection() throws SQLException, ClassNotFoundException {
+		Class.forName(driverName);
+		if (username == null) {
+			dbConn = DriverManager.getConnection(dbURL);
+		} else {
+			dbConn = DriverManager.getConnection(dbURL, username, password);
+		}
+	}
+
+	/**
+	 * Adds a record to the prepared statement.
+	 * <p>
+	 * When this method is called, the output format is guaranteed to be opened.
+	 *
+	 * @param record The record to add to the output.
+	 * @throws IOException Thrown, if the record could not be added due to an
+	 * I/O problem.
+	 */
+	
+	@Override
+	public void writeRecord(Record record) throws IOException {
+		try {
+			for (int x = 0; x < record.getNumFields(); x++) {
+				Value temp = record.getField(x, fieldClasses[x]);
+				addValue(x + 1, temp);
+			}
+			upload.addBatch();
+			batchCount++;
+			if(batchCount >= batchInterval) {
+				upload.executeBatch();
+				batchCount = 0;
+			}
+		} catch (SQLException sqe) {
+			throw new IllegalArgumentException("writeRecord() failed:\t", sqe);
+		} catch (IllegalArgumentException iae) {
+			throw new IllegalArgumentException("writeRecord() failed:\t", iae);
+		}
+	}
+
+	private enum PactType {
+		BooleanValue,
+		ByteValue,
+		CharValue,
+		DoubleValue,
+		FloatValue,
+		IntValue,
+		LongValue,
+		ShortValue,
+		StringValue
+	}
+
+	private void addValue(int index, Value value) throws SQLException {
+		PactType type;
+		try {
+			type = PactType.valueOf(value.getClass().getSimpleName());
+		} catch (IllegalArgumentException iae) {
+			throw new IllegalArgumentException("PactType not supported:\t", iae);
+		}
+		switch (type) {
+			case BooleanValue:
+				upload.setBoolean(index, ((BooleanValue) value).getValue());
+				break;
+			case ByteValue:
+				upload.setByte(index, ((ByteValue) value).getValue());
+				break;
+			case CharValue:
+				upload.setString(index, String.valueOf(((CharValue) value).getValue()));
+				break;
+			case DoubleValue:
+				upload.setDouble(index, ((DoubleValue) value).getValue());
+				break;
+			case FloatValue:
+				upload.setFloat(index, ((FloatValue) value).getValue());
+				break;
+			case IntValue:
+				upload.setInt(index, ((IntValue) value).getValue());
+				break;
+			case LongValue:
+				upload.setLong(index, ((LongValue) value).getValue());
+				break;
+			case ShortValue:
+				upload.setShort(index, ((ShortValue) value).getValue());
+				break;
+			case StringValue:
+				upload.setString(index, ((StringValue) value).getValue());
+				break;
+		}
+	}
+
+	/**
+	 * Executes prepared statement and closes all resources of this instance.
+	 *
+	 * @throws IOException Thrown, if the output could not be closed properly.
+	 */
+	@Override
+	public void close() throws IOException {
+		try {
+			upload.executeBatch();
+			batchCount = 0;
+			upload.close();
+			dbConn.close();
+		} catch (SQLException sqe) {
+			throw new IllegalArgumentException("close() failed:\t", sqe);
+		}
+	}
+
+	/**
+	 * Creates a configuration builder that can be used to set the 
+	 * output format's parameters to the config in a fluent fashion.
+	 * 
+	 * @return A config builder for setting parameters.
+	 */
+	public static ConfigBuilder configureOutputFormat(GenericDataSink target) {
+		return new ConfigBuilder(target.getParameters());
+	}
+
+	/**
+	 * Abstract builder used to set parameters to the output format's 
+	 * configuration in a fluent way.
+	 */
+	protected static abstract class AbstractConfigBuilder<T>
+			extends FileOutputFormat.AbstractConfigBuilder<T> {
+
+		/**
+		 * Creates a new builder for the given configuration.
+		 * 
+		 * @param config The configuration into which the parameters will be written.
+		 */
+		protected AbstractConfigBuilder(Configuration config) {
+			super(config);
+		}
+
+		/**
+		 * Sets the query field.
+		 * @param value value to be set.
+		 * @return The builder itself.
+		 */
+		public T setQuery(String value) {
+			this.config.setString(QUERY_KEY, value);
+			@SuppressWarnings("unchecked")
+			T ret = (T) this;
+			return ret;
+		}
+
+		/**
+		 * Sets the url field.
+		 * @param value value to be set.
+		 * @return The builder itself.
+		 */
+		public T setUrl(String value) {
+			this.config.setString(URL_KEY, value);
+			@SuppressWarnings("unchecked")
+			T ret = (T) this;
+			return ret;
+		}
+
+		/**
+		 * Sets the username field.
+		 * @param value value to be set.
+		 * @return The builder itself.
+		 */
+		public T setUsername(String value) {
+			this.config.setString(USERNAME_KEY, value);
+			@SuppressWarnings("unchecked")
+			T ret = (T) this;
+			return ret;
+		}
+
+		/**
+		 * Sets the password field.
+		 * @param value value to be set.
+		 * @return The builder itself.
+		 */
+		public T setPassword(String value) {
+			this.config.setString(PASSWORD_KEY, value);
+			@SuppressWarnings("unchecked")
+			T ret = (T) this;
+			return ret;
+		}
+
+		/**
+		 * Sets the driver field.
+		 * @param value value to be set.
+		 * @return The builder itself.
+		 */
+		public T setDriver(String value) {
+			this.config.setString(DRIVER_KEY, value);
+			@SuppressWarnings("unchecked")
+			T ret = (T) this;
+			return ret;
+		}
+
+		/**
+		 * Sets the type of a column.
+		 * Types are applied in the order they were set.
+		 * @param type PactType to apply.
+		 * @return The builder itself.
+		 */
+		public T setClass(Class<? extends Value> type) {
+			final int numYet = this.config.getInteger(FIELD_COUNT_KEY, 0);
+			this.config.setClass(FIELD_TYPE_KEY + numYet, type);
+			this.config.setInteger(FIELD_COUNT_KEY, numYet + 1);
+			@SuppressWarnings("unchecked")
+			T ret = (T) this;
+			return ret;
+		}
+	}
+
+	/**
+	 * A builder used to set parameters to the output format's configuration in a fluent way.
+	 */
+	public static final class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> {
+		/**
+		 * Creates a new builder for the given configuration.
+		 * 
+		 * @param targetConfig The configuration into which the parameters will be written.
+		 */
+		protected ConfigBuilder(Configuration targetConfig) {
+			super(targetConfig);
+		}
+	}
+}
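
The batch interval described in the class comment is read from the BATCH_INTERVAL key ("batchInt") in configure() but has no setter on the ConfigBuilder. A minimal sketch of lowering it on a sink that is otherwise configured exactly like in the JDBCExample below; the value 1000 is illustrative, and getParameters() on the sink is the same hook that configureOutputFormat itself writes to:

    import org.apache.flink.api.java.record.io.jdbc.JDBCOutputFormat;
    import org.apache.flink.api.java.record.operators.GenericDataSink;
    import org.apache.flink.types.FloatValue;
    import org.apache.flink.types.IntValue;
    import org.apache.flink.types.StringValue;

    public class RecordJDBCSinkSketch {
        public static GenericDataSink createBooksSink() {
            GenericDataSink sink = new GenericDataSink(new JDBCOutputFormat(), "Data Output");
            JDBCOutputFormat.configureOutputFormat(sink)
                    .setDriver("org.apache.derby.jdbc.EmbeddedDriver")
                    .setUrl("jdbc:derby:memory:ebookshop")
                    .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
                    .setClass(IntValue.class)
                    .setClass(StringValue.class)
                    .setClass(StringValue.class)
                    .setClass(FloatValue.class)
                    .setClass(IntValue.class);
            // A smaller commit interval keeps fewer rows buffered, at the price of slower inserts.
            sink.getParameters().setInteger(JDBCOutputFormat.BATCH_INTERVAL, 1000);
            return sink;
        }
    }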

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
new file mode 100644
index 0000000..fcd0606
--- /dev/null
+++ b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/example/JDBCExample.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.record.io.jdbc.example;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.Program;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
+import org.apache.flink.api.java.record.io.jdbc.JDBCOutputFormat;
+import org.apache.flink.api.java.record.operators.GenericDataSink;
+import org.apache.flink.api.java.record.operators.GenericDataSource;
+import org.apache.flink.client.LocalExecutor;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.StringValue;
+
+/**
+ * Stand-alone example for the JDBC connector.
+ *
+ * NOTE: To run this example, you need the apache derby code in your classpath.
+ * See the Maven file (pom.xml) for a reference to the derby dependency. You can
+ * simply change the scope of the Maven dependency from test to compile.
+ */
+public class JDBCExample implements Program, ProgramDescription {
+
+	@Override
+	public Plan getPlan(String[] args) {
+		/*
+		 * In this example we use the constructor whose URL contains all the settings that are needed.
+		 * You could also use the default constructor and deliver a Configuration with all the needed
+		 * settings, or set the parameters directly on the source instance.
+		 */
+		GenericDataSource<JDBCInputFormat> source = new GenericDataSource<JDBCInputFormat>(
+				new JDBCInputFormat(
+						"org.apache.derby.jdbc.EmbeddedDriver",
+						"jdbc:derby:memory:ebookshop",
+						"select * from books"),
+				"Data Source");
+
+		GenericDataSink sink = new GenericDataSink(new JDBCOutputFormat(), "Data Output");
+		JDBCOutputFormat.configureOutputFormat(sink)
+				.setDriver("org.apache.derby.jdbc.EmbeddedDriver")
+				.setUrl("jdbc:derby:memory:ebookshop")
+				.setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
+				.setClass(IntValue.class)
+				.setClass(StringValue.class)
+				.setClass(StringValue.class)
+				.setClass(FloatValue.class)
+				.setClass(IntValue.class);
+
+		sink.addInput(source);
+		return new Plan(sink, "JDBC Example Job");
+	}
+
+	@Override
+	public String getDescription() {
+		return "Parameter:";
+	}
+
+	/*
+	 * To run this example, you need the apache derby code in your classpath!
+	 */
+	public static void main(String[] args) throws Exception {
+
+		prepareTestDb();
+		JDBCExample tut = new JDBCExample();
+		JobExecutionResult res = LocalExecutor.execute(tut, args);
+		System.out.println("runtime: " + res.getNetRuntime());
+
+		System.exit(0);
+	}
+
+	private static void prepareTestDb() throws Exception {
+		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+		Connection conn = DriverManager.getConnection(dbURL);
+
+		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
+		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+		sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+		Statement stat = conn.createStatement();
+		stat.executeUpdate(sqlQueryBuilder.toString());
+		stat.close();
+
+		sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
+		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+		sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+		stat = conn.createStatement();
+		stat.executeUpdate(sqlQueryBuilder.toString());
+		stat.close();
+
+		sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
+		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
+		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
+		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
+		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
+		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
+
+		stat = conn.createStatement();
+		stat.execute(sqlQueryBuilder.toString());
+		stat.close();
+		
+		conn.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
new file mode 100644
index 0000000..5816fa8
--- /dev/null
+++ b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import junit.framework.Assert;
+
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class JDBCInputFormatTest {
+	JDBCInputFormat jdbcInputFormat;
+
+	static Connection conn;
+
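+	// Expected contents of the 'books' table; each row mirrors one of the INSERTed rows
+	// below and is compared field by field against the tuples produced by the input format.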
+	static final Object[][] dbData = {
+		{1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
+		{1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
+		{1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
+		{1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
+		{1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
+
+	@BeforeClass
+	public static void setUpClass() {
+		try {
+			prepareDerbyDatabase();
+		} catch (Exception e) {
+			Assert.fail();
+		}
+	}
+
+	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
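+		// Route Derby's error log to the DevNullLogStream declared in the record.io.jdbc test
+		// package, so Derby's log output is discarded during the tests.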
+		System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
+		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+		conn = DriverManager.getConnection(dbURL);
+		createTable();
+		insertDataToSQLTable();
+		conn.close();
+	}
+
+	private static void createTable() throws SQLException {
+		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
+		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+		sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+		Statement stat = conn.createStatement();
+		stat.executeUpdate(sqlQueryBuilder.toString());
+		stat.close();
+	}
+
+	private static void insertDataToSQLTable() throws SQLException {
+		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
+		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
+		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
+		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
+		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
+		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
+
+		Statement stat = conn.createStatement();
+		stat.execute(sqlQueryBuilder.toString());
+		stat.close();
+	}
+
+	@AfterClass
+	public static void tearDownClass() {
+		cleanUpDerbyDatabases();
+	}
+
+	private static void cleanUpDerbyDatabases() {
+		try {
+			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+
+			conn = DriverManager.getConnection(dbURL);
+			Statement stat = conn.createStatement();
+			stat.executeUpdate("DROP TABLE books");
+			stat.close();
+			conn.close();
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+
+	@After
+	public void tearDown() {
+		jdbcInputFormat = null;
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidDriver() throws IOException {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername("org.apache.derby.jdbc.idontexist")
+				.setDBUrl("jdbc:derby:memory:ebookshop")
+				.setQuery("select * from books")
+				.finish();
+		jdbcInputFormat.open(null);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidURL() throws IOException {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
+				.setQuery("select * from books")
+				.finish();
+		jdbcInputFormat.open(null);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidQuery() throws IOException {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+				.setDBUrl("jdbc:derby:memory:ebookshop")
+				.setQuery("iamnotsql")
+				.finish();
+		jdbcInputFormat.open(null);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testIncompleteConfiguration() throws IOException {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+				.setQuery("select * from books")
+				.finish();
+	}
+
+	@Test(expected = IOException.class)
+	public void testIncompatibleTuple() throws IOException {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+				.setDBUrl("jdbc:derby:memory:ebookshop")
+				.setQuery("select * from books")
+				.finish();
+		jdbcInputFormat.open(null);
+		jdbcInputFormat.nextRecord(new Tuple2());
+	}
+
+	@Test
+	public void testJDBCInputFormat() throws IOException {
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+				.setDBUrl("jdbc:derby:memory:ebookshop")
+				.setQuery("select * from books")
+				.finish();
+		jdbcInputFormat.open(null);
+		Tuple5 tuple = new Tuple5();
+		int recordCount = 0;
+		while (!jdbcInputFormat.reachedEnd()) {
+			jdbcInputFormat.nextRecord(tuple);
+			Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass());
+			Assert.assertEquals("Field 1 should be String", String.class, tuple.getField(1).getClass());
+			Assert.assertEquals("Field 2 should be String", String.class, tuple.getField(2).getClass());
+			Assert.assertEquals("Field 3 should be float", Double.class, tuple.getField(3).getClass());
+			Assert.assertEquals("Field 4 should be int", Integer.class, tuple.getField(4).getClass());
+
+			for (int x = 0; x < 5; x++) {
+				Assert.assertEquals(dbData[recordCount][x], tuple.getField(x));
+			}
+			recordCount++;
+		}
+		Assert.assertEquals(5, recordCount);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
new file mode 100644
index 0000000..c1c899e
--- /dev/null
+++ b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import junit.framework.Assert;
+
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class JDBCOutputFormatTest {
+	private JDBCInputFormat jdbcInputFormat;
+	private JDBCOutputFormat jdbcOutputFormat;
+
+	private static Connection conn;
+
+	static final Object[][] dbData = {
+		{1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
+		{1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
+		{1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33},
+		{1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
+		{1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
+
+	@BeforeClass
+	public static void setUpClass() throws SQLException {
+		try {
+			System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
+			prepareDerbyDatabase();
+		} catch (ClassNotFoundException e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+
+	private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
+		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+		Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+		conn = DriverManager.getConnection(dbURL);
+		createTable("books");
+		createTable("newbooks");
+		insertDataToSQLTables();
+		conn.close();
+	}
+
+	private static void createTable(String tableName) throws SQLException {
+		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE ");
+		sqlQueryBuilder.append(tableName);
+		sqlQueryBuilder.append(" (");
+		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+		sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+		Statement stat = conn.createStatement();
+		stat.executeUpdate(sqlQueryBuilder.toString());
+		stat.close();
+	}
+
+	private static void insertDataToSQLTables() throws SQLException {
+		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
+		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
+		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
+		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
+		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
+		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
+
+		Statement stat = conn.createStatement();
+		stat.execute(sqlQueryBuilder.toString());
+		stat.close();
+	}
+
+	@AfterClass
+	public static void tearDownClass() {
+		cleanUpDerbyDatabases();
+	}
+
+	private static void cleanUpDerbyDatabases() {
+		try {
+			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+
+			conn = DriverManager.getConnection(dbURL);
+			Statement stat = conn.createStatement();
+			stat.executeUpdate("DROP TABLE books");
+			stat.executeUpdate("DROP TABLE newbooks");
+			stat.close();
+			conn.close();
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+
+	@After
+	public void tearDown() {
+		jdbcOutputFormat = null;
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidDriver() throws IOException {
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername("org.apache.derby.jdbc.idontexist")
+				.setDBUrl("jdbc:derby:memory:ebookshop")
+				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
+				.finish();
+		jdbcOutputFormat.open(0, 1);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidURL() throws IOException {
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+				.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
+				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
+				.finish();
+		jdbcOutputFormat.open(0, 1);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidQuery() throws IOException {
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+				.setDBUrl("jdbc:derby:memory:ebookshop")
+				.setQuery("iamnotsql")
+				.finish();
+		jdbcOutputFormat.open(0, 1);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testIncompleteConfiguration() throws IOException {
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
+				.finish();
+	}
+
+	@Test(expected = IOException.class)
+	public void testIncompatibleTuple() throws IOException {
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+				.setDBUrl("jdbc:derby:memory:ebookshop")
+				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
+				.finish();
+		jdbcOutputFormat.open(0, 1);
+
+		Tuple3 tuple3 = new Tuple3();
+		tuple3.setField(4, 0);
+		tuple3.setField("hi", 1);
+		tuple3.setField(4.4, 2);
+
+		jdbcOutputFormat.writeRecord(tuple3);
+		jdbcOutputFormat.close();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testIncompatibleTypes() throws IOException {
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+				.setDBUrl("jdbc:derby:memory:ebookshop")
+				.setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
+				.finish();
+		jdbcOutputFormat.open(0, 1);
+
+		Tuple5 tuple5 = new Tuple5();
+		tuple5.setField(4, 0);
+		tuple5.setField("hello", 1);
+		tuple5.setField("world", 2);
+		tuple5.setField(0.99, 3);
+		tuple5.setField("imthewrongtype", 4);
+
+		jdbcOutputFormat.writeRecord(tuple5);
+		jdbcOutputFormat.close();
+	}
+
+	@Test
+	public void testJDBCOutputFormat() throws IOException {
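+		// Round trip: read every row from 'books' with the input format, write it into
+		// 'newbooks' with the output format, then read 'newbooks' back and check the
+		// field types and values against dbData.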
+		String sourceTable = "books";
+		String targetTable = "newbooks";
+		String driverPath = "org.apache.derby.jdbc.EmbeddedDriver";
+		String dbUrl = "jdbc:derby:memory:ebookshop";
+
+		jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+				.setDBUrl(dbUrl)
+				.setDrivername(driverPath)
+				.setQuery("insert into " + targetTable + " (id, title, author, price, qty) values (?,?,?,?,?)")
+				.finish();
+		jdbcOutputFormat.open(0, 1);
+
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(driverPath)
+				.setDBUrl(dbUrl)
+				.setQuery("select * from " + sourceTable)
+				.finish();
+		jdbcInputFormat.open(null);
+
+		Tuple5 tuple = new Tuple5();
+		while (!jdbcInputFormat.reachedEnd()) {
+			jdbcInputFormat.nextRecord(tuple);
+			jdbcOutputFormat.writeRecord(tuple);
+		}
+
+		jdbcOutputFormat.close();
+		jdbcInputFormat.close();
+
+		jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+				.setDrivername(driverPath)
+				.setDBUrl(dbUrl)
+				.setQuery("select * from " + targetTable)
+				.finish();
+		jdbcInputFormat.open(null);
+
+		int recordCount = 0;
+		while (!jdbcInputFormat.reachedEnd()) {
+			jdbcInputFormat.nextRecord(tuple);
+			Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass());
+			Assert.assertEquals("Field 1 should be String", String.class, tuple.getField(1).getClass());
+			Assert.assertEquals("Field 2 should be String", String.class, tuple.getField(2).getClass());
+			Assert.assertEquals("Field 3 should be float", Double.class, tuple.getField(3).getClass());
+			Assert.assertEquals("Field 4 should be int", Integer.class, tuple.getField(4).getClass());
+
+			for (int x = 0; x < 5; x++) {
+				Assert.assertEquals(dbData[recordCount][x], tuple.getField(x));
+			}
+
+			recordCount++;
+		}
+		Assert.assertEquals(5, recordCount);
+
+		jdbcInputFormat.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
new file mode 100644
index 0000000..3032728
--- /dev/null
+++ b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/DevNullLogStream.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.record.io.jdbc;
+
+import java.io.OutputStream;
+
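+/**
+ * An OutputStream that silently discards everything written to it. The tests point the
+ * derby.stream.error.field system property at DEV_NULL to suppress Derby's log output.
+ */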
+public class DevNullLogStream {
+
+	public static final OutputStream DEV_NULL = new OutputStream() {
+		public void write(int b) {}
+	};
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
new file mode 100644
index 0000000..1bafb42
--- /dev/null
+++ b/flink-addons/flink-jdbc/src/test/java/org/apache/flink/api/java/record/io/jdbc/JDBCInputFormatTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.flink.api.java.record.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import junit.framework.Assert;
+
+import org.apache.flink.api.java.record.io.jdbc.JDBCInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class JDBCInputFormatTest {
+	JDBCInputFormat jdbcInputFormat;
+	Configuration config;
+	static Connection conn;
+	static final Value[][] dbData = {
+		{new IntValue(1001), new StringValue("Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(11.11), new IntValue(11)},
+		{new IntValue(1002), new StringValue("More Java for dummies"), new StringValue("Tan Ah Teck"), new DoubleValue(22.22), new IntValue(22)},
+		{new IntValue(1003), new StringValue("More Java for more dummies"), new StringValue("Mohammad Ali"), new DoubleValue(33.33), new IntValue(33)},
+		{new IntValue(1004), new StringValue("A Cup of Java"), new StringValue("Kumar"), new DoubleValue(44.44), new IntValue(44)},
+		{new IntValue(1005), new StringValue("A Teaspoon of Java"), new StringValue("Kevin Jones"), new DoubleValue(55.55), new IntValue(55)}};
+
+	@BeforeClass
+	public static void setUpClass() {
+		try {
+			prepareDerbyDatabase();
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+
+	private static void prepareDerbyDatabase() throws ClassNotFoundException {
+		System.setProperty("derby.stream.error.field","org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
+		String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+		createConnection(dbURL);
+	}
+
+	private static void cleanUpDerbyDatabases() {
+		try {
+			String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+			conn = DriverManager.getConnection(dbURL);
+			Statement stat = conn.createStatement();
+			stat.executeUpdate("DROP TABLE books");
+			stat.close();
+			conn.close();
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+	
+	/*
+	 * Loads the Derby JDBC driver, then creates (if necessary) and populates the database.
+	 */
+	private static void createConnection(String dbURL) {
+		try {
+			Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+			conn = DriverManager.getConnection(dbURL);
+			createTable();
+			insertDataToSQLTables();
+			conn.close();
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+
+	private static void createTable() throws SQLException {
+		StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books (");
+		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+		sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+		sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+		Statement stat = conn.createStatement();
+		stat.executeUpdate(sqlQueryBuilder.toString());
+		stat.close();
+
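+		// A second table with a BLOB column; testUnsupportedSQLType() reads from it to verify
+		// that unsupported column types are rejected with an IllegalArgumentException.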
+		sqlQueryBuilder = new StringBuilder("CREATE TABLE bookscontent (");
+		sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+		sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+		sqlQueryBuilder.append("content BLOB(10K) DEFAULT NULL,");
+		sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+		stat = conn.createStatement();
+		stat.executeUpdate(sqlQueryBuilder.toString());
+		stat.close();
+	}
+
+	private static void insertDataToSQLTables() throws SQLException {
+		StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES ");
+		sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),");
+		sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),");
+		sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),");
+		sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),");
+		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)");
+
+		Statement stat = conn.createStatement();
+		stat.execute(sqlQueryBuilder.toString());
+		stat.close();
+
+		sqlQueryBuilder = new StringBuilder("INSERT INTO bookscontent (id, title, content) VALUES ");
+		sqlQueryBuilder.append("(1001, 'Java for dummies', CAST(X'7f454c4602' AS BLOB)),");
+		sqlQueryBuilder.append("(1002, 'More Java for dummies', CAST(X'7f454c4602' AS BLOB)),");
+		sqlQueryBuilder.append("(1003, 'More Java for more dummies', CAST(X'7f454c4602' AS BLOB)),");
+		sqlQueryBuilder.append("(1004, 'A Cup of Java', CAST(X'7f454c4602' AS BLOB)),");
+		sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', CAST(X'7f454c4602' AS BLOB))");
+
+		stat = conn.createStatement();
+		stat.execute(sqlQueryBuilder.toString());
+		stat.close();
+	}
+
+
+	@After
+	public void tearDown() {
+		jdbcInputFormat = null;
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidConnection() {
+		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:idontexist", "select * from books;");
+		jdbcInputFormat.configure(null);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidQuery() {
+		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "abc");
+		jdbcInputFormat.configure(null);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testInvalidDBType() {
+		jdbcInputFormat = new JDBCInputFormat("idontexist.Driver", "jdbc:derby:memory:ebookshop", "select * from books;");
+		jdbcInputFormat.configure(null);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testUnsupportedSQLType() {
+		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from bookscontent");
+		jdbcInputFormat.configure(null);
+		jdbcInputFormat.nextRecord(new Record());
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testNotConfiguredFormatNext() {
+		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
+		jdbcInputFormat.nextRecord(new Record());
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testNotConfiguredFormatEnd() {
+		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
+		jdbcInputFormat.reachedEnd();
+	}
+
+	@Test
+	public void testJDBCInputFormat() throws IOException {
+		jdbcInputFormat = new JDBCInputFormat("org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:ebookshop", "select * from books");
+		jdbcInputFormat.configure(null);
+		Record record = new Record();
+		int recordCount = 0;
+		while (!jdbcInputFormat.reachedEnd()) {
+			jdbcInputFormat.nextRecord(record);
+			Assert.assertEquals(5, record.getNumFields());
+			Assert.assertEquals("Field 0 should be int", IntValue.class, record.getField(0, IntValue.class).getClass());
+			Assert.assertEquals("Field 1 should be String", StringValue.class, record.getField(1, StringValue.class).getClass());
+			Assert.assertEquals("Field 2 should be String", StringValue.class, record.getField(2, StringValue.class).getClass());
+			Assert.assertEquals("Field 3 should be float", DoubleValue.class, record.getField(3, DoubleValue.class).getClass());
+			Assert.assertEquals("Field 4 should be int", IntValue.class, record.getField(4, IntValue.class).getClass());
+
+			int[] pos = {0, 1, 2, 3, 4};
+			Value[] values = {new IntValue(), new StringValue(), new StringValue(), new DoubleValue(), new IntValue()};
+			Assert.assertTrue(record.equalsFields(pos, dbData[recordCount], values));
+
+			recordCount++;
+		}
+		Assert.assertEquals(5, recordCount);
+		
+		cleanUpDerbyDatabases();
+	}
+
+}

