flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/3] flink git commit: [FLINK-1396][FLINK-1303] Hadoop Input/Output directly in API
Date Mon, 09 Feb 2015 14:38:26 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
deleted file mode 100644
index 6ef0f2e..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.LinkedList;
-
-@RunWith(Parameterized.class)
-public class HadoopIOFormatsITCase extends JavaProgramTestBase {
-
-	private static int NUM_PROGRAMS = 2;
-
-	private int curProgId = config.getInteger("ProgramId", -1);
-	private String[] resultPath;
-	private String[] expectedResult;
-	private String sequenceFileInPath;
-	private String sequenceFileInPathNull;
-
-	public HadoopIOFormatsITCase(Configuration config) {
-		super(config);	
-	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = new String[] {getTempDirPath("result0"), getTempDirPath("result1") };
-
-		File sequenceFile = createAndRegisterTempFile("seqFile");
-		sequenceFileInPath = sequenceFile.toURI().toString();
-
-		// Create a sequence file
-		org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-		FileSystem fs = FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf);
-		Path path = new Path(sequenceFile.getAbsolutePath());
-
-		//  ------------------ Long / Text Key Value pair: ------------
-		int kvCount = 4;
-
-		LongWritable key = new LongWritable();
-		Text value = new Text();
-		SequenceFile.Writer writer = null;
-		try {
-			writer = SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass());
-			for (int i = 0; i < kvCount; i ++) {
-				if(i == 1) {
-					// write key = 0 a bit more often.
-					for(int a = 0;a < 15; a++) {
-						key.set(i);
-						value.set(i+" - somestring");
-						writer.append(key, value);
-					}
-				}
-				key.set(i);
-				value.set(i+" - somestring");
-				writer.append(key, value);
-			}
-		} finally {
-			IOUtils.closeStream(writer);
-		}
-
-
-		//  ------------------ Long / Text Key Value pair: ------------
-
-		File sequenceFileNull = createAndRegisterTempFile("seqFileNullKey");
-		sequenceFileInPathNull = sequenceFileNull.toURI().toString();
-		path = new Path(sequenceFileInPathNull);
-
-		LongWritable value1 = new LongWritable();
-		SequenceFile.Writer writer1 = null;
-		try {
-			writer1 = SequenceFile.createWriter( fs, conf, path, NullWritable.class, value1.getClass());
-			for (int i = 0; i < kvCount; i ++) {
-				value1.set(i);
-				writer1.append(NullWritable.get(), value1);
-			}
-		} finally {
-			IOUtils.closeStream(writer1);
-		}
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, resultPath, sequenceFileInPath, sequenceFileInPathNull);
-	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		for(int i = 0; i < resultPath.length; i++) {
-			compareResultsByLinesInMemory(expectedResult[i], resultPath[i]);
-		}
-	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-		
-		return toParameterList(tConfigs);
-	}
-	
-	public static class HadoopIOFormatPrograms {
-		
-		public static String[] runProgram(int progId, String resultPath[], String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				/**
-				 * Test sequence file, including a key access.
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				SequenceFileInputFormat<LongWritable, Text> sfif = new SequenceFileInputFormat<LongWritable, Text>();
-				JobConf hdconf = new JobConf();
-				SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPath));
-				HadoopInputFormat<LongWritable, Text> hif = new HadoopInputFormat<LongWritable, Text>(sfif, LongWritable.class, Text.class, hdconf);
-				DataSet<Tuple2<LongWritable, Text>> ds = env.createInput(hif);
-				DataSet<Tuple2<Long, Text>> sumed = ds.map(new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>() {
-					@Override
-					public Tuple2<Long, Text> map(Tuple2<LongWritable, Text> value) throws Exception {
-						return new Tuple2<Long, Text>(value.f0.get(), value.f1);
-					}
-				}).sum(0);
-				sumed.writeAsText(resultPath[0]);
-				DataSet<String> res = ds.distinct(0).map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
-					@Override
-					public String map(Tuple2<LongWritable, Text> value) throws Exception {
-						return value.f1 + " - " + value.f0.get();
-					}
-				});
-				res.writeAsText(resultPath[1]);
-				env.execute();
-				
-				// return expected result
-				return 	new String [] {"(21,3 - somestring)", "0 - somestring - 0\n" +
-						"1 - somestring - 1\n" +
-						"2 - somestring - 2\n" +
-						"3 - somestring - 3\n"};
-
-			}
-			case 2: {
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				SequenceFileInputFormat<NullWritable, LongWritable> sfif = new SequenceFileInputFormat<NullWritable, LongWritable>();
-				JobConf hdconf = new JobConf();
-				SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPathNull));
-				HadoopInputFormat<NullWritable, LongWritable> hif = new HadoopInputFormat<NullWritable, LongWritable>(sfif, NullWritable.class, LongWritable.class, hdconf);
-				DataSet<Tuple2<NullWritable, LongWritable>> ds = env.createInput(hif);
-				DataSet<Tuple2<Void, Long>> res = ds.map(new MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>() {
-					@Override
-					public Tuple2<Void, Long> map(Tuple2<NullWritable, LongWritable> value) throws Exception {
-						return new Tuple2<Void, Long>(null, value.f1.get());
-					}
-				});
-				DataSet<Tuple2<Void, Long>> res1 = res.groupBy(1).sum(1);
-				res1.writeAsText(resultPath[1]);
-				res.writeAsText(resultPath[0]);
-				env.execute();
-
-				// return expected result
-				return 	new String [] {"(null,2)\n" +
-						"(null,0)\n" +
-						"(null,1)\n" +
-						"(null,3)",
-						"(null,0)\n" +
-						"(null,1)\n" +
-						"(null,2)\n" +
-						"(null,3)"};
-			}
-			default:
-				throw new IllegalArgumentException("Invalid program id");
-			}
-			
-		}
-	
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index cc5664b..f345591 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -152,6 +152,12 @@ under the License.
 				<dependency>
 					<groupId>org.apache.hadoop</groupId>
 					<artifactId>hadoop-core</artifactId>
+					<!--<exclusions>-->
+						<!--<exclusion>-->
+							<!--<groupId>*</groupId>-->
+							<!--<artifactId>*</artifactId>-->
+						<!--</exclusion>-->
+					<!--</exclusions>-->
 				</dependency>
 			</dependencies>
 		</profile>
@@ -167,6 +173,22 @@ under the License.
 				<dependency>
 					<groupId>org.apache.hadoop</groupId>
 					<artifactId>hadoop-common</artifactId>
+					<!--<exclusions>-->
+						<!--<exclusion>-->
+							<!--<groupId>*</groupId>-->
+							<!--<artifactId>*</artifactId>-->
+						<!--</exclusion>-->
+					<!--</exclusions>-->
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-mapreduce-client-core</artifactId>
+					<!--<exclusions>-->
+						<!--<exclusion>-->
+							<!--<groupId>*</groupId>-->
+							<!--<artifactId>*</artifactId>-->
+						<!--</exclusion>-->
+					<!--</exclusions>-->
 				</dependency>
 			</dependencies>
 		</profile>

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 6415570..81caa2a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -1265,8 +1265,7 @@ public abstract class DataSet<T> {
 		this.context.registerDataSink(sink);
 		return sink;
 	}
-	
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Utilities
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 563787f..61a74b9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.api.java.io.CsvReader;
 import org.apache.flink.api.java.io.IteratorInputFormat;
@@ -58,6 +59,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
 import org.apache.flink.util.SplittableIterator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
 
 /**
  * The ExecutionEnviroment is the context in which a program is executed. A
@@ -400,6 +403,67 @@ public abstract class ExecutionEnvironment {
 		
 		return new DataSource<X>(this, inputFormat, producedType, Utils.getCallLocationName());
 	}
+
+	// ----------------------------------- Hadoop Input Format ---------------------------------------
+
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. The
+	 * given inputPath is added to the given job.
+	 */
+	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
+		DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
+
+		org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
+
+		return result;
+	}
+
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A
+	 * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
+	 */
+	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
+		return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
+	}
+
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}.
+	 */
+	public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+		HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
+
+		return this.createInput(hadoopInputFormat);
+	}
+
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
+	 * given inputPath is added to the given job.
+	 */
+	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
+		DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
+
+		org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache
+				.hadoop.fs.Path(inputPath));
+
+		return result;
+	}
+
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
+	 * {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
+	 */
+	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
+		return readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance());
+	}
+
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
+	 */
+	public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, Job job) {
+		org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
+
+		return this.createInput(hadoopInputFormat);
+	}
 	
 	// ----------------------------------- Collection ---------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
