flink-commits mailing list archives

From: rmetz...@apache.org
Subject: [2/2] git commit: Added wrappers for Hadoop functions
Date: Wed, 08 Oct 2014 09:49:42 GMT
Added wrappers for Hadoop functions

Tests and documentation included. Also tested on cluster.

I hijacked @twalthr's PR #131 to build the documentation of the Hadoop function wrappers on top.

Author: Fabian Hueske <fhueske@apache.org>
Author: Artem Tsikiridis <artem.tsikiridis@cern.ch>
Author: twalthr <info@twalthr.com>

Closes #143 from fhueske/hadoopFunctions and squashes the following commits:

f1b3f21 [Fabian Hueske] [FLINK-1076] Extended Hadoop Compatibility documentation to cover Hadoop functions
f4be4a0 [twalthr] [FLINK-1107] Hadoop Compatibility Layer documented
4a62bdb [Fabian Hueske] [FLINK-1076] Return type determined via type extraction. Added test cases. Minor improvements and clean-ups.
df947d9 [Artem Tsikiridis] [FLINK-1076] Extend Hadoop compatibility. Added wrappers for stand-alone Map, Reduce, and CombinableReduce functions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/74dded1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/74dded1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/74dded1c

Branch: refs/heads/master
Commit: 74dded1c2cb87c459be3444d1c7387bbc510e154
Parents: 6be8555
Author: Fabian Hueske <fhueske@apache.org>
Authored: Wed Oct 8 11:49:02 2014 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Wed Oct 8 11:49:02 2014 +0200

----------------------------------------------------------------------
 docs/_includes/sidenav.html                     |   1 +
 docs/hadoop_compatibility.md                    | 181 ++++++++++-
 docs/spargel_guide.md                           |   6 +-
 .../mapred/HadoopInputFormat.java               |   6 +-
 .../mapred/HadoopMapFunction.java               | 135 +++++++++
 .../mapred/HadoopReduceCombineFunction.java     | 166 +++++++++++
 .../mapred/HadoopReduceFunction.java            | 142 +++++++++
 .../example/HadoopMapredCompatWordCount.java    | 133 +++++++++
 .../mapred/example/WordCount.java               | 120 --------
 .../mapred/record/example/WordCount.java        |   4 +-
 .../mapred/wrapper/HadoopDummyProgressable.java |   4 +-
 .../mapred/wrapper/HadoopInputSplit.java        |  25 +-
 .../mapred/wrapper/HadoopOutputCollector.java   |  66 +++++
 .../wrapper/HadoopTupleUnwrappingIterator.java  |  96 ++++++
 .../mapred/HadoopInputOutputITCase.java         |  46 ---
 .../mapred/HadoopMapFunctionITCase.java         | 221 ++++++++++++++
 .../mapred/HadoopMapredITCase.java              |  47 +++
 .../HadoopReduceCombineFunctionITCase.java      | 297 +++++++++++++++++++
 .../mapred/HadoopReduceFunctionITCase.java      | 250 ++++++++++++++++
 .../mapred/HadoopTestData.java                  |  62 ++++
 .../HadoopTupleUnwrappingIteratorTest.java      | 137 +++++++++
 .../flink/test/util/JavaProgramTestBase.java    |   9 +
 22 files changed, 1968 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/docs/_includes/sidenav.html
----------------------------------------------------------------------
diff --git a/docs/_includes/sidenav.html b/docs/_includes/sidenav.html
index 7c22633..7d6fb20 100644
--- a/docs/_includes/sidenav.html
+++ b/docs/_includes/sidenav.html
@@ -27,6 +27,7 @@
       <li><a href="streaming_guide.html">Streaming Guide</a></li>
       <li><a href="iterations.html">Iterations</a></li>
       <li><a href="spargel_guide.html">Spargel Graph API</a></li>
+      <li><a href="hadoop_compatibility.html">Hadoop Compatibility</a></li>
     </ul>
   </li>
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/docs/hadoop_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/hadoop_compatibility.md b/docs/hadoop_compatibility.md
index 06c0dfa..b9a0b39 100644
--- a/docs/hadoop_compatibility.md
+++ b/docs/hadoop_compatibility.md
@@ -1,5 +1,182 @@
 ---
-title: "Hadoop Compatability"
+title: "Hadoop Compatibility"
 ---
 
-To be written.
\ No newline at end of file
+* This will be replaced by the TOC
+{:toc}
+
+Flink is compatible with many of Apache Hadoop's MapReduce interfaces and allows you to reuse a lot of code that was written for Hadoop MapReduce.
+
+You can:
+
+- use Hadoop's `Writable` [data types](programming_guide.html#data-types) in Flink programs.
+- use any Hadoop `InputFormat` as a [DataSource](programming_guide.html#data-sources).
+- use any Hadoop `OutputFormat` as a [DataSink](programming_guide.html#data-sinks).
+- use a Hadoop `Mapper` as a [FlatMapFunction](dataset_transformations.html#flatmap).
+- use a Hadoop `Reducer` as a [GroupReduceFunction](dataset_transformations.html#groupreduce-on-grouped-dataset).
+
+This document shows how to use existing Hadoop MapReduce code with Flink.
+
+### Project Configuration
+
+The Hadoop Compatibility Layer is part of the `flink-addons` Maven module. All relevant classes are located in the `org.apache.flink.hadoopcompatibility` package. It includes separate packages and classes for the Hadoop `mapred` and `mapreduce` APIs.
+
+Add the following dependency to your `pom.xml` to use the Hadoop Compatibility Layer.
+
+~~~xml
+<dependency>
+	<groupId>org.apache.flink</groupId>
+	<artifactId>flink-hadoop-compatibility</artifactId>
+	<version>{{site.FLINK_VERSION_STABLE}}</version>
+</dependency>
+~~~
+
+### Using Hadoop Data Types
+
+Flink supports all Hadoop `Writable` and `WritableComparable` data types out-of-the-box. You do not need to include the Hadoop Compatibility dependency if you only want to use your Hadoop data types. See the [Programming Guide](programming_guide.html#data-types) for more details.
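+
+For example, Hadoop's `Text` and `LongWritable` can be used directly as fields of Flink tuples. The following minimal sketch (assuming the usual Flink and Hadoop imports and an existing `ExecutionEnvironment` setup) creates a small `DataSet` of Writable pairs and prints it:
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// Hadoop Writables used as regular Flink tuple fields.
+DataSet<Tuple2<Text, LongWritable>> counts = env.fromElements(
+  new Tuple2<Text, LongWritable>(new Text("hadoop"), new LongWritable(2L)),
+  new Tuple2<Text, LongWritable>(new Text("flink"), new LongWritable(3L)));
+
+// Writable types behave like any other Flink data type.
+counts.print();
+
+env.execute();
+~~~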
+
+### Using Hadoop InputFormats
+
+Flink provides a compatibility wrapper for Hadoop `InputFormats`. Any class that implements `org.apache.hadoop.mapred.InputFormat` or extends `org.apache.hadoop.mapreduce.InputFormat` is supported. Thus, Flink can handle Hadoop built-in formats such as `TextInputFormat` as well as external formats such as Hive's `HCatInputFormat`. Data read from Hadoop InputFormats is converted into a `DataSet<Tuple2<KEY,VALUE>>` where `KEY` is the key and `VALUE` is the value of the original Hadoop key-value pair.
+
+Flink's InputFormat wrappers are 
+
+- `org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat` and 
+- `org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat`
+
+and can be used as regular Flink [InputFormats](programming_guide.html#data-sources).
+
+The following example shows how to use Hadoop's `TextInputFormat`.
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+// Set up the Hadoop TextInputFormat.
+Job job = Job.getInstance();
+HadoopInputFormat<LongWritable, Text> hadoopIF = 
+  // create the Flink wrapper.
+  new HadoopInputFormat<LongWritable, Text>(
+    // create the Hadoop InputFormat, specify key and value type, and job.
+    new TextInputFormat(), LongWritable.class, Text.class, job
+  );
+TextInputFormat.addInputPath(job, new Path(inputPath));
+		
+// Read data using the Hadoop TextInputFormat.
+DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
+
+// Do something with the data.
+[...]
+~~~
+
+### Using Hadoop OutputFormats
+
+Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class that implements `org.apache.hadoop.mapred.OutputFormat` or extends `org.apache.hadoop.mapreduce.OutputFormat` is supported. The OutputFormat wrapper expects its input data to be a `DataSet<Tuple2<KEY,VALUE>>` where `KEY` is the key and `VALUE` is the value of the Hadoop key-value pair that is processed by the Hadoop OutputFormat.
+
+Flink's OutputFormat wrappers are
+
+- `org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat` and 
+- `org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat`
+
+and can be used as regular Flink [OutputFormats](programming_guide.html#data-sinks).
+
+The following example shows how to use Hadoop's `TextOutputFormat`.
+
+~~~java
+// Obtain your result to emit.
+DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
+		
+// Set up the Hadoop TextOutputFormat.
+Job job = Job.getInstance();
+HadoopOutputFormat<Text, IntWritable> hadoopOF =
+  // create the Flink wrapper.
+  new HadoopOutputFormat<Text, IntWritable>(
+    // set the Hadoop OutputFormat and specify the job.
+    new TextOutputFormat<Text, IntWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+		
+// Emit data using the Hadoop TextOutputFormat.
+hadoopResult.output(hadoopOF)
+      .setParallelism(1);
+~~~
+
+**Please note:** At the moment, Hadoop OutputFormats must be executed with a parallelism of 1 (DOP = 1). This limitation will be resolved soon.
+
+### Using Hadoop Mappers and Reducers
+
+Hadoop Mappers are semantically equivalent to Flink's [FlatMapFunctions](dataset_transformations.html#flatmap) and Hadoop Reducers are equivalent to Flink's [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset). Flink provides wrappers for implementations of Hadoop MapReduce's `Mapper` and `Reducer` interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the Mapper and Reducer interfaces of Hadoop's mapred API (`org.apache.hadoop.mapred`) are supported.
+
+The wrappers take a `DataSet<Tuple2<KEYIN,VALUEIN>>` as input and produce a `DataSet<Tuple2<KEYOUT,VALUEOUT>>` as output where `KEYIN` and `KEYOUT` are the keys and `VALUEIN` and `VALUEOUT` are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with a Combiner (`HadoopReduceCombineFunction`) and without a Combiner (`HadoopReduceFunction`). The wrappers accept an optional `JobConf` object to configure the Hadoop Mapper or Reducer.
+
+Flink's function wrappers are 
+
+- `org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction`,
+- `org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction`, and
+- `org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction`
+
+and can be used as regular Flink [FlatMapFunctions](dataset_transformations.html#flatmap) or [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset).
+
+The following example shows how to use Hadoop `Mapper` and `Reducer` functions.
+
+~~~java
+// Obtain data to process somehow.
+DataSet<Tuple2<LongWritable, Text>> text = [...]
+
+DataSet<Tuple2<Text, LongWritable>> result = text
+  // use Hadoop Mapper (Tokenizer) as MapFunction
+  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
+    new Tokenizer()
+  ))
+  .groupBy(0)
+  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
+  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
+    new Counter(), new Counter()
+  ));
+~~~
+
+**Please note:** The Reducer wrapper works on groups as defined by Flink's [groupBy()](dataset_transformations.html#transformations-on-grouped-dataset) operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the `JobConf`. 
+
+### Complete Hadoop WordCount Example
+
+The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+// Set up the Hadoop TextInputFormat.
+Job job = Job.getInstance();
+HadoopInputFormat<LongWritable, Text> hadoopIF = 
+  new HadoopInputFormat<LongWritable, Text>(
+    new TextInputFormat(), LongWritable.class, Text.class, job
+  );
+TextInputFormat.addInputPath(job, new Path(inputPath));
+		
+// Read data using the Hadoop TextInputFormat.
+DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
+
+DataSet<Tuple2<Text, LongWritable>> result = text
+  // use Hadoop Mapper (Tokenizer) as MapFunction
+  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
+    new Tokenizer()
+  ))
+  .groupBy(0)
+  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
+  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
+    new Counter(), new Counter()
+  ));
+
+// Set up the Hadoop TextOutputFormat.
+HadoopOutputFormat<Text, LongWritable> hadoopOF =
+  new HadoopOutputFormat<Text, LongWritable>(
+    new TextOutputFormat<Text, LongWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+		
+// Emit data using the Hadoop TextOutputFormat.
+result.output(hadoopOF)
+      .setParallelism(1);
+
+// Execute Program
+env.execute("Hadoop WordCount");
+~~~

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/docs/spargel_guide.md
----------------------------------------------------------------------
diff --git a/docs/spargel_guide.md b/docs/spargel_guide.md
index 6fe2585..c5b2ac4 100644
--- a/docs/spargel_guide.md
+++ b/docs/spargel_guide.md
@@ -17,14 +17,14 @@ This vertex-centric view makes it easy to express a large class of graph problem
 Spargel API
 -----------
 
-The Spargel API is part of the *addons* Maven project. All relevant classes are located in the *org.apache.flinkspargel.java* package.
+The Spargel API is part of the *addons* Maven project. All relevant classes are located in the *org.apache.flink.spargel.java* package.
 
 Add the following dependency to your `pom.xml` to use the Spargel.
 
 ~~~xml
 <dependency>
 	<groupId>org.apache.flink</groupId>
-	<artifactId>spargel</artifactId>
+	<artifactId>flink-spargel</artifactId>
 	<version>{{site.FLINK_VERSION_STABLE}}</version>
 </dependency>
 ~~~
@@ -110,4 +110,4 @@ The computation **terminates** after a specified *maximum number of supersteps*
 
 <p class="text-center">
     <img alt="Spargel Example" width="75%" src="img/spargel_example.png" />
-</p>
\ No newline at end of file
+</p>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
index 296304a..8dfda67 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
@@ -43,6 +43,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -66,7 +67,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 	private transient RecordReader<K, V> recordReader;
 	private transient boolean fetched = false;
 	private transient boolean hasNext;
-	
+
 	public HadoopInputFormat() {
 		super();
 	}
@@ -155,6 +156,9 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 	@Override
 	public void open(HadoopInputSplit split) throws IOException {
 		this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
+		if (this.recordReader instanceof Configurable) {
+			((Configurable) this.recordReader).setConf(jobConf);
+		}
 		key = this.recordReader.createKey();
 		value = this.recordReader.createValue();
 		this.fetched = false;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
new file mode 100644
index 0000000..9bc36f3
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. 
+ */
+@SuppressWarnings("rawtypes")
+public final class HadoopMapFunction<KEYIN extends WritableComparable, VALUEIN extends Writable, 
+										KEYOUT extends WritableComparable, VALUEOUT extends Writable> 
+					extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>> 
+					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper;
+	private transient JobConf jobConf;
+
+	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector;
+	private transient Reporter reporter;
+	
+	/**
+	 * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
+	 * 
+	 * @param hadoopMapper The Hadoop Mapper to wrap.
+	 */
+	public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) {
+		this(hadoopMapper, new JobConf());
+	}
+	
+	/**
+	 * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
+	 * The Hadoop Mapper is configured with the provided JobConf.
+	 * 
+	 * @param hadoopMapper The Hadoop Mapper to wrap.
+	 * @param conf The JobConf that is used to configure the Hadoop Mapper.
+	 */
+	public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
+		if(hadoopMapper == null) {
+			throw new NullPointerException("Mapper may not be null.");
+		}
+		if(conf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
+		
+		this.mapper = hadoopMapper;
+		this.jobConf = conf;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.mapper.configure(jobConf);
+		
+		this.reporter = new HadoopDummyReporter();
+		this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
+	}
+
+	@Override
+	public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out) 
+			throws Exception {
+		outputCollector.setFlinkCollector(out);
+		mapper.map(value.f0, value.f1, outputCollector, reporter);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {	
+		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
+		
+		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
+		final WritableTypeInfo<VALUEOUT> valueTypeInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
+		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
+	}
+	
+	/**
+	 * Custom serialization methods.
+	 *  @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
+	 */
+	private void writeObject(final ObjectOutputStream out) throws IOException {
+		out.writeObject(mapper.getClass());
+		jobConf.write(out);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+		Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperClass = 
+				(Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+		mapper = InstantiationUtil.instantiate(mapperClass);
+		
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
new file mode 100644
index 0000000..5d83bad
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
+ */
+@SuppressWarnings("rawtypes")
+@org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
+public final class HadoopReduceCombineFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
+												KEYOUT extends WritableComparable, VALUEOUT extends Writable> 
+					extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
+					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+	private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
+	private transient JobConf jobConf;
+	
+	private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
+	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
+	private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
+	private transient Reporter reporter;
+
+	/**
+	 * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
+	 * 
+	 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
+	 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
+	 */
+	public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
+										Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) {
+		this(hadoopReducer, hadoopCombiner, new JobConf());
+	}
+	
+	/**
+	 * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
+	 * 
+	 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
+	 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
+	 * @param conf The JobConf that is used to configure both Hadoop Reducers.
+	 */
+	public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
+								Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
+		if(hadoopReducer == null) {
+			throw new NullPointerException("Reducer may not be null.");
+		}
+		if(hadoopCombiner == null) {
+			throw new NullPointerException("Combiner may not be null.");
+		}
+		if(conf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
+		
+		this.reducer = hadoopReducer;
+		this.combiner = hadoopCombiner;
+		this.jobConf = conf;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.reducer.configure(jobConf);
+		this.combiner.configure(jobConf);
+		
+		this.reporter = new HadoopDummyReporter();
+		Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+		this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass);
+		this.combineCollector = new HadoopOutputCollector<KEYIN, VALUEIN>();
+		this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
+	}
+
+	@Override
+	public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+			throws Exception {
+		reduceCollector.setFlinkCollector(out);
+		valueIterator.set(values.iterator());
+		reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
+	}
+
+	@Override
+	public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
+		combineCollector.setFlinkCollector(out);
+		valueIterator.set(values.iterator());
+		combiner.reduce(valueIterator.getCurrentKey(), valueIterator, combineCollector, reporter);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+		
+		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
+		final WritableTypeInfo<VALUEOUT> valueTypeInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
+		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
+	}
+
+	/**
+	 * Custom serialization methods.
+	 *  @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
+	 */
+	private void writeObject(final ObjectOutputStream out) throws IOException {
+		
+		out.writeObject(reducer.getClass());
+		out.writeObject(combiner.getClass());
+		jobConf.write(out);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+		
+		Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
+				(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+		reducer = InstantiationUtil.instantiate(reducerClass);
+		
+		Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass = 
+				(Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
+		combiner = InstantiationUtil.instantiate(combinerClass);
+		
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
new file mode 100644
index 0000000..1f0aedd
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.WritableTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction. 
+ */
+@SuppressWarnings("rawtypes")
+public final class HadoopReduceFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
+										KEYOUT extends WritableComparable, VALUEOUT extends Writable> 
+					extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
+					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+	private transient JobConf jobConf;
+	
+	private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
+	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
+	private transient Reporter reporter;
+	
+	/**
+	 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+ 	 * 
+	 * @param hadoopReducer The Hadoop Reducer to wrap.
+	 */
+	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer) {
+		this(hadoopReducer, new JobConf());
+	}
+	
+	/**
+	 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+ 	 * 
+	 * @param hadoopReducer The Hadoop Reducer to wrap.
+	 * @param conf The JobConf that is used to configure the Hadoop Reducer.
+	 */
+	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
+		if(hadoopReducer == null) {
+			throw new NullPointerException("Reducer may not be null.");
+		}
+		if(conf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
+		
+		this.reducer = hadoopReducer;
+		this.jobConf = conf;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.reducer.configure(jobConf);
+		
+		this.reporter = new HadoopDummyReporter();
+		this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
+		Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+		this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass);
+	}
+
+	@Override
+	public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+			throws Exception {
+		
+		reduceCollector.setFlinkCollector(out);
+		valueIterator.set(values.iterator());
+		reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+		
+		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
+		final WritableTypeInfo<VALUEOUT> valueTypeInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
+		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
+	}
+
+	/**
+	 * Custom serialization methods.
+	 *  @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
+	 */
+	private void writeObject(final ObjectOutputStream out) throws IOException {
+		
+		out.writeObject(reducer.getClass());
+		jobConf.write(out);		
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+		
+		Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
+				(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+		reducer = InstantiationUtil.instantiate(reducerClass);
+		
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
new file mode 100644
index 0000000..de20fab
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * occurrences of each word in the file and writes the result back to disk.
+ * 
+ * This example shows how to use Hadoop InputFormats and OutputFormats and how to reuse
+ * existing Hadoop Mapper and Reducer implementations (mapred API) within a Flink program.
+ */
+public class HadoopMapredCompatWordCount {
+	
+	public static void main(String[] args) throws Exception {
+		if (args.length < 2) {
+			System.err.println("Usage: WordCount <input path> <result path>");
+			return;
+		}
+		
+		final String inputPath = args[0];
+		final String outputPath = args[1];
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// Set up the Hadoop Input Format
+		HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
+		TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath));
+		
+		// Create a Flink job with it
+		DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
+		
+		DataSet<Tuple2<Text, LongWritable>> words = 
+				text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
+					.groupBy(0).reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter()));
+		
+		// Set up Hadoop Output Format
+		HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = 
+				new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
+		hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
+		TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath));
+		
+		// Output & Execute
+		words.output(hadoopOutputFormat).setParallelism(1);
+		env.execute("Hadoop Compat WordCount");
+	}
+	
+	
+	public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {
+
+		@Override
+		public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep) 
+				throws IOException {
+			// normalize and split the line
+			String line = v.toString();
+			String[] tokens = line.toLowerCase().split("\\W+");
+			
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Text(token), new LongWritable(1l));
+				}
+			}
+		}
+		
+		@Override
+		public void configure(JobConf arg0) { }
+		
+		@Override
+		public void close() throws IOException { }
+		
+	}
+	
+	public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> {
+
+		@Override
+		public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep)
+				throws IOException {
+			
+			long cnt = 0;
+			while(vs.hasNext()) {
+				cnt += vs.next().get();
+			}
+			out.collect(k, new LongWritable(cnt));
+			
+		}
+		
+		@Override
+		public void configure(JobConf arg0) { }
+		
+		@Override
+		public void close() throws IOException { }
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
deleted file mode 100644
index 6a8f56b..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.example;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
-import org.apache.flink.util.Collector;
-
-
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * occurrences of each word in the file and writes the result back to disk.
- * 
- * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to 
- * common Java types for better usage in a Flink job and how to use Hadoop Output Formats.
- */
-@SuppressWarnings("serial")
-public class WordCount {
-	
-	public static void main(String[] args) throws Exception {
-		if (args.length < 2) {
-			System.err.println("Usage: WordCount <input path> <result path>");
-			return;
-		}
-		
-		final String inputPath = args[0];
-		final String outputPath = args[1];
-		
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
-		
-		// Set up the Hadoop Input Format
-		HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
-		TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath));
-		
-		// Create a Flink job with it
-		DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
-		
-		// Tokenize the line and convert from Writable "Text" to String for better handling
-		DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());
-		
-		// Sum up the words
-		DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);
-		
-		// Convert String back to Writable "Text" for use with Hadoop Output Format
-		DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());
-		
-		// Set up Hadoop Output Format
-		HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), new JobConf());
-		hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
-		TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath));
-		
-		// Output & Execute
-		hadoopResult.output(hadoopOutputFormat);
-		env.execute("Word Count");
-	}
-	
-	/**
-	 * Splits a line into words and converts Hadoop Writables into normal Java data types.
-	 */
-	public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
-		
-		@Override
-		public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
-			// normalize and split the line
-			String line = value.f1.toString();
-			String[] tokens = line.toLowerCase().split("\\W+");
-			
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
-	
-	/**
-	 * Converts Java data types to Hadoop Writables.
-	 */
-	public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
-		
-		@Override
-		public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {
-			return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1));
-		}
-		
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
index 3a0a5a4..b167080 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
@@ -52,10 +52,10 @@ import org.apache.hadoop.mapred.TextInputFormat;
  * 
  * <br /><br />
  * 
- * <b>Note</b>: This example uses the out dated Record API.
+ * <b>Note</b>: This example uses the outdated Record API.
  * It is recommended to use the new Java API.
  * 
- * @see org.apache.flink.hadoopcompatibility.mapred.example.WordCount
+ * @see org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount
  */
 public class WordCount implements Program, ProgramDescription {
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
index f42ca1c..483dd2f 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.util.Progressable;
  */
 public class HadoopDummyProgressable implements Progressable {
 	@Override
-	public void progress() {
-
+	public void progress() { 
+		
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
index ec10477..cf36a9d 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.mapred.JobConf;
 
@@ -35,33 +36,31 @@ public class HadoopInputSplit implements InputSplit {
 	
 	private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
 	
-	@SuppressWarnings("unused")
 	private JobConf jobConf;
 	
 	private int splitNumber;
 	private String hadoopInputSplitTypeName;
-	
-	
+
+
 	public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
 		return hadoopInputSplit;
 	}
-	
-	
+
 	public HadoopInputSplit() {
 		super();
 	}
-	
-	
+
 	public HadoopInputSplit(org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
 		this.hadoopInputSplit = hInputSplit;
 		this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
 		this.jobConf = jobconf;
 	}
-	
+
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		out.writeInt(splitNumber);
 		out.writeUTF(hadoopInputSplitTypeName);
+		jobConf.write(out);
 		hadoopInputSplit.write(out);
 	}
 
@@ -71,7 +70,7 @@ public class HadoopInputSplit implements InputSplit {
 		this.hadoopInputSplitTypeName = in.readUTF();
 		if(hadoopInputSplit == null) {
 			try {
-				Class<? extends org.apache.hadoop.io.Writable> inputSplit = 
+				Class<? extends org.apache.hadoop.io.Writable> inputSplit =
 						Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
 				this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit );
 			}
@@ -79,7 +78,13 @@ public class HadoopInputSplit implements InputSplit {
 				throw new RuntimeException("Unable to create InputSplit", e);
 			}
 		}
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+		if (this.hadoopInputSplit instanceof Configurable) {
+			((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
+		}
 		this.hadoopInputSplit.readFields(in);
+
 	}
 
 	@Override
@@ -90,7 +95,7 @@ public class HadoopInputSplit implements InputSplit {
 	public void setSplitNumber(int splitNumber) {
 		this.splitNumber = splitNumber;
 	}
-	
+
 	public void setHadoopInputSplit(
 			org.apache.hadoop.mapred.InputSplit hadoopInputSplit) {
 		this.hadoopInputSplit = hadoopInputSplit;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
new file mode 100644
index 0000000..280708f
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import java.io.IOException;
+
+/**
+ * A Hadoop OutputCollector that wraps a Flink Collector.
+ * On each call of collect() the data is forwarded to the wrapped Flink collector.
+ * 
+ */
+@SuppressWarnings("rawtypes")
+public final class HadoopOutputCollector<KEY extends WritableComparable, VALUE extends Writable>
+		implements OutputCollector<KEY,VALUE> {
+
+	private Collector<Tuple2<KEY,VALUE>> flinkCollector;
+	
+	private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>();
+
+	/**
+	 * Set the wrapped Flink collector.
+	 * 
+	 * @param flinkCollector The Flink Collector to which collected key-value pairs are forwarded.
+	 */
+	public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector) {
+		this.flinkCollector = flinkCollector;
+	}
+	
+	/**
+	 * Use the wrapped Flink collector to collect a key-value pair for Flink. 
+	 * 
+	 * @param key the key to collect
+	 * @param val the value to collect
+	 * @throws IOException if the key-value pair cannot be forwarded to the wrapped Flink collector.
+	 */
+	@Override
+	public void collect(final KEY key, final VALUE val) throws IOException {
+
+		this.outTuple.f0 = key;
+		this.outTuple.f1 = val;
+		this.flinkCollector.collect(outTuple);
+	}
+	
+}
\ No newline at end of file
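
A stand-alone sketch of the forwarding path (not part of the commit): the Flink-side Collector is injected via setFlinkCollector(), after which every Hadoop-style collect(key, value) call arrives as a Tuple2. The list-backed collector and the class name HadoopOutputCollectorSketch are purely illustrative.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class HadoopOutputCollectorSketch {

	public static void main(String[] args) throws Exception {
		final List<Tuple2<IntWritable, Text>> sink = new ArrayList<Tuple2<IntWritable, Text>>();

		// Illustrative Flink collector that buffers emitted tuples in a list.
		Collector<Tuple2<IntWritable, Text>> flinkCollector = new Collector<Tuple2<IntWritable, Text>>() {
			@Override
			public void collect(Tuple2<IntWritable, Text> record) {
				// HadoopOutputCollector reuses a single output tuple, so copy before keeping it.
				sink.add(new Tuple2<IntWritable, Text>(record.f0, record.f1));
			}
			@Override
			public void close() { }
		};

		HadoopOutputCollector<IntWritable, Text> hadoopCollector =
				new HadoopOutputCollector<IntWritable, Text>();
		hadoopCollector.setFlinkCollector(flinkCollector);

		// A wrapped Hadoop Mapper or Reducer emits through the familiar OutputCollector API.
		hadoopCollector.collect(new IntWritable(1), new Text("Hi"));
		hadoopCollector.collect(new IntWritable(2), new Text("Hello"));

		System.out.println(sink); // [(1,Hi), (2,Hello)]
	}
}

Reusing one Tuple2 instance keeps the wrapper allocation-free per record, which is why the buffering collector above copies each tuple before storing it.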

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
new file mode 100644
index 0000000..83afe39
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/**
+ * Wraps a Flink Tuple2 (key-value pair) iterator into an iterator over the second (value) field.
+ */
+@SuppressWarnings("rawtypes")
+public class HadoopTupleUnwrappingIterator<KEY extends WritableComparable, VALUE extends Writable> 
+									extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+	
+	private Iterator<Tuple2<KEY,VALUE>> iterator;
+	
+	private final WritableSerializer<KEY> keySerializer;
+	
+	private boolean atFirst = false;
+	private KEY curKey = null;
+	private VALUE firstValue = null;
+	
+	public HadoopTupleUnwrappingIterator(Class<KEY> keyClass) {
+		this.keySerializer = new WritableSerializer<KEY>(keyClass);
+	}
+	
+	/**
+	 * Set the Flink iterator to wrap.
+	 *
+	 * @param iterator The Flink iterator to wrap.
+	 */
+	@Override
+	public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) {
+		this.iterator = iterator;
+		if(this.hasNext()) {
+			final Tuple2<KEY, VALUE> tuple = iterator.next();
+			this.curKey = keySerializer.copy(tuple.f0);
+			this.firstValue = tuple.f1;
+			this.atFirst = true;
+		} else {
+			this.atFirst = false;
+		}
+	}
+	
+	@Override
+	public boolean hasNext() {
+		if(this.atFirst) {
+			return true;
+		}
+		return iterator.hasNext();
+	}
+	
+	@Override
+	public VALUE next() {
+		if(this.atFirst) {
+			this.atFirst = false;
+			return firstValue;
+		}
+		
+		final Tuple2<KEY, VALUE> tuple = iterator.next();
+		return tuple.f1;
+	}
+	
+	public KEY getCurrentKey() {
+		return this.curKey;
+	}
+	
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+}
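
To see the value-iterator contract in isolation: set() eagerly pulls the first tuple so that the group's key is available via getCurrentKey() (copied with the WritableSerializer, since Flink may reuse tuple objects), while next() yields only the value fields. Below is a minimal sketch with made-up sample data standing in for a Flink reduce group; the data and the class name TupleUnwrappingSketch are illustrative.

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class TupleUnwrappingSketch {

	public static void main(String[] args) {
		// One key group, as a Flink reduceGroup would hand it in (illustrative data).
		List<Tuple2<IntWritable, Text>> group = Arrays.asList(
				new Tuple2<IntWritable, Text>(new IntWritable(2), new Text("Hello")),
				new Tuple2<IntWritable, Text>(new IntWritable(2), new Text("Hello world")));

		HadoopTupleUnwrappingIterator<IntWritable, Text> values =
				new HadoopTupleUnwrappingIterator<IntWritable, Text>(IntWritable.class);
		values.set(group.iterator());

		// The key is known up front; the iterator then walks the values only.
		System.out.println("key = " + values.getCurrentKey()); // key = 2
		while (values.hasNext()) {
			System.out.println("value = " + values.next());    // Hello, Hello world
		}
	}
}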

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java
deleted file mode 100644
index bf8459a..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import org.apache.flink.hadoopcompatibility.mapred.example.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class HadoopInputOutputITCase extends JavaProgramTestBase {
-	
-	protected String textPath;
-	protected String resultPath;
-	
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("text.txt", WordCountData.TEXT);
-		resultPath = getTempDirPath("result");
-	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
-	}
-	
-	@Override
-	protected void testProgram() throws Exception {
-		WordCount.main(new String[] { textPath, resultPath });
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
new file mode 100644
index 0000000..6a9ad2c
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class HadoopMapFunctionITCase extends JavaProgramTestBase {
+
+	private static int NUM_PROGRAMS = 3;
+	
+	private int curProgId = config.getInteger("ProgramId", -1);
+	private String resultPath;
+	private String expectedResult;
+	
+	public HadoopMapFunctionITCase(Configuration config) {
+		super(config);	
+	}
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = MapperProgs.runProgram(curProgId, resultPath);
+	}
+	
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+	
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+		
+		return toParameterList(tConfigs);
+	}
+	
+	public static class MapperProgs {
+		
+		public static String runProgram(int progId, String resultPath) throws Exception {
+			
+			switch(progId) {
+			case 1: {
+				/*
+				 * Test non-passing mapper
+				 */
+		
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
+				DataSet<Tuple2<IntWritable, Text>> nonPassingFlatMapDs = ds.
+						flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new NonPassingMapper()));
+				
+				nonPassingFlatMapDs.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"\n";
+			}
+			case 2: {
+				/*
+				 * Test data duplicating mapper
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
+				DataSet<Tuple2<IntWritable, Text>> duplicatingFlatMapDs = ds.
+						flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new DuplicatingMapper()));
+				
+				duplicatingFlatMapDs.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"(1,Hi)\n" + "(1,HI)\n" +
+				"(2,Hello)\n" + "(2,HELLO)\n" +
+				"(3,Hello world)\n" + "(3,HELLO WORLD)\n" +
+				"(4,Hello world, how are you?)\n" + "(4,HELLO WORLD, HOW ARE YOU?)\n" +
+				"(5,I am fine.)\n" + "(5,I AM FINE.)\n" +
+				"(6,Luke Skywalker)\n" + "(6,LUKE SKYWALKER)\n" +
+				"(7,Comment#1)\n" + "(7,COMMENT#1)\n" + 
+				"(8,Comment#2)\n" + "(8,COMMENT#2)\n" + 
+				"(9,Comment#3)\n" + "(9,COMMENT#3)\n" + 
+				"(10,Comment#4)\n" + "(10,COMMENT#4)\n" + 
+				"(11,Comment#5)\n" + "(11,COMMENT#5)\n" + 
+				"(12,Comment#6)\n" + "(12,COMMENT#6)\n" + 
+				"(13,Comment#7)\n" + "(13,COMMENT#7)\n" + 
+				"(14,Comment#8)\n" + "(14,COMMENT#8)\n" +
+				"(15,Comment#9)\n" + "(15,COMMENT#9)\n" + 
+				"(16,Comment#10)\n" + "(16,COMMENT#10)\n" +
+				"(17,Comment#11)\n" + "(17,COMMENT#11)\n" +
+				"(18,Comment#12)\n" + "(18,COMMENT#12)\n" +
+				"(19,Comment#13)\n" + "(19,COMMENT#13)\n" +
+				"(20,Comment#14)\n" + "(20,COMMENT#14)\n" +
+				"(21,Comment#15)\n" + "(21,COMMENT#15)\n";
+			}
+			case 3: {
+				// Mapper configured via JobConf
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				JobConf conf = new JobConf();
+				conf.set("my.filterPrefix", "Hello");
+				
+				DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
+				DataSet<Tuple2<IntWritable, Text>> hellos = ds.
+						flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new ConfigurableMapper(), conf));
+				
+				hellos.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"(2,Hello)\n" +
+				"(3,Hello world)\n" +
+				"(4,Hello world, how are you?)\n";
+			}
+			default: 
+				throw new IllegalArgumentException("Invalid program id");
+			}
+			
+		}
+	
+	}
+	
+	public static class NonPassingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
+		
+		@Override
+		public void map(final IntWritable k, final Text v, 
+				final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
+			if ( v.toString().contains("bananas") ) {
+				out.collect(k,v);
+			}
+		}
+		
+		@Override
+		public void configure(final JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+	
+	public static class DuplicatingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
+		
+		@Override
+		public void map(final IntWritable k, final Text v, 
+				final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
+			out.collect(k, v);
+			out.collect(k, new Text(v.toString().toUpperCase()));
+		}
+		
+		@Override
+		public void configure(final JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+	
+	public static class ConfigurableMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
+		private String filterPrefix;
+		
+		@Override
+		public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
+				throws IOException {
+			if(v.toString().startsWith(filterPrefix)) {
+				out.collect(k, v);
+			}
+		}
+		
+		@Override
+		public void configure(JobConf c) {
+			filterPrefix = c.get("my.filterPrefix");
+		}
+
+		@Override
+		public void close() throws IOException { }
+	}
+}
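
Outside the parameterized test harness, wiring a JobConf-configured Hadoop Mapper into a Flink program looks like the sketch below. The inline fromElements() input, the /tmp output path, and the PrefixFilterMapper name are assumptions made for illustration (the ITCase reads from HadoopTestData and writes to a temp directory); the HadoopMapFunction constructors are the ones exercised above.

import java.io.IOException;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class HadoopMapFunctionSketch {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Illustrative input; the ITCase obtains its data from HadoopTestData.getKVPairDataSet(env).
		DataSet<Tuple2<IntWritable, Text>> input = env.fromElements(
				new Tuple2<IntWritable, Text>(new IntWritable(2), new Text("Hello")),
				new Tuple2<IntWritable, Text>(new IntWritable(5), new Text("I am fine.")));

		// The JobConf is handed to the wrapper and reaches Mapper.configure(),
		// mirroring program 3 of the ITCase above.
		JobConf conf = new JobConf();
		conf.set("my.filterPrefix", "Hello");

		DataSet<Tuple2<IntWritable, Text>> hellos = input.flatMap(
				new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new PrefixFilterMapper(), conf));

		hellos.writeAsText("file:///tmp/hellos"); // illustrative output path
		env.execute();
	}

	/** A plain Hadoop Mapper, analogous to the ITCase's ConfigurableMapper. */
	public static final class PrefixFilterMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
		private String filterPrefix;

		@Override
		public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
				throws IOException {
			if (v.toString().startsWith(filterPrefix)) {
				out.collect(k, v);
			}
		}

		@Override
		public void configure(JobConf c) {
			filterPrefix = c.get("my.filterPrefix");
		}

		@Override
		public void close() throws IOException { }
	}
}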

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
new file mode 100644
index 0000000..f8c3bab
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class HadoopMapredITCase extends JavaProgramTestBase {
+	
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+		this.setDegreeOfParallelism(4);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
+	}
+	
+	@Override
+	protected void testProgram() throws Exception {
+		HadoopMapredCompatWordCount.main(new String[] { textPath, resultPath });
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/74dded1c/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
new file mode 100644
index 0000000..e2c124d
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class HadoopReduceCombineFunctionITCase extends JavaProgramTestBase {
+
+	private static int NUM_PROGRAMS = 4;
+	
+	private int curProgId = config.getInteger("ProgramId", -1);
+	private String resultPath;
+	private String expectedResult;
+	
+	public HadoopReduceCombineFunctionITCase(Configuration config) {
+		super(config);	
+	}
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = ReducerProgs.runProgram(curProgId, resultPath);
+	}
+	
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+	
+	@Override
+	protected boolean skipCollectionExecution() {
+		if (this.curProgId == 3) {
+			return true;
+		}
+		return false;
+	}
+	
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+		
+		return toParameterList(tConfigs);
+	}
+	
+	public static class ReducerProgs {
+		
+		public static String runProgram(int progId, String resultPath) throws Exception {
+			
+			switch(progId) {
+			case 1: {
+				/*
+				 * Test standard counting with combiner
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
+						map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>>() {
+							private static final long serialVersionUID = 1L;
+							Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
+							@Override
+							public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
+									throws Exception {
+								outT.f0 = new IntWritable(v.f0.get() / 6);
+								outT.f1 = new IntWritable(1);
+								return outT;
+							}
+						});
+						
+				DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
+						groupBy(0).
+						reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
+								new SumReducer(), new SumReducer()));
+				
+				counts.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"(0,5)\n"+
+						"(1,6)\n" +
+						"(2,6)\n" +
+						"(3,4)\n";
+			}
+			case 2: {
+				/*
+				 * Test ungrouped Hadoop reducer
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
+						map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>>() {
+							private static final long serialVersionUID = 1L;
+							Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
+							@Override
+							public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
+									throws Exception {
+								outT.f0 = new IntWritable(0);
+								outT.f1 = v.f0;
+								return outT;
+							}
+						});
+						
+				DataSet<Tuple2<IntWritable, IntWritable>> sum = ds.
+						reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
+								new SumReducer(), new SumReducer()));
+				
+				sum.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"(0,231)\n";
+			}
+			case 3: {
+				/* Test combiner */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
+						map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>>() {
+							private static final long serialVersionUID = 1L;
+							Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
+							@Override
+							public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
+									throws Exception {
+								outT.f0 = v.f0;
+								outT.f1 = new IntWritable(1);
+								return outT;
+							}
+						});
+						
+				DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
+						groupBy(0).
+						reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
+								new SumReducer(), new KeyChangingReducer()));
+				
+				counts.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"(0,5)\n"+
+						"(1,6)\n" +
+						"(2,5)\n" +
+						"(3,5)\n";
+			}
+			case 4: {
+				/*
+				 * Test configuration via JobConf
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				JobConf conf = new JobConf();
+				conf.set("my.cntPrefix", "Hello");
+				
+				DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
+						map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>>() {
+							private static final long serialVersionUID = 1L;
+							@Override
+							public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
+									throws Exception {
+								v.f0 = new IntWritable(v.f0.get() % 5);
+								return v;
+							}
+						});
+						
+				DataSet<Tuple2<IntWritable, IntWritable>> hellos = ds.
+						groupBy(0).
+						reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
+								new ConfigurableCntReducer(), conf));
+				
+				hellos.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"(0,0)\n"+
+						"(1,0)\n" +
+						"(2,1)\n" +
+						"(3,1)\n" +
+						"(4,1)\n";
+			}
+			default: 
+				throw new IllegalArgumentException("Invalid program id");
+			}
+			
+		}
+	
+	}
+	
+	public static class SumReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+		@Override
+		public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
+				throws IOException {
+			
+			int sum = 0;
+			while(v.hasNext()) {
+				sum += v.next().get();
+			}
+			out.collect(k, new IntWritable(sum));
+		}
+		
+		@Override
+		public void configure(JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+	
+	public static class KeyChangingReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+		@Override
+		public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
+				throws IOException {
+			while(v.hasNext()) {
+				out.collect(new IntWritable(k.get() % 4), v.next());
+			}
+		}
+		
+		@Override
+		public void configure(JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+	
+	public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
+		private String countPrefix;
+		
+		@Override
+		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
+				throws IOException {
+			int commentCnt = 0;
+			while(vs.hasNext()) {
+				String v = vs.next().toString();
+				if(v.startsWith(this.countPrefix)) {
+					commentCnt++;
+				}
+			}
+			out.collect(k, new IntWritable(commentCnt));
+		}
+		
+		@Override
+		public void configure(final JobConf c) { 
+			this.countPrefix = c.get("my.cntPrefix");
+		}
+
+		@Override
+		public void close() throws IOException { }
+	}
+}

