flink-commits mailing list archives

From aljos...@apache.org
Subject [3/3] flink git commit: [FLINK-1396][FLINK-1303] Hadoop Input/Output directly in API
Date Mon, 09 Feb 2015 14:38:27 GMT
[FLINK-1396][FLINK-1303] Hadoop Input/Output directly in API

This adds methods on ExecutionEnvironment for reading with Hadoop
Input/OutputFormat.
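
For example, reading a file through Hadoop's TextInputFormat becomes a
single call on the ExecutionEnvironment, and writing through a Hadoop
OutputFormat works symmetrically via a wrapper. The following is a minimal,
hypothetical sketch modeled on the documentation added in this commit; the
paths and the choice of the mapred-API TextInputFormat/TextOutputFormat are
illustrative assumptions, not part of the commit itself.

~~~java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class HadoopInOutSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Placeholder paths; replace with real input/output locations.
    String textPath = "hdfs:///input";
    String outputPath = "hdfs:///output";

    // Read via the Hadoop (mapred) TextInputFormat: keys are byte offsets,
    // values are the text lines, delivered as Flink 2-tuples.
    DataSet<Tuple2<LongWritable, Text>> input =
        env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath);

    // Write the tuples back out through the Hadoop (mapred) TextOutputFormat,
    // wrapped in Flink's HadoopOutputFormat.
    HadoopOutputFormat<LongWritable, Text> hadoopOF =
        new HadoopOutputFormat<LongWritable, Text>(
            new TextOutputFormat<LongWritable, Text>(), new JobConf());
    FileOutputFormat.setOutputPath(hadoopOF.getJobConf(), new org.apache.hadoop.fs.Path(outputPath));

    input.output(hadoopOF);
    env.execute("Hadoop I/O sketch");
  }
}
~~~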

This also adds support in the Scala API for Hadoop Input/OutputFormats.
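
On the Scala side, the Hadoop key/value pairs arrive as Scala tuples. Again a
minimal sketch, modeled on the Scala example in the updated documentation
below; the object name and input path are placeholder assumptions.

~~~scala
import org.apache.flink.api.scala._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

object HadoopReadSketch {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Placeholder path; replace with a real input location.
    val textPath = "hdfs:///input"

    // readHadoopFile wraps the Hadoop input format; the result is a
    // DataSet of (key, value) tuples.
    val input: DataSet[(LongWritable, Text)] =
      env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)

    input.print()
    env.execute("Hadoop input sketch")
  }
}
~~~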


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd2f88af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd2f88af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd2f88af

Branch: refs/heads/release-0.8
Commit: cd2f88afdad4c9eb2cd141eb3283d2e0084b2527
Parents: 944e2e3
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Jan 28 15:13:30 2015 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Feb 9 15:38:06 2015 +0100

----------------------------------------------------------------------
 docs/hadoop_compatibility.md                    | 112 ++++--
 .../mapred/HadoopInputFormat.java               | 298 ----------------
 .../mapred/HadoopMapFunction.java               |   2 +-
 .../mapred/HadoopOutputFormat.java              | 184 ----------
 .../mapred/HadoopReduceCombineFunction.java     |   2 +-
 .../mapred/HadoopReduceFunction.java            |   2 +-
 .../example/HadoopMapredCompatWordCount.java    |   4 +-
 .../mapred/record/HadoopRecordInputFormat.java  |   8 +-
 .../mapred/record/HadoopRecordOutputFormat.java |   6 +-
 .../mapred/utils/HadoopUtils.java               |  87 -----
 .../mapred/wrapper/HadoopDummyProgressable.java |  33 --
 .../mapred/wrapper/HadoopDummyReporter.java     |  70 ----
 .../mapred/wrapper/HadoopInputSplit.java        | 103 ------
 .../mapreduce/HadoopInputFormat.java            | 339 -------------------
 .../mapreduce/HadoopOutputFormat.java           | 227 -------------
 .../mapreduce/example/WordCount.java            |   4 +-
 .../mapreduce/utils/HadoopUtils.java            |  83 -----
 .../mapreduce/wrapper/HadoopInputSplit.java     |  90 -----
 .../mapred/HadoopIOFormatsITCase.java           | 230 -------------
 flink-java/pom.xml                              |  22 ++
 .../java/org/apache/flink/api/java/DataSet.java |   3 +-
 .../flink/api/java/ExecutionEnvironment.java    |  64 ++++
 .../java/hadoop/mapred/HadoopInputFormat.java   |  55 +++
 .../hadoop/mapred/HadoopInputFormatBase.java    | 253 ++++++++++++++
 .../java/hadoop/mapred/HadoopOutputFormat.java  |  37 ++
 .../hadoop/mapred/HadoopOutputFormatBase.java   | 165 +++++++++
 .../java/hadoop/mapred/utils/HadoopUtils.java   | 154 +++++++++
 .../mapred/wrapper/HadoopDummyProgressable.java |  33 ++
 .../mapred/wrapper/HadoopDummyReporter.java     |  70 ++++
 .../hadoop/mapred/wrapper/HadoopInputSplit.java | 138 ++++++++
 .../hadoop/mapreduce/HadoopInputFormat.java     |  60 ++++
 .../hadoop/mapreduce/HadoopInputFormatBase.java | 289 ++++++++++++++++
 .../hadoop/mapreduce/HadoopOutputFormat.java    |  41 +++
 .../mapreduce/HadoopOutputFormatBase.java       | 203 +++++++++++
 .../hadoop/mapreduce/utils/HadoopUtils.java     |  82 +++++
 .../mapreduce/wrapper/HadoopInputSplit.java     | 125 +++++++
 .../hadoop/mapred/HadoopInputFormatTest.java    |  82 +++++
 .../hadoop/mapreduce/HadoopInputFormatTest.java |  84 +++++
 .../flink/api/scala/ExecutionEnvironment.scala  |  97 +++++-
 .../scala/hadoop/mapred/HadoopInputFormat.scala |  41 +++
 .../hadoop/mapred/HadoopOutputFormat.scala      |  29 ++
 .../hadoop/mapreduce/HadoopInputFormat.scala    |  42 +++
 .../hadoop/mapreduce/HadoopOutputFormat.scala   |  30 ++
 flink-tests/pom.xml                             |  12 +-
 .../hadoop/mapred/WordCountMapredITCase.java    | 118 +++++++
 .../mapreduce/WordCountMapreduceITCase.java     | 118 +++++++
 .../hadoop/mapred/WordCountMapredITCase.scala   |  67 ++++
 .../mapreduce/WordCountMapreduceITCase.scala    |  70 ++++
 48 files changed, 2668 insertions(+), 1800 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/docs/hadoop_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/hadoop_compatibility.md b/docs/hadoop_compatibility.md
index 9b43022..cacca0f 100644
--- a/docs/hadoop_compatibility.md
+++ b/docs/hadoop_compatibility.md
@@ -23,7 +23,8 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-Flink is compatible with many Apache Hadoop's MapReduce interfaces and allows to reuse a lot of code that was implemented for Hadoop MapReduce.
+Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows
+reusing code that was implemented for Hadoop MapReduce.
 
 You can:
 
@@ -38,9 +39,19 @@ This document shows how to use existing Hadoop MapReduce code with Flink. Please
 
 ### Project Configuration
 
-The Hadoop Compatibility Layer is part of the `flink-addons` Maven module. All relevant classes are located in the `org.apache.flink.hadoopcompatibility` package. It includes separate packages and classes for the Hadoop `mapred` and `mapreduce` APIs.
+Support for Hadoop input/output formats is part of the `flink-java` and
+`flink-scala` Maven modules that are always required when writing Flink jobs.
+The code is located in `org.apache.flink.api.java.hadoop` and
+`org.apache.flink.api.scala.hadoop` in an additional sub-package for the
+`mapred` and `mapreduce` API.
 
-Add the following dependency to your `pom.xml` to use the Hadoop Compatibility Layer.
+Support for Hadoop Mappers and Reducers is contained in the `flink-staging`
+Maven module.
+This code resides in the `org.apache.flink.hadoopcompatibility`
+package.
+
+Add the following dependency to your `pom.xml` if you want to reuse Mappers
+and Reducers.
 
 ~~~xml
 <dependency>
@@ -52,56 +63,70 @@ Add the following dependency to your `pom.xml` to use the Hadoop Compatibility L
 
 ### Using Hadoop Data Types
 
-Flink supports all Hadoop `Writable` and `WritableComparable` data types out-of-the-box. You do not need to include the Hadoop Compatibility dependency, if you only want to use your Hadoop data types. See the [Programming Guide](programming_guide.html#data-types) for more details.
+Flink supports all Hadoop `Writable` and `WritableComparable` data types
+out-of-the-box. You do not need to include the Hadoop Compatibility dependency
+if you only want to use your Hadoop data types. See the
+[Programming Guide](programming_guide.html#data-types) for more details.
 
 ### Using Hadoop InputFormats
 
-Flink provides a compatibility wrapper for Hadoop `InputFormats`. Any class that implements `org.apache.hadoop.mapred.InputFormat` or extends `org.apache.hadoop.mapreduce.InputFormat` is supported. Thus, Flink can handle Hadoop built-in formats such as `TextInputFormat` as well as external formats such as Hive's `HCatInputFormat`. Data read from Hadoop InputFormats is converted into a `DataSet<Tuple2<KEY,VALUE>>` where `KEY` is the key and `VALUE` is the value of the original Hadoop key-value pair.
-
-Flink's InputFormat wrappers are 
-
-- `org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat` and 
-- `org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat`
+Hadoop input formats can be used to create a data source by using
+one of the methods `readHadoopFile` or `createHadoopInput` of the
+`ExecutionEnvironment`. The former is used for input formats derived
+from `FileInputFormat`, while the latter has to be used for general-purpose
+input formats.
 
-and can be used as regular Flink [InputFormats](programming_guide.html#data-sources).
+The resulting `DataSet` contains 2-tuples where the first field
+is the key and the second field is the value retrieved from the Hadoop
+InputFormat.
 
 The following example shows how to use Hadoop's `TextInputFormat`.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
 ~~~java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-// Set up the Hadoop TextInputFormat.
-Job job = Job.getInstance();
-HadoopInputFormat<LongWritable, Text> hadoopIF = 
-  // create the Flink wrapper.
-  new HadoopInputFormat<LongWritable, Text>(
-    // create the Hadoop InputFormat, specify key and value type, and job.
-    new TextInputFormat(), LongWritable.class, Text.class, job
-  );
-TextInputFormat.addInputPath(job, new Path(inputPath));
-		
-// Read data using the Hadoop TextInputFormat.
-DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
+
+DataSet<Tuple2<LongWritable, Text>> input =
+    env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath);
 
 // Do something with the data.
 [...]
 ~~~
 
-### Using Hadoop OutputFormats
+</div>
+<div data-lang="scala" markdown="1">
 
-Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class that implements `org.apache.hadoop.mapred.OutputFormat` or extends `org.apache.hadoop.mapreduce.OutputFormat` is supported. The OutputFormat wrapper expects its input data to be a `DataSet<Tuple2<KEY,VALUE>>` where `KEY` is the key and `VALUE` is the value of the Hadoop key-value pair that is processed by the Hadoop OutputFormat.
+~~~scala
+val env = ExecutionEnvironment.getExecutionEnvironment
+		
+val input: DataSet[(LongWritable, Text)] =
+  env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
 
-Flink's OUtputFormat wrappers are
+// Do something with the data.
+[...]
+~~~
+
+</div>
 
-- `org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat` and 
-- `org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat`
+</div>
+
+### Using Hadoop OutputFormats
 
-and can be used as regular Flink [OutputFormats](programming_guide.html#data-sinks).
+Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
+that implements `org.apache.hadoop.mapred.OutputFormat` or extends
+`org.apache.hadoop.mapreduce.OutputFormat` is supported.
+The OutputFormat wrapper expects its input data to be a DataSet containing
+2-tuples of key and value, which are then processed by the Hadoop OutputFormat.
 
 The following example shows how to use Hadoop's `TextOutputFormat`.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
 ~~~java
-// Obtain your result to emit.
+// Obtain the result we want to emit
 DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
 		
 // Set up the Hadoop TextOutputFormat.
@@ -115,9 +140,32 @@ hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", "
 TextOutputFormat.setOutputPath(job, new Path(outputPath));
 		
 // Emit data using the Hadoop TextOutputFormat.
-result.output(hadoopOF);
+hadoopResult.output(hadoopOF);
 ~~~
 
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+// Obtain your result to emit.
+val hadoopResult: DataSet[(Text, IntWritable)] = [...]
+
+val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
+  new TextOutputFormat[Text, IntWritable],
+  new JobConf)
+
+hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
+FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
+
+hadoopResult.output(hadoopOF)
+~~~
+
+</div>
+
+</div>
+
 ### Using Hadoop Mappers and Reducers
 
 Hadoop Mappers are semantically equivalent to Flink's [FlatMapFunctions](dataset_transformations.html#flatmap) and Hadoop Reducers are equivalent to Flink's [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset). Flink provides wrappers for implementations of Hadoop MapReduce's `Mapper` and `Reducer` interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the Mapper and Reduce interfaces of Hadoop's mapred API (`org.apache.hadoop.mapred`) are supported.

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
deleted file mode 100644
index 8dfda67..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.util.ReflectionUtils;
-
-public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormat.class);
-	
-	private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat;
-	private Class<K> keyClass;
-	private Class<V> valueClass;
-	private JobConf jobConf;
-	
-	private transient K key;
-	private transient V value;
-	
-	private transient RecordReader<K, V> recordReader;
-	private transient boolean fetched = false;
-	private transient boolean hasNext;
-
-	public HadoopInputFormat() {
-		super();
-	}
-	
-	public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
-		super();
-		this.mapredInputFormat = mapredInputFormat;
-		this.keyClass = key;
-		this.valueClass = value;
-		HadoopUtils.mergeHadoopConf(job);
-		this.jobConf = job;
-		ReflectionUtils.setConf(mapredInputFormat, jobConf);
-	}
-	
-	public void setJobConf(JobConf job) {
-		this.jobConf = job;
-	}
-	
-	public org.apache.hadoop.mapred.InputFormat<K,V> getHadoopInputFormat() {
-		return mapredInputFormat;
-	}
-	
-	public void setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat) {
-		this.mapredInputFormat = mapredInputFormat;
-	}
-	
-	public JobConf getJobConf() {
-		return jobConf;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  InputFormat
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void configure(Configuration parameters) {
-		// nothing to do
-	}
-	
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
-		// only gather base statistics for FileInputFormats
-		if(!(mapredInputFormat instanceof FileInputFormat)) {
-			return null;
-		}
-		
-		final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
-				(FileBaseStatistics) cachedStats : null;
-		
-		try {
-			final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(this.jobConf);
-			
-			return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
-		} catch (IOException ioex) {
-			if (LOG.isWarnEnabled()) {
-				LOG.warn("Could not determine statistics due to an io error: "
-						+ ioex.getMessage());
-			}
-		} catch (Throwable t) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Unexpected problem while getting the file statistics: "
-						+ t.getMessage(), t);
-			}
-		}
-		
-		// no statistics available
-		return null;
-	}
-	
-	@Override
-	public HadoopInputSplit[] createInputSplits(int minNumSplits)
-			throws IOException {
-		org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits);
-		HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
-		for(int i=0;i<splitArray.length;i++){
-			hiSplit[i] = new HadoopInputSplit(splitArray[i], jobConf);
-		}
-		return hiSplit;
-	}
-	
-	@Override
-	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
-		return new DefaultInputSplitAssigner(inputSplits);
-	}
-	
-	@Override
-	public void open(HadoopInputSplit split) throws IOException {
-		this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
-		if (this.recordReader instanceof Configurable) {
-			((Configurable) this.recordReader).setConf(jobConf);
-		}
-		key = this.recordReader.createKey();
-		value = this.recordReader.createValue();
-		this.fetched = false;
-	}
-	
-	@Override
-	public boolean reachedEnd() throws IOException {
-		if(!fetched) {
-			fetchNext();
-		}
-		return !hasNext;
-	}
-	
-	private void fetchNext() throws IOException {
-		hasNext = this.recordReader.next(key, value);
-		fetched = true;
-	}
-	
-	@Override
-	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
-		if(!fetched) {
-			fetchNext();
-		}
-		if(!hasNext) {
-			return null;
-		}
-		record.f0 = key;
-		record.f1 = value;
-		fetched = false;
-		return record;
-	}
-	
-	@Override
-	public void close() throws IOException {
-		this.recordReader.close();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Helper methods
-	// --------------------------------------------------------------------------------------------
-	
-	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
-			ArrayList<FileStatus> files) throws IOException {
-		
-		long latestModTime = 0L;
-		
-		// get the file info and check whether the cached statistics are still valid.
-		for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
-			
-			final Path filePath = new Path(hadoopPath.toUri());
-			final FileSystem fs = FileSystem.get(filePath.toUri());
-			
-			final FileStatus file = fs.getFileStatus(filePath);
-			latestModTime = Math.max(latestModTime, file.getModificationTime());
-			
-			// enumerate all files and check their modification time stamp.
-			if (file.isDir()) {
-				FileStatus[] fss = fs.listStatus(filePath);
-				files.ensureCapacity(files.size() + fss.length);
-				
-				for (FileStatus s : fss) {
-					if (!s.isDir()) {
-						files.add(s);
-						latestModTime = Math.max(s.getModificationTime(), latestModTime);
-					}
-				}
-			} else {
-				files.add(file);
-			}
-		}
-		
-		// check whether the cached statistics are still valid, if we have any
-		if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
-			return cachedStats;
-		}
-		
-		// calculate the whole length
-		long len = 0;
-		for (FileStatus s : files) {
-			len += s.getLen();
-		}
-		
-		// sanity check
-		if (len <= 0) {
-			len = BaseStatistics.SIZE_UNKNOWN;
-		}
-		
-		return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Custom serialization methods
-	// --------------------------------------------------------------------------------------------
-	
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeUTF(mapredInputFormat.getClass().getName());
-		out.writeUTF(keyClass.getName());
-		out.writeUTF(valueClass.getName());
-		jobConf.write(out);
-	}
-	
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		String hadoopInputFormatClassName = in.readUTF();
-		String keyClassName = in.readUTF();
-		String valueClassName = in.readUTF();
-		if(jobConf == null) {
-			jobConf = new JobConf();
-		}
-		jobConf.readFields(in);
-		try {
-			this.mapredInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
-		}
-		try {
-			this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to find key class.", e);
-		}
-		try {
-			this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to find value class.", e);
-		}
-		ReflectionUtils.setConf(mapredInputFormat, jobConf);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  ResultTypeQueryable
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public TypeInformation<Tuple2<K,V>> getProducedType() {
-		return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>((Class<K>) keyClass), new WritableTypeInfo<V>((Class<V>) valueClass));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
index 9bc36f3..ad94c01 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
deleted file mode 100644
index 64c539b..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.flink.api.common.io.FinalizeOnMaster;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.util.ReflectionUtils;
-
-
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>>, FinalizeOnMaster {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private JobConf jobConf;
-	private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;	
-	private transient RecordWriter<K,V> recordWriter;	
-	private transient FileOutputCommitter fileOutputCommitter;
-	private transient TaskAttemptContext context;
-	private transient JobContext jobContext;
-	
-	public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat, JobConf job) {
-		super();
-		this.mapredOutputFormat = mapredOutputFormat;
-		HadoopUtils.mergeHadoopConf(job);
-		this.jobConf = job;
-	}
-	
-	public void setJobConf(JobConf job) {
-		this.jobConf = job;
-	}
-	
-	public JobConf getJobConf() {
-		return jobConf;
-	}
-	
-	public org.apache.hadoop.mapred.OutputFormat<K,V> getHadoopOutputFormat() {
-		return mapredOutputFormat;
-	}
-	
-	public void setHadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat) {
-		this.mapredOutputFormat = mapredOutputFormat;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  OutputFormat
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void configure(Configuration parameters) {
-		// nothing to do
-	}
-	
-	/**
-	 * create the temporary output file for hadoop RecordWriter.
-	 * @param taskNumber The number of the parallel instance.
-	 * @param numTasks The number of parallel tasks.
-	 * @throws IOException
-	 */
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		if (Integer.toString(taskNumber + 1).length() > 6) {
-			throw new IOException("Task id too large.");
-		}
-		
-		TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" 
-				+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") 
-				+ Integer.toString(taskNumber + 1) 
-				+ "_0");
-		
-		this.jobConf.set("mapred.task.id", taskAttemptID.toString());
-		this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
-		// for hadoop 2.2
-		this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
-		this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
-		
-		try {
-			this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-		
-		this.fileOutputCommitter = new FileOutputCommitter();
-		
-		try {
-			this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-		
-		this.fileOutputCommitter.setupJob(jobContext);
-		
-		this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
-	}
-	
-	@Override
-	public void writeRecord(Tuple2<K, V> record) throws IOException {
-		this.recordWriter.write(record.f0, record.f1);
-	}
-	
-	/**
-	 * commit the task by moving the output file out from the temporary directory.
-	 * @throws IOException
-	 */
-	@Override
-	public void close() throws IOException {
-		this.recordWriter.close(new HadoopDummyReporter());
-		
-		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
-			this.fileOutputCommitter.commitTask(this.context);
-		}
-	}
-	
-	@Override
-	public void finalizeGlobal(int parallelism) throws IOException {
-
-		try {
-			JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
-			FileOutputCommitter fileOutputCommitter = new FileOutputCommitter();
-			
-			// finalize HDFS output format
-			fileOutputCommitter.commitJob(jobContext);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Custom serialization methods
-	// --------------------------------------------------------------------------------------------
-	
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeUTF(mapredOutputFormat.getClass().getName());
-		jobConf.write(out);
-	}
-	
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		String hadoopOutputFormatName = in.readUTF();
-		if(jobConf == null) {
-			jobConf = new JobConf();
-		}
-		jobConf.readFields(in);
-		try {
-			this.mapredOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) Class.forName(hadoopOutputFormatName, true, Thread.currentThread().getContextClassLoader()).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
-		}
-		ReflectionUtils.setConf(mapredOutputFormat, jobConf);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
index 5d83bad..3687bb2 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
index 1f0aedd..439c31c 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
index de20fab..3547e47 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
@@ -24,9 +24,9 @@ import java.util.Iterator;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
 import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
index f8153a2..edcc43b 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
@@ -29,9 +29,9 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter;
-import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit;
 import org.apache.flink.types.Record;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -80,7 +80,7 @@ public class HadoopRecordInputFormat<K, V> implements InputFormat<Record, Hadoop
 		org.apache.hadoop.mapred.InputSplit[] splitArray = hadoopInputFormat.getSplits(jobConf, minNumSplits);
 		HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
 		for(int i=0;i<splitArray.length;i++){
-			hiSplit[i] = new HadoopInputSplit(splitArray[i], jobConf);
+			hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf);
 		}
 		return hiSplit;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
index 74118a3..e519062 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
@@ -27,9 +27,9 @@ import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopFileOutputCommitter;
 import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter;
-import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.types.Record;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
deleted file mode 100644
index 2d2f518..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.utils;
-
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskAttemptID;
-
-
-public class HadoopUtils {
-	
-	/**
-	 * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
-	 */
-	public static void mergeHadoopConf(JobConf jobConf) {
-		org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
-		for (Map.Entry<String, String> e : hadoopConf) {
-			jobConf.set(e.getKey(), e.getValue());
-		}
-	}
-	
-	public static JobContext instantiateJobContext(JobConf jobConf, JobID jobId) throws Exception {
-		try {
-			// for Hadoop 1.xx
-			Class<?> clazz = null;
-			if(!TaskAttemptContext.class.isInterface()) { 
-				clazz = Class.forName("org.apache.hadoop.mapred.JobContext", true, Thread.currentThread().getContextClassLoader());
-			}
-			// for Hadoop 2.xx
-			else {
-				clazz = Class.forName("org.apache.hadoop.mapred.JobContextImpl", true, Thread.currentThread().getContextClassLoader());
-			}
-			Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, org.apache.hadoop.mapreduce.JobID.class);
-			// for Hadoop 1.xx
-			constructor.setAccessible(true);
-			JobContext context = (JobContext) constructor.newInstance(jobConf, jobId);
-			
-			return context;
-		} catch(Exception e) {
-			throw new Exception("Could not create instance of JobContext.", e);
-		}
-	}
-	
-	public static TaskAttemptContext instantiateTaskAttemptContext(JobConf jobConf,  TaskAttemptID taskAttemptID) throws Exception {
-		try {
-			// for Hadoop 1.xx
-			Class<?> clazz = null;
-			if(!TaskAttemptContext.class.isInterface()) { 
-				clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContext", true, Thread.currentThread().getContextClassLoader());
-			}
-			// for Hadoop 2.xx
-			else {
-				clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContextImpl", true, Thread.currentThread().getContextClassLoader());
-			}
-			Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, TaskAttemptID.class);
-			// for Hadoop 1.xx
-			constructor.setAccessible(true);
-			TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(jobConf, taskAttemptID);
-			return context;
-		} catch(Exception e) {
-			throw new Exception("Could not create instance of TaskAttemptContext.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
deleted file mode 100644
index 483dd2f..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import org.apache.hadoop.util.Progressable;
-
-/**
- * This is a dummy progress
- *
- */
-public class HadoopDummyProgressable implements Progressable {
-	@Override
-	public void progress() { 
-		
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
deleted file mode 100644
index 84a1e9e..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This is a dummy progress monitor / reporter
- *
- */
-public class HadoopDummyReporter implements Reporter {
-
-	@Override
-	public void progress() {
-	}
-
-	@Override
-	public void setStatus(String status) {
-
-	}
-
-	@Override
-	public Counter getCounter(Enum<?> name) {
-		return null;
-	}
-
-	@Override
-	public Counter getCounter(String group, String name) {
-		return null;
-	}
-
-	@Override
-	public void incrCounter(Enum<?> key, long amount) {
-
-	}
-
-	@Override
-	public void incrCounter(String group, String counter, long amount) {
-
-	}
-
-	@Override
-	public InputSplit getInputSplit() throws UnsupportedOperationException {
-		return null;
-	}
-	// There should be an @Override, but some CDH4 dependency does not contain this method
-	public float getProgress() {
-		return 0;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
deleted file mode 100644
index cf36a9d..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.mapred.JobConf;
-
-
-public class HadoopInputSplit implements InputSplit {
-
-	private static final long serialVersionUID = 1L;
-
-	
-	private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
-	
-	private JobConf jobConf;
-	
-	private int splitNumber;
-	private String hadoopInputSplitTypeName;
-
-
-	public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
-		return hadoopInputSplit;
-	}
-
-	public HadoopInputSplit() {
-		super();
-	}
-
-	public HadoopInputSplit(org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
-		this.hadoopInputSplit = hInputSplit;
-		this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
-		this.jobConf = jobconf;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(splitNumber);
-		out.writeUTF(hadoopInputSplitTypeName);
-		jobConf.write(out);
-		hadoopInputSplit.write(out);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.splitNumber=in.readInt();
-		this.hadoopInputSplitTypeName = in.readUTF();
-		if(hadoopInputSplit == null) {
-			try {
-				Class<? extends org.apache.hadoop.io.Writable> inputSplit =
-						Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
-				this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit );
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Unable to create InputSplit", e);
-			}
-		}
-		jobConf = new JobConf();
-		jobConf.readFields(in);
-		if (this.hadoopInputSplit instanceof Configurable) {
-			((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
-		}
-		this.hadoopInputSplit.readFields(in);
-
-	}
-
-	@Override
-	public int getSplitNumber() {
-		return this.splitNumber;
-	}
-
-	public void setSplitNumber(int splitNumber) {
-		this.splitNumber = splitNumber;
-	}
-
-	public void setHadoopInputSplit(
-			org.apache.hadoop.mapred.InputSplit hadoopInputSplit) {
-		this.hadoopInputSplit = hadoopInputSplit;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
deleted file mode 100644
index 280aaf9..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapreduce;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
-import org.apache.flink.hadoopcompatibility.mapreduce.wrapper.HadoopInputSplit;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormat.class);
-	
-	private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
-	private Class<K> keyClass;
-	private Class<V> valueClass;
-	private org.apache.hadoop.conf.Configuration configuration;
-	
-	private transient RecordReader<K, V> recordReader;
-	private boolean fetched = false;
-	private boolean hasNext;
-	
-	public HadoopInputFormat() {
-		super();
-	}
-	
-	public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
-		super();
-		this.mapreduceInputFormat = mapreduceInputFormat;
-		this.keyClass = key;
-		this.valueClass = value;
-		this.configuration = job.getConfiguration();
-		HadoopUtils.mergeHadoopConf(configuration);
-	}
-	
-	public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) {
-		this.configuration = configuration;
-	}
-	
-	public org.apache.hadoop.mapreduce.InputFormat<K,V> getHadoopInputFormat() {
-		return this.mapreduceInputFormat;
-	}
-	
-	public void setHadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat) {
-		this.mapreduceInputFormat = mapreduceInputFormat;
-	}
-	
-	public org.apache.hadoop.conf.Configuration getConfiguration() {
-		return this.configuration;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  InputFormat
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void configure(Configuration parameters) {
-		// nothing to do
-	}
-	
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
-		// only gather base statistics for FileInputFormats
-		if(!(mapreduceInputFormat instanceof FileInputFormat)) {
-			return null;
-		}
-		
-		JobContext jobContext = null;
-		try {
-			jobContext = HadoopUtils.instantiateJobContext(configuration, null);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-		
-		final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
-				(FileBaseStatistics) cachedStats : null;
-		
-		try {
-			final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext);
-			return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
-		} catch (IOException ioex) {
-			if (LOG.isWarnEnabled()) {
-				LOG.warn("Could not determine statistics due to an io error: "
-						+ ioex.getMessage());
-			}
-		} catch (Throwable t) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Unexpected problem while getting the file statistics: "
-						+ t.getMessage(), t);
-			}
-		}
-		
-		// no statistics available
-		return null;
-	}
-	
-	@Override
-	public HadoopInputSplit[] createInputSplits(int minNumSplits)
-			throws IOException {
-		configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
-		
-		JobContext jobContext = null;
-		try {
-			jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-		
-		List<org.apache.hadoop.mapreduce.InputSplit> splits;
-		try {
-			splits = this.mapreduceInputFormat.getSplits(jobContext);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not get Splits.", e);
-		}
-		HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
-		
-		for(int i = 0; i < hadoopInputSplits.length; i++){
-			hadoopInputSplits[i] = new HadoopInputSplit(splits.get(i), jobContext);
-		}
-		return hadoopInputSplits;
-	}
-	
-	@Override
-	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
-		return new DefaultInputSplitAssigner(inputSplits);
-	}
-	
-	@Override
-	public void open(HadoopInputSplit split) throws IOException {
-		TaskAttemptContext context = null;
-		try {
-			context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
-		} catch(Exception e) {
-			throw new RuntimeException(e);
-		}
-		
-		try {
-			this.recordReader = this.mapreduceInputFormat
-					.createRecordReader(split.getHadoopInputSplit(), context);
-			this.recordReader.initialize(split.getHadoopInputSplit(), context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not create RecordReader.", e);
-		} finally {
-			this.fetched = false;
-		}
-	}
-	
-	@Override
-	public boolean reachedEnd() throws IOException {
-		if(!this.fetched) {
-			fetchNext();
-		}
-		return !this.hasNext;
-	}
-	
-	private void fetchNext() throws IOException {
-		try {
-			this.hasNext = this.recordReader.nextKeyValue();
-		} catch (InterruptedException e) {
-			throw new IOException("Could not fetch next KeyValue pair.", e);
-		} finally {
-			this.fetched = true;
-		}
-	}
-	
-	@Override
-	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
-		if(!this.fetched) {
-			fetchNext();
-		}
-		if(!this.hasNext) {
-			return null;
-		}
-		try {
-			record.f0 = this.recordReader.getCurrentKey();
-			record.f1 = this.recordReader.getCurrentValue();
-		} catch (InterruptedException e) {
-			throw new IOException("Could not get KeyValue pair.", e);
-		}
-		this.fetched = false;
-		
-		return record;
-	}
-	
-	@Override
-	public void close() throws IOException {
-		this.recordReader.close();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Helper methods
-	// --------------------------------------------------------------------------------------------
-	
-	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
-			ArrayList<FileStatus> files) throws IOException {
-		
-		long latestModTime = 0L;
-		
-		// get the file info and check whether the cached statistics are still valid.
-		for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
-			
-			final Path filePath = new Path(hadoopPath.toUri());
-			final FileSystem fs = FileSystem.get(filePath.toUri());
-			
-			final FileStatus file = fs.getFileStatus(filePath);
-			latestModTime = Math.max(latestModTime, file.getModificationTime());
-			
-			// enumerate all files and check their modification time stamp.
-			if (file.isDir()) {
-				FileStatus[] fss = fs.listStatus(filePath);
-				files.ensureCapacity(files.size() + fss.length);
-				
-				for (FileStatus s : fss) {
-					if (!s.isDir()) {
-						files.add(s);
-						latestModTime = Math.max(s.getModificationTime(), latestModTime);
-					}
-				}
-			} else {
-				files.add(file);
-			}
-		}
-		
-		// check whether the cached statistics are still valid, if we have any
-		if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
-			return cachedStats;
-		}
-		
-		// calculate the whole length
-		long len = 0;
-		for (FileStatus s : files) {
-			len += s.getLen();
-		}
-		
-		// sanity check
-		if (len <= 0) {
-			len = BaseStatistics.SIZE_UNKNOWN;
-		}
-		
-		return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Custom serialization methods
-	// --------------------------------------------------------------------------------------------
-	
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeUTF(this.mapreduceInputFormat.getClass().getName());
-		out.writeUTF(this.keyClass.getName());
-		out.writeUTF(this.valueClass.getName());
-		this.configuration.write(out);
-	}
-	
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		String hadoopInputFormatClassName = in.readUTF();
-		String keyClassName = in.readUTF();
-		String valueClassName = in.readUTF();
-		
-		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
-		configuration.readFields(in);
-		
-		if(this.configuration == null) {
-			this.configuration = configuration;
-		}
-		
-		try {
-			this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
-		}
-		try {
-			this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to find key class.", e);
-		}
-		try {
-			this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to find value class.", e);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  ResultTypeQueryable
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public TypeInformation<Tuple2<K,V>> getProducedType() {
-		return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>((Class<K>) keyClass), new WritableTypeInfo<V>((Class<V>) valueClass));
-	}
-}

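The input format above is removed here but reappears under org.apache.flink.api.java.hadoop.mapreduce in flink-java; the WordCount diff further down shows the new import path. Below is a minimal usage sketch of the relocated class, assuming its constructor keeps the (inputFormat, keyClass, valueClass, job) shape of the deleted file; the input path and the class name RelocatedInputFormatSketch are illustrative placeholders.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class RelocatedInputFormatSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Same constructor shape as the deleted class: format, key, value, job.
		Job job = Job.getInstance();
		HadoopInputFormat<LongWritable, Text> hadoopInput =
				new HadoopInputFormat<LongWritable, Text>(
						new TextInputFormat(), LongWritable.class, Text.class, job);
		// addInputPath mutates the same job configuration the wrapper captured above.
		FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path("hdfs:///input"));

		// TextInputFormat produces (byte offset, line) pairs.
		DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopInput);
		lines.print();
		env.execute("relocated HadoopInputFormat sketch");
	}
}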
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
deleted file mode 100644
index 402372c..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapreduce;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.flink.api.common.io.FinalizeOnMaster;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-
-
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>>, FinalizeOnMaster {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private org.apache.hadoop.conf.Configuration configuration;
-	private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat;
-	private transient RecordWriter<K,V> recordWriter;
-	private transient FileOutputCommitter fileOutputCommitter;
-	private transient TaskAttemptContext context;
-	private transient int taskNumber;
-	
-	public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat, Job job) {
-		super();
-		this.mapreduceOutputFormat = mapreduceOutputFormat;
-		this.configuration = job.getConfiguration();
-		HadoopUtils.mergeHadoopConf(configuration);
-	}
-	
-	public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) {
-		this.configuration = configuration;
-	}
-	
-	public org.apache.hadoop.conf.Configuration getConfiguration() {
-		return this.configuration;
-	}
-	
-	public org.apache.hadoop.mapreduce.OutputFormat<K,V> getHadoopOutputFormat() {
-		return this.mapreduceOutputFormat;
-	}
-	
-	public void setHadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat) {
-		this.mapreduceOutputFormat = mapreduceOutputFormat;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  OutputFormat
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void configure(Configuration parameters) {
-		// nothing to do
-	}
-	
-	/**
-	 * Creates the temporary output file for the Hadoop RecordWriter.
-	 * @param taskNumber The number of the parallel instance.
-	 * @param numTasks The number of parallel tasks.
-	 * @throws IOException
-	 */
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		if (Integer.toString(taskNumber + 1).length() > 6) {
-			throw new IOException("Task id too large.");
-		}
-		
-		this.taskNumber = taskNumber+1;
-		
-		// for hadoop 2.2
-		this.configuration.set("mapreduce.output.basename", "tmp");
-		
-		TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" 
-				+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") 
-				+ Integer.toString(taskNumber + 1) 
-				+ "_0");
-		
-		this.configuration.set("mapred.task.id", taskAttemptID.toString());
-		this.configuration.setInt("mapred.task.partition", taskNumber + 1);
-		// for hadoop 2.2
-		this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
-		this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
-		
-		try {
-			this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-		
-		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
-		
-		try {
-			this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-		
-		// compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
-		this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString());
-		
-		try {
-			this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not create RecordWriter.", e);
-		}
-	}
-	
-	
-	@Override
-	public void writeRecord(Tuple2<K, V> record) throws IOException {
-		try {
-			this.recordWriter.write(record.f0, record.f1);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not write Record.", e);
-		}
-	}
-	
-	/**
-	 * Commits the task by moving the output file out of the temporary directory.
-	 * @throws IOException
-	 */
-	@Override
-	public void close() throws IOException {
-		try {
-			this.recordWriter.close(this.context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not close RecordWriter.", e);
-		}
-		
-		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
-			this.fileOutputCommitter.commitTask(this.context);
-		}
-		
-		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
-		
-		// rename tmp-file to final name
-		FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
-		
-		String taskNumberStr = Integer.toString(this.taskNumber);
-		String tmpFileTemplate = "tmp-r-00000";
-		String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr;
-		
-		if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) {
-			fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
-		}
-	}
-	
-	@Override
-	public void finalizeGlobal(int parallelism) throws IOException {
-
-		JobContext jobContext;
-		TaskAttemptContext taskContext;
-		try {
-			
-			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" 
-					+ String.format("%" + (6 - Integer.toString(1).length()) + "s"," ").replace(" ", "0") 
-					+ Integer.toString(1) 
-					+ "_0");
-			
-			jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID());
-			taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext);
-		
-		// finalize HDFS output format
-		this.fileOutputCommitter.commitJob(jobContext);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Custom serialization methods
-	// --------------------------------------------------------------------------------------------
-	
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeUTF(this.mapreduceOutputFormat.getClass().getName());
-		this.configuration.write(out);
-	}
-	
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		String hadoopOutputFormatClassName = in.readUTF();
-		
-		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
-		configuration.readFields(in);
-		
-		if(this.configuration == null) {
-			this.configuration = configuration;
-		}
-		
-		try {
-			this.mapreduceOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat<K,V>) Class.forName(hadoopOutputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
-		}
-	}
-}

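HadoopOutputFormat moves the same way. The following is a hedged sketch of wiring a DataSet into the relocated output format, again assuming the (outputFormat, job) constructor shown above is unchanged; the output path and the class name are placeholders.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class RelocatedOutputFormatSketch {
	// Sends a (word, count) DataSet through the relocated Hadoop output format.
	public static void sink(DataSet<Tuple2<Text, IntWritable>> counts) throws Exception {
		Job job = Job.getInstance();
		HadoopOutputFormat<Text, IntWritable> hadoopOutput =
				new HadoopOutputFormat<Text, IntWritable>(
						new TextOutputFormat<Text, IntWritable>(), job);
		// setOutputPath stores the output directory in the job configuration;
		// the open()/close() logic above reads it back under "mapred.output.dir"
		// (mapped from the newer key on Hadoop 2).
		FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path("hdfs:///output"));
		counts.output(hadoopOutput);
	}
}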
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
index 2b99fd2..f5758eb 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -32,8 +32,8 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
-import org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
 
 /**
  * Implements a word count which takes the input file and counts the number of

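For user code like this example, the migration is just the import swap above. The commit series also adds shortcut methods on ExecutionEnvironment; the sketch below assumes a readHadoopFile(inputFormat, keyClass, valueClass, path) signature, so treat the ExecutionEnvironment.java hunk (not part of this message) as authoritative.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class ReadHadoopFileSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// One call replaces the manual Job + HadoopInputFormat wiring.
		DataSet<Tuple2<LongWritable, Text>> lines = env.readHadoopFile(
				new TextInputFormat(), LongWritable.class, Text.class, "hdfs:///input");

		lines.print();
		env.execute("readHadoopFile sketch");
	}
}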
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
deleted file mode 100644
index 86b730f..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapreduce.utils;
-
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-public class HadoopUtils {
-	
-	/**
-	 * Merges the global Hadoop configuration into the given job configuration. This is necessary to pick up the HDFS configuration.
-	 */
-	public static void mergeHadoopConf(Configuration configuration) {
-		Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
-		
-		for (Map.Entry<String, String> e : hadoopConf) {
-			configuration.set(e.getKey(), e.getValue());
-		}
-	}
-	
-	public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception {
-		try {
-			Class<?> clazz;
-			// for Hadoop 2.x: JobContext is an interface, implemented by JobContextImpl
-			if(JobContext.class.isInterface()) {
-				clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader());
-			}
-			// for Hadoop 1.x: JobContext is a concrete class
-			else {
-				clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader());
-			}
-			Constructor<?> constructor = clazz.getConstructor(Configuration.class, JobID.class);
-			JobContext context = (JobContext) constructor.newInstance(configuration, jobId);
-			
-			return context;
-		} catch(Exception e) {
-			throw new Exception("Could not create instance of JobContext.", e);
-		}
-	}
-	
-	public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration,  TaskAttemptID taskAttemptID) throws Exception {
-		try {
-			Class<?> clazz;
-			// for Hadoop 2.x: TaskAttemptContext is an interface, implemented by TaskAttemptContextImpl
-			if(JobContext.class.isInterface()) {
-				clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
-			}
-			// for Hadoop 1.x: TaskAttemptContext is a concrete class
-			else {
-				clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
-			}
-			Constructor<?> constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class);
-			TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID);
-			
-			return context;
-		} catch(Exception e) {
-			throw new Exception("Could not create instance of TaskAttemptContext.", e);
-		}
-	}
-}

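Both factory methods above rely on the same version probe: in Hadoop 2.x, JobContext and TaskAttemptContext are interfaces implemented by *Impl classes in org.apache.hadoop.mapreduce.task, while in Hadoop 1.x they are constructible classes, so checking JobContext.class.isInterface() picks the right target at runtime. A self-contained sketch of that probe, with the class names taken directly from the deleted file:

import java.lang.reflect.Constructor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;

public class JobContextProbe {
	public static JobContext newJobContext(Configuration conf, JobID jobId) throws Exception {
		// Hadoop 2.x: JobContext is an interface, so construct JobContextImpl.
		// Hadoop 1.x: JobContext itself is the constructible class.
		String className = JobContext.class.isInterface()
				? "org.apache.hadoop.mapreduce.task.JobContextImpl"
				: "org.apache.hadoop.mapreduce.JobContext";
		Class<?> clazz = Class.forName(className, true,
				Thread.currentThread().getContextClassLoader());
		Constructor<?> ctor = clazz.getConstructor(Configuration.class, JobID.class);
		return (JobContext) ctor.newInstance(conf, jobId);
	}
}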
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
deleted file mode 100644
index 25cd0d8..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapreduce.wrapper;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.mapreduce.JobContext;
-
-
-public class HadoopInputSplit implements InputSplit {
-	
-	public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit;
-	public transient JobContext jobContext;
-	
-	private int splitNumber;	
-	
-	public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() {
-		return mapreduceInputSplit;
-	}
-	
-	
-	public HadoopInputSplit() {
-		super();
-	}
-	
-	
-	public HadoopInputSplit(org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
-		if(!(mapreduceInputSplit instanceof Writable)) {
-			throw new IllegalArgumentException("InputSplit must implement Writable interface.");
-		}
-		this.mapreduceInputSplit = mapreduceInputSplit;
-		this.jobContext = jobContext;
-	}
-	
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(this.splitNumber);
-		out.writeUTF(this.mapreduceInputSplit.getClass().getName());
-		Writable w = (Writable) this.mapreduceInputSplit;
-		w.write(out);
-	}
-	
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.splitNumber=in.readInt();
-		String className = in.readUTF();
-		
-		if(this.mapreduceInputSplit == null) {
-			try {
-				Class<? extends org.apache.hadoop.io.Writable> inputSplit = 
-						Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
-				this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
-			} catch (Exception e) {
-				throw new RuntimeException("Unable to create InputSplit", e);
-			}
-		}
-		((Writable)this.mapreduceInputSplit).readFields(in);
-	}
-	
-	@Override
-	public int getSplitNumber() {
-		return this.splitNumber;
-	}
-	
-	public void setSplitNumber(int splitNumber) {
-		this.splitNumber = splitNumber;
-	}
-}

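The wrapper above persists a split as its class name followed by the split's own Writable fields, and on read re-instantiates the class by name before calling readFields. A standalone sketch of that round-trip using plain java.io streams and a FileSplit, which implements Writable; the path and sizes are illustrative:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SplitRoundTrip {
	public static void main(String[] args) throws Exception {
		FileSplit original = new FileSplit(new Path("/data/part-0"), 0L, 1024L, new String[0]);

		// Write: class name first, then the split's own Writable fields.
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		DataOutputStream out = new DataOutputStream(bytes);
		out.writeUTF(original.getClass().getName());
		original.write(out);

		// Read: instantiate by name, then let the split read its fields back.
		DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
		Class<? extends Writable> clazz = Class.forName(in.readUTF()).asSubclass(Writable.class);
		FileSplit copy = (FileSplit) WritableFactories.newInstance(clazz);
		copy.readFields(in);

		System.out.println(copy.getPath()); // prints /data/part-0
	}
}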