new file mode 100644
index 0000000..8b25249
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.hadoop.mapred.JobConf;
+
+public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K,V>> implements ResultTypeQueryable<Tuple2<K,V>> {
+	
+	public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+		super(mapredInputFormat, key, value, job);
+	}
+	
+	@Override
+	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
+		if(!fetched) {
+			fetchNext();
+		}
+		if(!hasNext) {
+			return null;
+		}
+		record.f0 = key;
+		record.f1 = value;
+		fetched = false;
+		return record;
+	}
+	
+	@Override
+	public TypeInformation<Tuple2<K,V>> getProducedType() {
+		return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
new file mode 100644
index 0000000..40f6631
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred;
+
+import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+
+public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, HadoopInputSplit> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);
+
+	private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat;
+	protected Class<K> keyClass;
+	protected Class<V> valueClass;
+	private JobConf jobConf;
+
+	protected transient K key;
+	protected transient V value;
+
+	private transient RecordReader<K, V> recordReader;
+	protected transient boolean fetched = false;
+	protected transient boolean hasNext;
+
+	public HadoopInputFormatBase(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+		super();
+		this.mapredInputFormat = mapredInputFormat;
+		this.keyClass = key;
+		this.valueClass = value;
+		HadoopUtils.mergeHadoopConf(job);
+		this.jobConf = job;
+		ReflectionUtils.setConf(mapredInputFormat, jobConf);
+	}
+	
+	public JobConf getJobConf() {
+		return jobConf;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  InputFormat
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void configure(Configuration parameters) {
+		// nothing to do
+	}
+	
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
+		// only gather base statistics for FileInputFormats
+		if(!(mapredInputFormat instanceof FileInputFormat)) {
+			return null;
+		}
+		
+		final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
+				(FileBaseStatistics) cachedStats : null;
+		
+		try {
+			final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(this.jobConf);
+			
+			return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
+		} catch (IOException ioex) {
+			if (LOG.isWarnEnabled()) {
+				LOG.warn("Could not determine statistics due to an io error: "
+						+ ioex.getMessage());
+			}
+		} catch (Throwable t) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Unexpected problem while getting the file statistics: "
+						+ t.getMessage(), t);
+			}
+		}
+		
+		// no statistics available
+		return null;
+	}
+	
+	@Override
+	public HadoopInputSplit[] createInputSplits(int minNumSplits)
+			throws IOException {
+		org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits);
+		HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
+		for(int i=0;i<splitArray.length;i++){
+			hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf);
+		}
+		return hiSplit;
+	}
+	
+	@Override
+	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
+		return new LocatableInputSplitAssigner(inputSplits);
+	}
+	
+	@Override
+	public void open(HadoopInputSplit split) throws IOException {
+		this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
+		if (this.recordReader instanceof Configurable) {
+			((Configurable) this.recordReader).setConf(jobConf);
+		}
+		key = this.recordReader.createKey();
+		value = this.recordReader.createValue();
+		this.fetched = false;
+	}
+	
+	@Override
+	public boolean reachedEnd() throws IOException {
+		if(!fetched) {
+			fetchNext();
+		}
+		return !hasNext;
+	}
+	
+	protected void fetchNext() throws IOException {
+		hasNext = this.recordReader.next(key, value);
+		fetched = true;
+	}
+
+	@Override
+	public void close() throws IOException {
+		this.recordReader.close();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Helper methods
+	// --------------------------------------------------------------------------------------------
+	
+	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
+			ArrayList<FileStatus> files) throws IOException {
+		
+		long latestModTime = 0L;
+		
+		// get the file info and check whether the cached statistics are still valid.
+		for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
+			
+			final Path filePath = new Path(hadoopPath.toUri());
+			final FileSystem fs = FileSystem.get(filePath.toUri());
+			
+			final FileStatus file = fs.getFileStatus(filePath);
+			latestModTime = Math.max(latestModTime, file.getModificationTime());
+			
+			// enumerate all files and check their modification time stamp.
+			if (file.isDir()) {
+				FileStatus[] fss = fs.listStatus(filePath);
+				files.ensureCapacity(files.size() + fss.length);
+				
+				for (FileStatus s : fss) {
+					if (!s.isDir()) {
+						files.add(s);
+						latestModTime = Math.max(s.getModificationTime(), latestModTime);
+					}
+				}
+			} else {
+				files.add(file);
+			}
+		}
+		
+		// check whether the cached statistics are still valid, if we have any
+		if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
+			return cachedStats;
+		}
+		
+		// calculate the whole length
+		long len = 0;
+		for (FileStatus s : files) {
+			len += s.getLen();
+		}
+		
+		// sanity check
+		if (len <= 0) {
+			len = BaseStatistics.SIZE_UNKNOWN;
+		}
+		
+		return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Custom serialization methods
+	// --------------------------------------------------------------------------------------------
+	
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeUTF(mapredInputFormat.getClass().getName());
+		out.writeUTF(keyClass.getName());
+		out.writeUTF(valueClass.getName());
+		jobConf.write(out);
+	}
+	
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		String hadoopInputFormatClassName = in.readUTF();
+		String keyClassName = in.readUTF();
+		String valueClassName = in.readUTF();
+		if(jobConf == null) {
+			jobConf = new JobConf();
+		}
+		jobConf.readFields(in);
+		try {
+			this.mapredInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
+		}
+		try {
+			this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to find key class.", e);
+		}
+		try {
+			this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to find value class.", e);
+		}
+		ReflectionUtils.setConf(mapredInputFormat, jobConf);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
new file mode 100644
index 0000000..75623e2
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.mapred.JobConf;
+
+public class HadoopOutputFormat<K,V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
+
+	public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) {
+		super(mapredOutputFormat, job);
+	}
+
+	@Override
+	public void writeRecord(Tuple2<K, V> record) throws IOException {
+		this.recordWriter.write(record.f0, record.f1);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
new file mode 100644
index 0000000..a59b96f
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+
+public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>, FinalizeOnMaster {
+
+	private static final long serialVersionUID = 1L;
+
+	private JobConf jobConf;
+	private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;
+	protected transient RecordWriter<K,V> recordWriter;
+	private transient FileOutputCommitter fileOutputCommitter;
+	private transient TaskAttemptContext context;
+	private transient JobContext jobContext;
+
+	public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) {
+		super();
+		this.mapredOutputFormat = mapredOutputFormat;
+		HadoopUtils.mergeHadoopConf(job);
+		this.jobConf = job;
+	}
+
+	public JobConf getJobConf() {
+		return jobConf;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  OutputFormat
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void configure(Configuration parameters) {
+		// nothing to do
+	}
+
+	/**
+	 * Creates the temporary output file for the Hadoop RecordWriter.
+	 * @param taskNumber The number of the parallel instance.
+	 * @param numTasks The number of parallel tasks.
+	 * @throws java.io.IOException
+	 */
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		if (Integer.toString(taskNumber + 1).length() > 6) {
+			throw new IOException("Task id too large.");
+		}
+
+		TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+				+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0")
+				+ Integer.toString(taskNumber + 1)
+				+ "_0");
+
+		this.jobConf.set("mapred.task.id", taskAttemptID.toString());
+		this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
+		// for hadoop 2.2
+		this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+		this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
+
+		try {
+			this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		this.fileOutputCommitter = new FileOutputCommitter();
+
+		try {
+			this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		this.fileOutputCommitter.setupJob(jobContext);
+
+		this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
+	}
+
+	/**
+	 * Commits the task by moving the output file out of the temporary directory.
+	 * @throws java.io.IOException
+	 */
+	@Override
+	public void close() throws IOException {
+		this.recordWriter.close(new HadoopDummyReporter());
+		
+		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
+			this.fileOutputCommitter.commitTask(this.context);
+		}
+	}
+	
+	@Override
+	public void finalizeGlobal(int parallelism) throws IOException {
+
+		try {
+			JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
+			FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
+			
+			// finalize HDFS output format
+			fileOutputCommitter.commitJob(jobContext);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Custom serialization methods
+	// --------------------------------------------------------------------------------------------
+	
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeUTF(mapredOutputFormat.getClass().getName());
+		jobConf.write(out);
+	}
+	
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		String hadoopOutputFormatName = in.readUTF();
+		if(jobConf == null) {
+			jobConf = new JobConf();
+		}
+		jobConf.readFields(in);
+		try {
+			this.mapredOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) Class.forName(hadoopOutputFormatName, true, Thread.currentThread().getContextClassLoader()).newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
+		}
+		ReflectionUtils.setConf(mapredOutputFormat, jobConf);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
new file mode 100644
index 0000000..d4dc297
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred.utils;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class HadoopUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class);
+
+	/**
+	 * Merges the Hadoop configuration into the JobConf. This is necessary for the HDFS configuration.
+	 */
+	public static void mergeHadoopConf(JobConf jobConf) {
+		org.apache.hadoop.conf.Configuration hadoopConf = getHadoopConfiguration();
+		for (Map.Entry<String, String> e : hadoopConf) {
+			jobConf.set(e.getKey(), e.getValue());
+		}
+	}
+	
+	public static JobContext instantiateJobContext(JobConf jobConf, JobID jobId) throws Exception {
+		try {
+			// for Hadoop 1.xx
+			Class<?> clazz = null;
+			if(!TaskAttemptContext.class.isInterface()) { 
+				clazz = Class.forName("org.apache.hadoop.mapred.JobContext", true, Thread.currentThread().getContextClassLoader());
+			}
+			// for Hadoop 2.xx
+			else {
+				clazz = Class.forName("org.apache.hadoop.mapred.JobContextImpl", true, Thread.currentThread().getContextClassLoader());
+			}
+			Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, org.apache.hadoop.mapreduce.JobID.class);
+			// for Hadoop 1.xx
+			constructor.setAccessible(true);
+			JobContext context = (JobContext) constructor.newInstance(jobConf, jobId);
+			
+			return context;
+		} catch(Exception e) {
+			throw new Exception("Could not create instance of JobContext.", e);
+		}
+	}
+	
+	public static TaskAttemptContext instantiateTaskAttemptContext(JobConf jobConf,  TaskAttemptID taskAttemptID) throws Exception {
+		try {
+			// for Hadoop 1.xx
+			Class<?> clazz = null;
+			if(!TaskAttemptContext.class.isInterface()) { 
+				clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContext", true, Thread.currentThread().getContextClassLoader());
+			}
+			// for Hadoop 2.xx
+			else {
+				clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContextImpl", true, Thread.currentThread().getContextClassLoader());
+			}
+			Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, TaskAttemptID.class);
+			// for Hadoop 1.xx
+			constructor.setAccessible(true);
+			TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(jobConf, taskAttemptID);
+			return context;
+		} catch(Exception e) {
+			throw new Exception("Could not create instance of TaskAttemptContext.", e);
+		}
+	}
+
+	/**
+	 * Returns a new Hadoop Configuration object using the path to the hadoop conf configured
+	 * in the main configuration (flink-conf.yaml).
+	 * This method is public because it is being used in the HadoopDataSource.
+	 */
+	public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
+		Configuration retConf = new org.apache.hadoop.conf.Configuration();
+
+		// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
+		// the hdfs configuration
+		// Try to load HDFS configuration from Hadoop's own configuration files
+		// 1. approach: Flink configuration
+		final String hdfsDefaultPath = GlobalConfiguration.getString(ConfigConstants
+				.HDFS_DEFAULT_CONFIG, null);
+		if (hdfsDefaultPath != null) {
+			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
+		} else {
+			LOG.debug("Cannot find hdfs-default configuration file");
+		}
+
+		final String hdfsSitePath = GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
+		if (hdfsSitePath != null) {
+			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
+		} else {
+			LOG.debug("Cannot find hdfs-site configuration file");
+		}
+
+		// 2. approach: environment variables
+		String[] possibleHadoopConfPaths = new String[4];
+		possibleHadoopConfPaths[0] = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+		possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
+
+		if (System.getenv("HADOOP_HOME") != null) {
+			possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME")+"/conf";
+			possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME")+"/etc/hadoop"; // hadoop 2.2
+		}
+
+		for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+			if (possibleHadoopConfPath != null) {
+				if (new File(possibleHadoopConfPath).exists()) {
+					if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
+						retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
+
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
+						}
+					}
+					if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
+						retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
+
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
+						}
+					}
+				}
+			}
+		}
+		return retConf;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java
new file mode 100644
index 0000000..215b890
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred.wrapper;
+
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * This is a dummy Progressable whose progress() method does nothing.
+ *
+ */
+public class HadoopDummyProgressable implements Progressable {
+	@Override
+	public void progress() { 
+		
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java
new file mode 100644
index 0000000..01104ac
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred.wrapper;
+
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This is a dummy progress monitor / reporter
+ *
+ */
+public class HadoopDummyReporter implements Reporter {
+
+	@Override
+	public void progress() {
+	}
+
+	@Override
+	public void setStatus(String status) {
+
+	}
+
+	@Override
+	public Counter getCounter(Enum<?> name) {
+		return null;
+	}
+
+	@Override
+	public Counter getCounter(String group, String name) {
+		return null;
+	}
+
+	@Override
+	public void incrCounter(Enum<?> key, long amount) {
+
+	}
+
+	@Override
+	public void incrCounter(String group, String counter, long amount) {
+
+	}
+
+	@Override
+	public InputSplit getInputSplit() throws UnsupportedOperationException {
+		return null;
+	}
+	// There should be an @Override, but some CDH4 dependency does not contain this method
+	public float getProgress() {
+		return 0;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
new file mode 100644
index 0000000..beef5d7
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred.wrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.mapred.JobConf;
+
+
+public class HadoopInputSplit extends LocatableInputSplit {
+
+	private static final long serialVersionUID = 1L;
+
+	
+	private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
+	
+	private JobConf jobConf;
+	
+	private int splitNumber;
+	private String hadoopInputSplitTypeName;
+
+
+	public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
+		return hadoopInputSplit;
+	}
+
+	public HadoopInputSplit() {
+		super();
+	}
+
+	public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
+
+		this.splitNumber = splitNumber;
+		this.hadoopInputSplit = hInputSplit;
+		this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
+		this.jobConf = jobconf;
+
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(splitNumber);
+		out.writeUTF(hadoopInputSplitTypeName);
+		jobConf.write(out);
+		hadoopInputSplit.write(out);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.splitNumber = in.readInt();
+		this.hadoopInputSplitTypeName = in.readUTF();
+		if(hadoopInputSplit == null) {
+			try {
+				Class<? extends org.apache.hadoop.io.Writable> inputSplit =
+						Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
+				this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit );
+			}
+			catch (Exception e) {
+				throw new RuntimeException("Unable to create InputSplit", e);
+			}
+		}
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+		if (this.hadoopInputSplit instanceof Configurable) {
+			((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
+		}
+		this.hadoopInputSplit.readFields(in);
+
+	}
+
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeInt(splitNumber);
+		out.writeUTF(hadoopInputSplitTypeName);
+		jobConf.write(out);
+		hadoopInputSplit.write(out);
+
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		this.splitNumber=in.readInt();
+		this.hadoopInputSplitTypeName = in.readUTF();
+		if(hadoopInputSplit == null) {
+			try {
+				Class<? extends org.apache.hadoop.io.Writable> inputSplit =
+						Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
+				this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit );
+			}
+			catch (Exception e) {
+				throw new RuntimeException("Unable to create InputSplit", e);
+			}
+		}
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+		if (this.hadoopInputSplit instanceof Configurable) {
+			((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
+		}
+		this.hadoopInputSplit.readFields(in);
+	}
+
+	@Override
+	public int getSplitNumber() {
+		return this.splitNumber;
+	}
+
+	@Override
+	public String[] getHostnames() {
+		try {
+			return this.hadoopInputSplit.getLocations();
+		} catch(IOException ioe) {
+			return new String[0];
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
new file mode 100644
index 0000000..efe97f1
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>> implements ResultTypeQueryable<Tuple2<K,V>> {
+
+	public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
+		super(mapreduceInputFormat, key, value, job);
+	}
+
+	@Override
+	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
+		if(!this.fetched) {
+			fetchNext();
+		}
+		if(!this.hasNext) {
+			return null;
+		}
+		try {
+			record.f0 = recordReader.getCurrentKey();
+			record.f1 = recordReader.getCurrentValue();
+		} catch (InterruptedException e) {
+			throw new IOException("Could not get KeyValue pair.", e);
+		}
+		this.fetched = false;
+
+		return record;
+	}
+
+	@Override
+	public TypeInformation<Tuple2<K,V>> getProducedType() {
+		return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
new file mode 100644
index 0000000..2a6c0f4
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapreduce;
+
+import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, HadoopInputSplit> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);
+
+	private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
+	protected Class<K> keyClass;
+	protected Class<V> valueClass;
+	private org.apache.hadoop.conf.Configuration configuration;
+
+	protected transient RecordReader<K, V> recordReader;
+	protected boolean fetched = false;
+	protected boolean hasNext;
+
+	public HadoopInputFormatBase(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
+		super();
+		this.mapreduceInputFormat = mapreduceInputFormat;
+		this.keyClass = key;
+		this.valueClass = value;
+		this.configuration = job.getConfiguration();
+		HadoopUtils.mergeHadoopConf(configuration);
+	}
+
+	public org.apache.hadoop.conf.Configuration getConfiguration() {
+		return this.configuration;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  InputFormat
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void configure(Configuration parameters) {
+		// nothing to do
+	}
+
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
+		// only gather base statistics for FileInputFormats
+		if(!(mapreduceInputFormat instanceof FileInputFormat)) {
+			return null;
+		}
+
+		JobContext jobContext = null;
+		try {
+			jobContext = HadoopUtils.instantiateJobContext(configuration, null);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
+				(FileBaseStatistics) cachedStats : null;
+				
+				try {
+					final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext);
+					return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
+				} catch (IOException ioex) {
+					if (LOG.isWarnEnabled()) {
+						LOG.warn("Could not determine statistics due to an io error: " 
+								+ ioex.getMessage());
+					}
+				} catch (Throwable t) {
+					if (LOG.isErrorEnabled()) {
+						LOG.error("Unexpected problem while getting the file statistics: "
+								+ t.getMessage(), t);
+					}
+				}
+				
+				// no statistics available
+				return null;
+	}
+
+	@Override
+	public HadoopInputSplit[] createInputSplits(int minNumSplits)
+			throws IOException {
+		configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
+
+		JobContext jobContext = null;
+		try {
+			jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		List<org.apache.hadoop.mapreduce.InputSplit> splits;
+		try {
+			splits = this.mapreduceInputFormat.getSplits(jobContext);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not get Splits.", e);
+		}
+		HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
+
+		for(int i = 0; i < hadoopInputSplits.length; i++){
+			hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
+		}
+		return hadoopInputSplits;
+	}
+
+	@Override
+	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
+		return new LocatableInputSplitAssigner(inputSplits);
+	}
+
+	@Override
+	public void open(HadoopInputSplit split) throws IOException {
+		TaskAttemptContext context = null;
+		try {
+			context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
+		} catch(Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		try {
+			this.recordReader = this.mapreduceInputFormat
+					.createRecordReader(split.getHadoopInputSplit(), context);
+			this.recordReader.initialize(split.getHadoopInputSplit(), context);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not create RecordReader.", e);
+		} finally {
+			this.fetched = false;
+		}
+	}
+
+	/**
+	 * Reports whether the record reader is exhausted, fetching the next key/value
+	 * pair first if that has not been done since the last reset of the "fetched" flag.
+	 *
+	 * @return {@code true} if no further key/value pair is available
+	 * @throws IOException if fetching the next pair fails
+	 */
+	@Override
+	public boolean reachedEnd() throws IOException {
+		if (this.fetched) {
+			return !this.hasNext;
+		}
+		fetchNext();
+		return !this.hasNext;
+	}
+
+	protected void fetchNext() throws IOException {
+		try {
+			this.hasNext = this.recordReader.nextKeyValue();
+		} catch (InterruptedException e) {
+			throw new IOException("Could not fetch next KeyValue pair.", e);
+		} finally {
+			this.fetched = true;
+		}
+	}
+
+	/**
+	 * Closes the Hadoop record reader, if one was created.
+	 *
+	 * <p>The reader is assigned in {@code open(...)}; when that call never happened or
+	 * failed before the reader was created there is nothing to release, so this
+	 * method returns quietly instead of failing with a NullPointerException.
+	 *
+	 * @throws IOException if the underlying reader fails to close
+	 */
+	@Override
+	public void close() throws IOException {
+		if (this.recordReader != null) {
+			this.recordReader.close();
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Helper methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Collects the status of all input files and derives base statistics from them.
+	 *
+	 * <p>For every given path the file status is looked up. A directory contributes its
+	 * direct, non-directory children (sub-directories are not descended into); a plain
+	 * file contributes itself. All collected entries are appended to {@code files}.
+	 *
+	 * @param cachedStats previously computed statistics, or {@code null} if there are none
+	 * @param hadoopFilePaths the Hadoop input paths to inspect
+	 * @param files output list that receives the status of every file found
+	 * @return {@code cachedStats} if no inspected entry is newer than it, otherwise freshly
+	 *         computed statistics with the summed file length (or "unknown" if the sum is
+	 *         not positive)
+	 * @throws IOException if a file system lookup fails
+	 */
+	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
+											ArrayList<FileStatus> files) throws IOException {
+
+		long newestModification = 0L;
+
+		// gather the file info and track the most recent modification time stamp
+		for (org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
+
+			final Path path = new Path(hadoopPath.toUri());
+			final FileSystem fileSystem = FileSystem.get(path.toUri());
+
+			final FileStatus status = fileSystem.getFileStatus(path);
+			newestModification = Math.max(newestModification, status.getModificationTime());
+
+			if (!status.isDir()) {
+				// a plain file stands for itself
+				files.add(status);
+				continue;
+			}
+
+			// a directory: take its direct children that are files
+			final FileStatus[] children = fileSystem.listStatus(path);
+			files.ensureCapacity(files.size() + children.length);
+
+			for (FileStatus child : children) {
+				if (child.isDir()) {
+					continue;
+				}
+				files.add(child);
+				newestModification = Math.max(child.getModificationTime(), newestModification);
+			}
+		}
+
+		// reuse the cached statistics if nothing changed since they were computed
+		final boolean cacheStillValid =
+				cachedStats != null && newestModification <= cachedStats.getLastModificationTime();
+		if (cacheStillValid) {
+			return cachedStats;
+		}
+
+		// sum up the length of all collected files
+		long totalLength = 0;
+		for (FileStatus collected : files) {
+			totalLength += collected.getLen();
+		}
+
+		// a non-positive total is reported as "unknown"
+		if (totalLength <= 0) {
+			totalLength = BaseStatistics.SIZE_UNKNOWN;
+		}
+
+		return new FileBaseStatistics(newestModification, totalLength, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Custom serialization methods
+	// --------------------------------------------------------------------------------------------
+
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeUTF(this.mapreduceInputFormat.getClass().getName());
+		out.writeUTF(this.keyClass.getName());
+		out.writeUTF(this.valueClass.getName());
+		this.configuration.write(out);
+	}
+
+	/**
+	 * Custom deserialization, the counterpart of {@code writeObject}: reads the three
+	 * class names and the Hadoop configuration, then re-creates the input format
+	 * instance and resolves the key and value classes through the current thread's
+	 * context class loader.
+	 *
+	 * @param in stream to read from
+	 * @throws IOException if reading fails
+	 * @throws ClassNotFoundException declared for the serialization contract; lookup
+	 *         failures inside this method surface as RuntimeException instead
+	 */
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		// read order must mirror the write order: format class, key class, value class
+		String hadoopInputFormatClassName = in.readUTF();
+		String keyClassName = in.readUTF();
+		String valueClassName = in.readUTF();
+
+		// the configuration is always consumed from the stream ...
+		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+		configuration.readFields(in);
+
+		// ... but only installed if this instance does not have one yet
+		if(this.configuration == null) {
+			this.configuration = configuration;
+		}
+
+		// the input format is instantiated reflectively via its no-argument constructor
+		try {
+			this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
+		}
+		try {
+			this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to find key class.", e);
+		}
+		try {
+			this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to find value class.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
new file mode 100644
index 0000000..7d3675c
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Output format that writes Flink {@link Tuple2} records through a wrapped Hadoop
+ * {@code org.apache.hadoop.mapreduce.OutputFormat}: field {@code f0} is passed as the
+ * key and field {@code f1} as the value.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
+
+	/**
+	 * @param mapreduceOutputFormat the Hadoop output format to delegate to
+	 * @param job the Hadoop job that is handed to the base class together with the format
+	 */
+	public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat, Job job) {
+		super(mapreduceOutputFormat, job);
+	}
+
+	/**
+	 * Writes one tuple as a Hadoop key/value pair.
+	 *
+	 * @param record the tuple to write
+	 * @throws IOException if the Hadoop writer fails or the call is interrupted
+	 */
+	@Override
+	public void writeRecord(Tuple2<K, V> record) throws IOException {
+		final K key = record.f0;
+		final V value = record.f1;
+		try {
+			this.recordWriter.write(key, value);
+		} catch (InterruptedException interrupted) {
+			throw new IOException("Could not write Record.", interrupted);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
new file mode 100644
index 0000000..a7ae428
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapreduce;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+
+/**
+ * Base class for Flink output formats that delegate to a Hadoop
+ * {@code org.apache.hadoop.mapreduce.OutputFormat}.
+ *
+ * <p>{@link #open(int, int)} sets up a Hadoop task attempt and record writer for one
+ * parallel instance, {@link #close()} commits that task and renames its temporary file,
+ * and {@link #finalizeGlobal(int)} commits the Hadoop job as a whole.
+ *
+ * @param <K> Hadoop key type
+ * @param <V> Hadoop value type
+ * @param <T> Flink record type accepted by the concrete subclass
+ */
+public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>, FinalizeOnMaster {
+
+	private static final long serialVersionUID = 1L;
+
+	private org.apache.hadoop.conf.Configuration configuration;
+	private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat;
+	protected transient RecordWriter<K,V> recordWriter;
+	private transient FileOutputCommitter fileOutputCommitter;
+	private transient TaskAttemptContext context;
+	// one-based number of this parallel instance, set in open()
+	private transient int taskNumber;
+
+	/**
+	 * @param mapreduceOutputFormat the Hadoop output format to delegate to
+	 * @param job the Hadoop job whose configuration is used (and merged with the
+	 *        Hadoop configuration obtained through {@code HadoopUtils})
+	 */
+	public HadoopOutputFormatBase(org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat, Job job) {
+		super();
+		this.mapreduceOutputFormat = mapreduceOutputFormat;
+		this.configuration = job.getConfiguration();
+		HadoopUtils.mergeHadoopConf(configuration);
+	}
+
+	/**
+	 * @return the Hadoop configuration used by this format
+	 */
+	public org.apache.hadoop.conf.Configuration getConfiguration() {
+		return this.configuration;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  OutputFormat
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void configure(Configuration parameters) {
+		// nothing to do
+	}
+
+	/**
+	 * Creates the temporary output file for the Hadoop RecordWriter.
+	 *
+	 * @param taskNumber The number of the parallel instance.
+	 * @param numTasks The number of parallel tasks.
+	 * @throws java.io.IOException if the one-based task number has more than six digits,
+	 *         or if the record writer cannot be created
+	 */
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		final int oneBasedTaskNumber = taskNumber + 1;
+		if (Integer.toString(oneBasedTaskNumber).length() > 6) {
+			throw new IOException("Task id too large.");
+		}
+
+		this.taskNumber = oneBasedTaskNumber;
+
+		// for hadoop 2.2
+		this.configuration.set("mapreduce.output.basename", "tmp");
+
+		// The task part of the attempt id is the task number padded to six digits.
+		// Padding is done by hand: a format string with a computed width breaks for
+		// six-digit numbers (width 0), which the check above explicitly allows.
+		TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+				+ zeroPad(oneBasedTaskNumber, 6)
+				+ "_0");
+
+		this.configuration.set("mapred.task.id", taskAttemptID.toString());
+		this.configuration.setInt("mapred.task.partition", oneBasedTaskNumber);
+		// for hadoop 2.2
+		this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+		this.configuration.setInt("mapreduce.task.partition", oneBasedTaskNumber);
+
+		try {
+			this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
+
+		try {
+			this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		// compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
+		this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString());
+
+		try {
+			this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not create RecordWriter.", e);
+		}
+	}
+
+	/**
+	 * Commits the task by moving the output file out of the temporary directory, then
+	 * renames the "tmp-r-NNNNN" file (if present) to the plain one-based task number.
+	 *
+	 * @throws java.io.IOException if closing the writer, committing or renaming fails
+	 */
+	@Override
+	public void close() throws IOException {
+		try {
+			this.recordWriter.close(this.context);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not close RecordWriter.", e);
+		}
+
+		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
+			this.fileOutputCommitter.commitTask(this.context);
+		}
+
+		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
+
+		// rename tmp-file to final name
+		FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
+
+		String taskNumberStr = Integer.toString(this.taskNumber);
+		// "tmp" is the base name configured in open(); the number is padded to at least
+		// five digits and the "-r-" separator is kept intact for longer numbers as well
+		String tmpFile = "tmp-r-" + zeroPad(this.taskNumber, 5);
+
+		Path tmpPath = new Path(outputPath.toString() + "/" + tmpFile);
+		if (fs.exists(tmpPath)) {
+			fs.rename(tmpPath, new Path(outputPath.toString() + "/" + taskNumberStr));
+		}
+	}
+
+	/**
+	 * Commits the Hadoop job through a fresh {@link FileOutputCommitter}.
+	 *
+	 * @param parallelism not used by this implementation
+	 * @throws IOException if committing the job fails
+	 */
+	@Override
+	public void finalizeGlobal(int parallelism) throws IOException {
+
+		JobContext jobContext;
+		TaskAttemptContext taskContext;
+		try {
+
+			// a fixed attempt id (task 1) is sufficient to construct the committer here
+			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+					+ zeroPad(1, 6)
+					+ "_0");
+
+			jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID());
+			taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext);
+
+		// finalize HDFS output format
+		this.fileOutputCommitter.commitJob(jobContext);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Helper methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Renders {@code value} in decimal, left-padded with '0' to at least
+	 * {@code minDigits} characters. Longer numbers are returned unchanged.
+	 * The result does not depend on the default locale.
+	 */
+	private static String zeroPad(int value, int minDigits) {
+		final String digits = Integer.toString(value);
+		final StringBuilder padded = new StringBuilder();
+		for (int i = digits.length(); i < minDigits; i++) {
+			padded.append('0');
+		}
+		return padded.append(digits).toString();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Custom serialization methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Writes the class name of the Hadoop output format followed by the configuration.
+	 */
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeUTF(this.mapreduceOutputFormat.getClass().getName());
+		this.configuration.write(out);
+	}
+
+	/**
+	 * Reads the output format class name and the configuration, then re-creates the
+	 * output format instance through the current thread's context class loader.
+	 */
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		String hadoopOutputFormatClassName = in.readUTF();
+
+		// the configuration is always consumed from the stream, but only installed
+		// if this instance does not have one yet
+		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+		configuration.readFields(in);
+
+		if(this.configuration == null) {
+			this.configuration = configuration;
+		}
+
+		try {
+			this.mapreduceOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat<K,V>) Class.forName(hadoopOutputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
new file mode 100644
index 0000000..fe8f8cc
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.hadoop.mapreduce.utils;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Helpers for working with the Hadoop {@code mapreduce} API across Hadoop versions.
+ *
+ * <p>Context objects are created reflectively because their concrete classes differ:
+ * where {@link JobContext} is an interface (Hadoop 2.x) the {@code *Impl} classes from
+ * {@code org.apache.hadoop.mapreduce.task} are used, otherwise (Hadoop 1.x) the context
+ * classes themselves are instantiated.
+ */
+public class HadoopUtils {
+
+	/**
+	 * Merge HadoopConfiguration into Configuration. This is necessary for the HDFS configuration.
+	 * Every entry of the Hadoop configuration is set on {@code configuration}, replacing
+	 * any value already stored under the same key.
+	 *
+	 * @param configuration the configuration to merge into
+	 */
+	public static void mergeHadoopConf(Configuration configuration) {
+		Configuration hadoopConf = org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration();
+
+		for (Map.Entry<String, String> e : hadoopConf) {
+			configuration.set(e.getKey(), e.getValue());
+		}
+	}
+
+	/**
+	 * Creates a {@link JobContext} for the given configuration and job id.
+	 *
+	 * @param configuration the Hadoop configuration
+	 * @param jobId the job id
+	 * @return the new job context
+	 * @throws Exception if the context class cannot be found or instantiated; the
+	 *         original failure is attached as the cause
+	 */
+	public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception {
+		try {
+			Class<?> clazz;
+			// for Hadoop 2.xx (JobContext is an interface, implemented by JobContextImpl)
+			if(JobContext.class.isInterface()) {
+				clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader());
+			}
+			// for Hadoop 1.xx (JobContext is a concrete class)
+			else {
+				clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader());
+			}
+			Constructor<?> constructor = clazz.getConstructor(Configuration.class, JobID.class);
+			JobContext context = (JobContext) constructor.newInstance(configuration, jobId);
+
+			return context;
+		} catch(Exception e) {
+			// keep the cause so the actual reflection failure stays visible
+			throw new Exception("Could not create instance of JobContext.", e);
+		}
+	}
+
+	/**
+	 * Creates a {@link TaskAttemptContext} for the given configuration and attempt id.
+	 *
+	 * @param configuration the Hadoop configuration
+	 * @param taskAttemptID the task attempt id
+	 * @return the new task attempt context
+	 * @throws Exception if the context class cannot be found or instantiated; the
+	 *         original failure is attached as the cause
+	 */
+	public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration,  TaskAttemptID taskAttemptID) throws Exception {
+		try {
+			Class<?> clazz;
+			// for Hadoop 2.xx (the context types are interfaces with *Impl classes)
+			if(JobContext.class.isInterface()) {
+				clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+			}
+			// for Hadoop 1.xx (TaskAttemptContext is a concrete class)
+			else {
+				clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
+			}
+			Constructor<?> constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class);
+			TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID);
+
+			return context;
+		} catch(Exception e) {
+			// keep the cause so the actual reflection failure stays visible
+			throw new Exception("Could not create instance of TaskAttemptContext.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
new file mode 100644
index 0000000..f2758b3
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapreduce.wrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.mapreduce.JobContext;
+
+
+public class HadoopInputSplit extends LocatableInputSplit {
+	
+	public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit;
+	public transient JobContext jobContext;
+	
+	private int splitNumber;
+	
+	public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() {
+		return mapreduceInputSplit;
+	}
+	
+	
+	public HadoopInputSplit() {
+		super();
+	}
+	
+	
+	public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
+		this.splitNumber = splitNumber;
+		if(!(mapreduceInputSplit instanceof Writable)) {
+			throw new IllegalArgumentException("InputSplit must implement Writable interface.");
+		}
+		this.mapreduceInputSplit = mapreduceInputSplit;
+		this.jobContext = jobContext;
+	}
+	
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(this.splitNumber);
+		out.writeUTF(this.mapreduceInputSplit.getClass().getName());
+		Writable w = (Writable) this.mapreduceInputSplit;
+		w.write(out);
+	}
+	
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.splitNumber = in.readInt();
+		String className = in.readUTF();
+		
+		if(this.mapreduceInputSplit == null) {
+			try {
+				Class<? extends org.apache.hadoop.io.Writable> inputSplit = 
+						Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
+				this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
+			} catch (Exception e) {
+				throw new RuntimeException("Unable to create InputSplit", e);
+			}
+		}
+		((Writable)this.mapreduceInputSplit).readFields(in);
+	}
+
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeInt(this.splitNumber);
+		out.writeUTF(this.mapreduceInputSplit.getClass().getName());
+		Writable w = (Writable) this.mapreduceInputSplit;
+		w.write(out);
+
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		this.splitNumber=in.readInt();
+		String className = in.readUTF();
+
+		if(this.mapreduceInputSplit == null) {
+			try {
+				Class<? extends org.apache.hadoop.io.Writable> inputSplit =
+						Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
+				this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
+			} catch (Exception e) {
+				throw new RuntimeException("Unable to create InputSplit", e);
+			}
+		}
+		((Writable)this.mapreduceInputSplit).readFields(in);
+	}
+	
+	@Override
+	public int getSplitNumber() {
+		return this.splitNumber;
+	}
+	
+	@Override
+	public String[] getHostnames() {
+		try {
+			return this.mapreduceInputSplit.getLocations();
+		} catch (IOException e) {
+			return new String[0];
+		} catch (InterruptedException e) {
+			return new String[0];
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
new file mode 100644
index 0000000..89aa67e
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapred;
+
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+
+/**
+ * Checks that the {@code mapred} {@link HadoopInputFormat} wrapper reports tuple type
+ * information built from the key and value classes it was constructed with.
+ */
+public class HadoopInputFormatTest {
+
+
+	/**
+	 * Minimal Hadoop input format with a {@code Void} key. It only serves as a
+	 * constructor argument; its record reader is never used by the test.
+	 * Declared static because it does not need the enclosing test instance.
+	 */
+	public static class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
+
+		public DummyVoidKeyInputFormat() {
+		}
+
+		@Override
+		public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
+			return null;
+		}
+	}
+
+
+	/**
+	 * The produced type must be a tuple type equal to {@code Tuple2<Void, Long>}.
+	 */
+	@Test
+	public void checkTypeInformation() {
+		try {
+			// Set up the Hadoop Input Format
+			HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>(
+					new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, new JobConf());
+
+			TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
+			TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+
+			if(tupleType.isTupleType()) {
+				if(!tupleType.equals(testTupleType)) {
+					fail("Tuple type information was not set correctly!");
+				}
+			} else {
+				fail("Type information was not set to tuple type information!");
+			}
+
+		}
+		catch (Exception ex) {
+			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+		}
+
+	}
+
+}


Mime
View raw message