flink-commits mailing list archives

From fhue...@apache.org
Subject [3/6] git commit: [FLINK-1076] Return type determined via type extraction. Added test cases. Minor improvements and clean-ups.
Date Wed, 08 Oct 2014 10:23:43 GMT
[FLINK-1076] Return type determined via type extraction.
Added test cases.
Minor improvements and clean-ups.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/aa6d1cf5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/aa6d1cf5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/aa6d1cf5

Branch: refs/heads/master
Commit: aa6d1cf564c815552e8d41f6c689e9ba9e5c0bed
Parents: 35415f2
Author: Fabian Hueske <fhueske@apache.org>
Authored: Mon Oct 6 10:18:51 2014 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Oct 8 12:22:27 2014 +0200

----------------------------------------------------------------------
 .../mapred/HadoopMapFunction.java               |  63 ++--
 .../mapred/HadoopOutputFormat.java              |   1 +
 .../mapred/HadoopReduceCombineFunction.java     | 159 ++++------
 .../mapred/HadoopReduceFunction.java            | 137 +++------
 .../example/HadoopMapredCompatWordCount.java    |   7 +-
 .../mapred/utils/HadoopConfiguration.java       |  52 ----
 .../mapred/utils/HadoopUtils.java               |   1 +
 .../mapred/wrapper/HadoopDummyProgressable.java |   4 +-
 .../mapred/wrapper/HadoopOutputCollector.java   |  13 +-
 .../wrapper/HadoopTupleUnwrappingIterator.java  |  96 ++++++
 .../hadoopcompatibility/HadoopTestBase.java     |  45 ---
 .../mapred/HadoopMapFunctionITCase.java         | 221 ++++++++++++++
 .../mapred/HadoopMapredITCase.java              |   2 +-
 .../HadoopReduceCombineFunctionITCase.java      | 297 +++++++++++++++++++
 .../mapred/HadoopReduceFunctionITCase.java      | 250 ++++++++++++++++
 .../mapred/HadoopTestData.java                  |  62 ++++
 .../HadoopTupleUnwrappingIteratorTest.java      | 137 +++++++++
 .../flink/test/util/JavaProgramTestBase.java    |   9 +
 18 files changed, 1222 insertions(+), 334 deletions(-)
----------------------------------------------------------------------
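
The central API change in this commit: the wrapper functions no longer take the output key and value classes as constructor arguments; they derive them from the wrapped Hadoop function's type parameters. A minimal before/after sketch, based on the HadoopMapredCompatWordCount changes further down (Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable>):

    // before: output classes passed explicitly, raw types required
    // text.flatMap(new HadoopMapFunction(new Tokenizer(), Text.class, LongWritable.class));

    // after: output types are determined via type extraction from the Mapper's signature
    DataSet<Tuple2<Text, LongWritable>> words =
            text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()));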


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
index 10f2f0e..9bc36f3 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
@@ -28,9 +28,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopConfiguration;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.util.Collector;
@@ -42,11 +42,11 @@ import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
- * The wrapper for a Hadoop Mapper (mapred API). Parses a Hadoop JobConf object and initialises all operations related
- * mappers.
+ * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. 
  */
-public final class HadoopMapFunction<KEYIN extends WritableComparable<?>, VALUEIN extends Writable,
-										KEYOUT extends WritableComparable<?>, VALUEOUT extends Writable> 
+@SuppressWarnings("rawtypes")
+public final class HadoopMapFunction<KEYIN extends WritableComparable, VALUEIN extends Writable, 
+										KEYOUT extends WritableComparable, VALUEOUT extends Writable> 
 					extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>> 
 					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
 
@@ -55,24 +55,34 @@ public final class HadoopMapFunction<KEYIN extends WritableComparable<?>, VALUEI
 	private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper;
 	private transient JobConf jobConf;
 
-	private transient Class<KEYOUT> keyOutClass;
-	private transient Class<VALUEOUT> valOutClass;
-	
 	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector;
 	private transient Reporter reporter;
 	
-	public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, 
-								Class<KEYOUT> keyOutClass, Class<VALUEOUT> valOutClass) {
-		
-		this(hadoopMapper, keyOutClass, valOutClass, new JobConf());
+	/**
+	 * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
+	 * 
+	 * @param hadoopMapper The Hadoop Mapper to wrap.
+	 */
+	public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) {
+		this(hadoopMapper, new JobConf());
 	}
 	
-	public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, 
-								Class<KEYOUT> keyOutClass, Class<VALUEOUT> valOutClass, JobConf conf) {
-		this.mapper = hadoopMapper;
-		this.keyOutClass = keyOutClass;
-		this.valOutClass = valOutClass;
+	/**
+	 * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
+	 * The Hadoop Mapper is configured with the provided JobConf.
+	 * 
+	 * @param hadoopMapper The Hadoop Mapper to wrap.
+	 * @param conf The JobConf that is used to configure the Hadoop Mapper.
+	 */
+	public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
+		if(hadoopMapper == null) {
+			throw new NullPointerException("Mapper may not be null.");
+		}
+		if(conf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
 		
+		this.mapper = hadoopMapper;
 		this.jobConf = conf;
 	}
 
@@ -85,13 +95,6 @@ public final class HadoopMapFunction<KEYIN extends WritableComparable<?>, VALUEI
 		this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
 	}
 
-	/**
-	 * Wrap a hadoop map() function call and use a Flink collector to collect the result values.
-	 * @param value The input value.
-	 * @param out The collector for emitting result values.
-	 *
-	 * @throws Exception
-	 */
 	@Override
 	public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out) 
 			throws Exception {
@@ -99,10 +102,14 @@ public final class HadoopMapFunction<KEYIN extends WritableComparable<?>, VALUEI
 		mapper.map(value.f0, value.f1, outputCollector, reporter);
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
-	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
-		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(keyOutClass);
-		final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(valOutClass);
+	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {	
+		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
+		
+		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
+		final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
 		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
 	}
 	
@@ -112,7 +119,7 @@ public final class HadoopMapFunction<KEYIN extends WritableComparable<?>, VALUEI
 	 */
 	private void writeObject(final ObjectOutputStream out) throws IOException {
 		out.writeObject(mapper.getClass());
-		HadoopConfiguration.writeHadoopJobConf(jobConf, out);
+		jobConf.write(out);
 	}
 
 	@SuppressWarnings("unchecked")
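
The rewritten getProducedType() above resolves the output classes from the wrapped Mapper's generic signature rather than from the removed keyOutClass/valOutClass fields. A small sketch of the mechanism, using the Tokenizer from the WordCount example as the concrete Mapper (indices 2 and 3 address the Mapper interface's output key and value type parameters):

    // Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable>
    Class<?> outKeyClass = TypeExtractor.getParameterType(Mapper.class, Tokenizer.class, 2); // Text.class
    Class<?> outValClass = TypeExtractor.getParameterType(Mapper.class, Tokenizer.class, 3); // LongWritable.class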

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
index ded47df..434b409 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.ReflectionUtils;
 
+
 public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>> {
 	
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
index dfb01b2..5d83bad 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -22,35 +22,33 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopConfiguration;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
- * The wrapper for a Hadoop Reducer (mapred API). Parses a Hadoop JobConf object and initialises all operations related
- * reducers and combiners.
+ * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
  */
+@SuppressWarnings("rawtypes")
 @org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
-public final class HadoopReduceCombineFunction<KEYIN extends WritableComparable<?>, VALUEIN extends Writable,
-										KEYOUT extends WritableComparable<?>, VALUEOUT extends Writable> 
+public final class HadoopReduceCombineFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
+												KEYOUT extends WritableComparable, VALUEOUT extends Writable> 
 					extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
 					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
 
@@ -60,136 +58,83 @@ public final class HadoopReduceCombineFunction<KEYIN extends WritableComparable<
 	private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
 	private transient JobConf jobConf;
 	
-	private transient Class<KEYOUT> keyOutClass;
-	private transient Class<VALUEOUT> valOutClass;
-	
-	private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
+	private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
 	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
+	private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
 	private transient Reporter reporter;
-	private transient ReducerTransformingIterator iterator;
 
+	/**
+	 * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
+	 * 
+	 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
+	 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
+	 */
 	public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
-								Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner,
-								Class<KEYOUT> keyOutClass, Class<VALUEOUT> valOutClass) {
-		this(hadoopReducer, hadoopCombiner, keyOutClass, valOutClass, new JobConf());
+										Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) {
+		this(hadoopReducer, hadoopCombiner, new JobConf());
 	}
 	
+	/**
+	 * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
+	 * 
+	 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
+	 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
+	 * @param conf The JobConf that is used to configure both Hadoop Reducers.
+	 */
 	public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
-								Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner,
-								Class<KEYOUT> keyOutClass, Class<VALUEOUT> valOutClass, JobConf conf) {
+								Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
+		if(hadoopReducer == null) {
+			throw new NullPointerException("Reducer may not be null.");
+		}
+		if(hadoopCombiner == null) {
+			throw new NullPointerException("Combiner may not be null.");
+		}
+		if(conf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
+		
 		this.reducer = hadoopReducer;
 		this.combiner = hadoopCombiner;
-
-		this.keyOutClass = keyOutClass;
-		this.valOutClass = valOutClass;
-		
-		this.jobConf = new JobConf();
+		this.jobConf = conf;
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
+		this.reducer.configure(jobConf);
+		this.combiner.configure(jobConf);
 		
 		this.reporter = new HadoopDummyReporter();
+		Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+		this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass);
 		this.combineCollector = new HadoopOutputCollector<KEYIN, VALUEIN>();
 		this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
-		this.iterator = new ReducerTransformingIterator();
 	}
 
-	/**
-	 * A wrapping iterator for an iterator of key-value pairs that can be used as an iterator of values.
-	 */
-	private final class ReducerTransformingIterator extends TupleUnwrappingIterator<VALUEIN,KEYIN>
-			implements java.io.Serializable {
-
-		private static final long serialVersionUID = 1L;
-		private Iterator<Tuple2<KEYIN,VALUEIN>> iterator;
-		private KEYIN key;
-		private Tuple2<KEYIN,VALUEIN> first;
-
-		/**
-		 * Set the iterator to wrap.
-		 * @param iterator iterator to wrap
-		 */
-		@Override()
-		public void set(final Iterator<Tuple2<KEYIN,VALUEIN>> iterator) {
-			this.iterator = iterator;
-			if(this.hasNext()) {
-				this.first = iterator.next();
-				this.key = this.first.f0;
-			}
-		}
-
-		@Override
-		public boolean hasNext() {
-			if(this.first != null) {
-				return true;
-			}
-			return iterator.hasNext();
-		}
-
-		@Override
-		public VALUEIN next() {
-			if(this.first != null) {
-				final VALUEIN val = this.first.f1;
-				this.first = null;
-				return val;
-			}
-			final Tuple2<KEYIN,VALUEIN> tuple = iterator.next();
-			return tuple.f1;
-		}
-
-		private KEYIN getKey() {
-			return WritableUtils.clone(this.key, jobConf);
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-	}
-
-	/**
-	 * Wrap a hadoop reduce() function call and use a Flink collector to collect the result values.
-	 * @param values The iterator returning the group of values to be reduced.
-	 * @param out The collector to emit the returned values.
-	 *
-	 * @throws Exception
-	 */
 	@Override
 	public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
 			throws Exception {
 		reduceCollector.setFlinkCollector(out);
-		iterator.set(values.iterator());
-		reducer.reduce(iterator.getKey(), iterator, reduceCollector, reporter);
+		valueIterator.set(values.iterator());
+		reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
 	}
 
-	/**
-	 * Wrap a hadoop combine() function call and use a Flink collector to collect the result values.
-	 * @param values The iterator returning the group of values to be reduced.
-	 * @param out The collector to emit the returned values.
-	 *
-	 * @throws Exception
-	 */
 	@Override
 	public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
-		if (this.combiner == null) {
-			throw new RuntimeException("No combiner has been specified in Hadoop job. Flink reduce function is" +
-					"declared combinable. Invalid behaviour.");  //This should not happen.
-		}
-		else {
-			combineCollector.setFlinkCollector(out);
-			iterator.set(values.iterator());
-			combiner.reduce(iterator.getKey(), iterator, combineCollector, reporter);
-		}
+		combineCollector.setFlinkCollector(out);
+		valueIterator.set(values.iterator());
+		combiner.reduce(valueIterator.getCurrentKey(), valueIterator, combineCollector, reporter);
 	}
-	
-	
 
+	@SuppressWarnings("unchecked")
 	@Override
 	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
-		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(keyOutClass);
-		final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(valOutClass);
+		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+		
+		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
+		final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
 		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
 	}
 
@@ -201,7 +146,7 @@ public final class HadoopReduceCombineFunction<KEYIN extends WritableComparable<
 		
 		out.writeObject(reducer.getClass());
 		out.writeObject(combiner.getClass());
-		HadoopConfiguration.writeHadoopJobConf(jobConf, out);
+		jobConf.write(out);
 	}
 
 	@SuppressWarnings("unchecked")
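
With the new constructors, both the reducer and the combiner are passed as plain Hadoop Reducers, and the output types are again determined via type extraction from the reducer's signature. A usage sketch taken from the SumReducer-based integration test added below (the Combinable annotation lets Flink apply combine() before the data is shuffled):

    DataSet<Tuple2<IntWritable, IntWritable>> counts = ds
            .groupBy(0)
            .reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
                    new SumReducer(), new SumReducer()));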

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
index e3d6d24..1f0aedd 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
@@ -22,146 +22,100 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.util.Iterator;
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopConfiguration;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
- * The wrapper for a Hadoop Reducer (mapred API). Parses a Hadoop JobConf object and initialises all operations related
- * reducers and combiners.
+ * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction. 
  */
-public final class HadoopReduceFunction<KEYIN extends WritableComparable<?>, VALUEIN extends Writable,
-										KEYOUT extends WritableComparable<?>, VALUEOUT extends Writable> 
+@SuppressWarnings("rawtypes")
+public final class HadoopReduceFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
+										KEYOUT extends WritableComparable, VALUEOUT extends Writable> 
 					extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
 					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
 
 	private static final long serialVersionUID = 1L;
 
 	private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+	private transient JobConf jobConf;
 	
-	private transient Class<KEYOUT> keyOutClass;
-	private transient Class<VALUEOUT> valOutClass;
-	
+	private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
 	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
 	private transient Reporter reporter;
-	private transient ReducerTransformingIterator iterator;
-
-	private JobConf jobConf;
-
-	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
-								Class<KEYOUT> keyOutClass, Class<VALUEOUT> valOutClass) {
-		this(hadoopReducer, keyOutClass, valOutClass, new JobConf());
+	
+	/**
+	 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+ 	 * 
+	 * @param hadoopReducer The Hadoop Reducer to wrap.
+	 */
+	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer) {
+		this(hadoopReducer, new JobConf());
 	}
 	
-	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
-								Class<KEYOUT> keyOutClass, Class<VALUEOUT> valOutClass, JobConf conf) {
-		this.reducer = hadoopReducer;
-
-		this.keyOutClass = keyOutClass;
-		this.valOutClass = valOutClass;
+	/**
+	 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+ 	 * 
+	 * @param hadoopReducer The Hadoop Reducer to wrap.
+	 * @param conf The JobConf that is used to configure the Hadoop Reducer.
+	 */
+	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
+		if(hadoopReducer == null) {
+			throw new NullPointerException("Reducer may not be null.");
+		}
+		if(conf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
 		
-		this.jobConf = new JobConf();
+		this.reducer = hadoopReducer;
+		this.jobConf = conf;
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
+		this.reducer.configure(jobConf);
 		
 		this.reporter = new HadoopDummyReporter();
 		this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
-		this.iterator = new ReducerTransformingIterator();
+		Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+		this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass);
 	}
 
-	/**
-	 * A wrapping iterator for an iterator of key-value pairs that can be used as an iterator of values.
-	 */
-	private final class ReducerTransformingIterator extends TupleUnwrappingIterator<VALUEIN,KEYIN>
-			implements java.io.Serializable {
-
-		private static final long serialVersionUID = 1L;
-		private Iterator<Tuple2<KEYIN,VALUEIN>> iterator;
-		private KEYIN key;
-		private Tuple2<KEYIN,VALUEIN> first;
-
-		/**
-		 * Set the iterator to wrap.
-		 * @param iterator iterator to wrap
-		 */
-		@Override()
-		public void set(final Iterator<Tuple2<KEYIN,VALUEIN>> iterator) {
-			this.iterator = iterator;
-			if(this.hasNext()) {
-				this.first = iterator.next();
-				this.key = this.first.f0;
-			}
-		}
-
-		@Override
-		public boolean hasNext() {
-			if(this.first != null) {
-				return true;
-			}
-			return iterator.hasNext();
-		}
-
-		@Override
-		public VALUEIN next() {
-			if(this.first != null) {
-				final VALUEIN val = this.first.f1;
-				this.first = null;
-				return val;
-			}
-			final Tuple2<KEYIN,VALUEIN> tuple = iterator.next();
-			return tuple.f1;
-		}
-
-		private KEYIN getKey() {
-			return WritableUtils.clone(this.key, jobConf);
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-	}
-
-	/**
-	 * Wrap a hadoop reduce() function call and use a Flink collector to collect the result values.
-	 * @param values The iterator returning the group of values to be reduced.
-	 * @param out The collector to emit the returned values.
-	 *
-	 * @throws Exception
-	 */
 	@Override
 	public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
 			throws Exception {
+		
 		reduceCollector.setFlinkCollector(out);
-		iterator.set(values.iterator());
-		reducer.reduce(iterator.getKey(), iterator, reduceCollector, reporter);
+		valueIterator.set(values.iterator());
+		reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
 	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
-		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(keyOutClass);
-		final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(valOutClass);
+		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+		
+		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
+		final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
 		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
 	}
 
@@ -172,8 +126,7 @@ public final class HadoopReduceFunction<KEYIN extends WritableComparable<?>, VAL
 	private void writeObject(final ObjectOutputStream out) throws IOException {
 		
 		out.writeObject(reducer.getClass());
-		HadoopConfiguration.writeHadoopJobConf(jobConf, out);
-		
+		jobConf.write(out);		
 	}
 
 	@SuppressWarnings("unchecked")
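
Usage mirrors the combinable variant, just without a combiner. A short sketch, assuming a SumReducer like the one used in the new integration tests (i.e. a Reducer<IntWritable, IntWritable, IntWritable, IntWritable>):

    DataSet<Tuple2<IntWritable, IntWritable>> sums = ds
            .groupBy(0)
            .reduceGroup(new HadoopReduceFunction<IntWritable, IntWritable, IntWritable, IntWritable>(new SumReducer()));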

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
index 23b6ae1..de20fab 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.mapred.TextOutputFormat;
  */
 public class HadoopMapredCompatWordCount {
 	
-	@SuppressWarnings({ "unchecked", "rawtypes" })
 	public static void main(String[] args) throws Exception {
 		if (args.length < 2) {
 			System.err.println("Usage: WordCount <input path> <result path>");
@@ -70,8 +69,8 @@ public class HadoopMapredCompatWordCount {
 		DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
 		
 		DataSet<Tuple2<Text, LongWritable>> words = 
-				text.flatMap(new HadoopMapFunction(new Tokenizer(), Text.class, LongWritable.class))
-					.groupBy(0).reduceGroup(new HadoopReduceCombineFunction(new Counter(), new Counter(), Text.class, LongWritable.class));
+				text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
+					.groupBy(0).reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter()));
 		
 		// Set up Hadoop Output Format
 		HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = 
@@ -80,7 +79,7 @@ public class HadoopMapredCompatWordCount {
 		TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath));
 		
 		// Output & Execute
-		words.output(hadoopOutputFormat);
+		words.output(hadoopOutputFormat).setParallelism(1);
 		env.execute("Hadoop Compat WordCount");
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopConfiguration.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopConfiguration.java
deleted file mode 100644
index 319e826..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopConfiguration.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.utils;
-
-import org.apache.flink.runtime.fs.hdfs.DistributedFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.JobConf;
-
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.Map;
-
-/**
- * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
- */
-public class HadoopConfiguration {
-
-	public static void mergeHadoopConf(JobConf jobConf) {
-		org.apache.hadoop.conf.Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
-		for (Map.Entry<String, String> e : hadoopConf) {
-			jobConf.set(e.getKey(), e.getValue());
-		}
-	}
-
-	/**
-	 * Each task should gets its own jobConf object, when serializing.
-	 * @param jobConf the jobConf to write
-	 * @param out the outputstream to write to
-	 * @throws IOException
-	 */
-	public static void writeHadoopJobConf(final JobConf jobConf, final ObjectOutputStream out) throws IOException{
-		final JobConf clonedConf = WritableUtils.clone(jobConf, new Configuration());
-		clonedConf.write(out);
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
index 0abbab3..137d741 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
 
+
 public class HadoopUtils {
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
index d7a69c3..483dd2f 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+
 package org.apache.flink.hadoopcompatibility.mapred.wrapper;
 
 import org.apache.hadoop.util.Progressable;
@@ -25,9 +26,8 @@ import org.apache.hadoop.util.Progressable;
  *
  */
 public class HadoopDummyProgressable implements Progressable {
-	
 	@Override
 	public void progress() { 
-		// Nothing reported here.
+		
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
index ee42475..280708f 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
@@ -27,16 +27,23 @@ import org.apache.hadoop.mapred.OutputCollector;
 import java.io.IOException;
 
 /**
- * A Hadoop OutputCollector that basically wraps a Flink OutputCollector.
- * This implies that on each call of collect() the data is actually collected.
+ * A Hadoop OutputCollector that wraps a Flink OutputCollector.
+ * On each call of collect() the data is forwarded to the wrapped Flink collector.
+ * 
  */
-public final class HadoopOutputCollector<KEY extends WritableComparable<?>, VALUE extends Writable>
+@SuppressWarnings("rawtypes")
+public final class HadoopOutputCollector<KEY extends WritableComparable, VALUE extends Writable>
 		implements OutputCollector<KEY,VALUE> {
 
 	private Collector<Tuple2<KEY,VALUE>> flinkCollector;
 	
 	private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>();
 
+	/**
+	 * Set the wrapped Flink collector.
+	 * 
+	 * @param flinkCollector The wrapped Flink OutputCollector.
+	 */
 	public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector) {
 		this.flinkCollector = flinkCollector;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
new file mode 100644
index 0000000..83afe39
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/**
+ * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field.
+ */
+@SuppressWarnings("rawtypes")
+public class HadoopTupleUnwrappingIterator<KEY extends WritableComparable, VALUE extends Writable> 
+									extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+	
+	private Iterator<Tuple2<KEY,VALUE>> iterator;
+	
+	private final WritableSerializer<KEY> keySerializer;
+	
+	private boolean atFirst = false;
+	private KEY curKey = null;
+	private VALUE firstValue = null;
+	
+	public HadoopTupleUnwrappingIterator(Class<KEY> keyClass) {
+		this.keySerializer = new WritableSerializer<KEY>(keyClass);
+	}
+	
+	/**
+	* Set the Flink iterator to wrap.
+	* 
+	* @param iterator The Flink iterator to wrap.
+	*/
+	@Override()
+	public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) {
+		this.iterator = iterator;
+		if(this.hasNext()) {
+			final Tuple2<KEY, VALUE> tuple = iterator.next();
+			this.curKey = keySerializer.copy(tuple.f0);
+			this.firstValue = tuple.f1;
+			this.atFirst = true;
+		} else {
+			this.atFirst = false;
+		}
+	}
+	
+	@Override
+	public boolean hasNext() {
+		if(this.atFirst) {
+			return true;
+		}
+		return iterator.hasNext();
+	}
+	
+	@Override
+	public VALUE next() {
+		if(this.atFirst) {
+			this.atFirst = false;
+			return firstValue;
+		}
+		
+		final Tuple2<KEY, VALUE> tuple = iterator.next();
+		return tuple.f1;
+	}
+	
+	public KEY getCurrentKey() {
+		return this.curKey;
+	}
+	
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+}
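
This iterator replaces the private ReducerTransformingIterator that was previously duplicated in both reduce wrappers. It is used in the reduce() hunks above to turn Flink's Tuple2 group iterator into a Hadoop-style (key, values) call:

    valueIterator.set(values.iterator());
    reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);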

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/HadoopTestBase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/HadoopTestBase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/HadoopTestBase.java
deleted file mode 100644
index 9504608..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/HadoopTestBase.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility;
-
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.junit.Assert;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-
-
-public abstract class HadoopTestBase extends JavaProgramTestBase {
-
-	/**
-	 * Hadoop tests should not sort the result.
-	 */
-	public void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception {
-		ArrayList<String> list = new ArrayList<String>();
-		readAllResultLines(list, resultPath, false);
-
-		String[] result = list.toArray(new String[list.size()]);
-		String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n");
-		Arrays.sort(expected);
-
-		Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length);
-		Assert.assertArrayEquals(expected, result);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
new file mode 100644
index 0000000..6a9ad2c
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class HadoopMapFunctionITCase extends JavaProgramTestBase {
+
+	private static int NUM_PROGRAMS = 3;
+	
+	private int curProgId = config.getInteger("ProgramId", -1);
+	private String resultPath;
+	private String expectedResult;
+	
+	public HadoopMapFunctionITCase(Configuration config) {
+		super(config);	
+	}
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = MapperProgs.runProgram(curProgId, resultPath);
+	}
+	
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+	
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+		
+		return toParameterList(tConfigs);
+	}
+	
+	public static class MapperProgs {
+		
+		public static String runProgram(int progId, String resultPath) throws Exception {
+			
+			switch(progId) {
+			case 1: {
+				/*
+				 * Test non-passing mapper
+				 */
+		
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
+				DataSet<Tuple2<IntWritable, Text>> nonPassingFlatMapDs = ds.
+						flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new NonPassingMapper()));
+				
+				nonPassingFlatMapDs.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"\n";
+			}
+			case 2: {
+				/*
+				 * Test data duplicating mapper
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
+				DataSet<Tuple2<IntWritable, Text>> duplicatingFlatMapDs = ds.
+						flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new DuplicatingMapper()));
+				
+				duplicatingFlatMapDs.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"(1,Hi)\n" + "(1,HI)\n" +
+				"(2,Hello)\n" + "(2,HELLO)\n" +
+				"(3,Hello world)\n" + "(3,HELLO WORLD)\n" +
+				"(4,Hello world, how are you?)\n" + "(4,HELLO WORLD, HOW ARE YOU?)\n" +
+				"(5,I am fine.)\n" + "(5,I AM FINE.)\n" +
+				"(6,Luke Skywalker)\n" + "(6,LUKE SKYWALKER)\n" +
+				"(7,Comment#1)\n" + "(7,COMMENT#1)\n" + 
+				"(8,Comment#2)\n" + "(8,COMMENT#2)\n" + 
+				"(9,Comment#3)\n" + "(9,COMMENT#3)\n" + 
+				"(10,Comment#4)\n" + "(10,COMMENT#4)\n" + 
+				"(11,Comment#5)\n" + "(11,COMMENT#5)\n" + 
+				"(12,Comment#6)\n" + "(12,COMMENT#6)\n" + 
+				"(13,Comment#7)\n" + "(13,COMMENT#7)\n" + 
+				"(14,Comment#8)\n" + "(14,COMMENT#8)\n" +
+				"(15,Comment#9)\n" + "(15,COMMENT#9)\n" + 
+				"(16,Comment#10)\n" + "(16,COMMENT#10)\n" +
+				"(17,Comment#11)\n" + "(17,COMMENT#11)\n" +
+				"(18,Comment#12)\n" + "(18,COMMENT#12)\n" +
+				"(19,Comment#13)\n" + "(19,COMMENT#13)\n" +
+				"(20,Comment#14)\n" + "(20,COMMENT#14)\n" +
+				"(21,Comment#15)\n" + "(21,COMMENT#15)\n";
+			}
+			case 3: {
+				// Mapper configured via JobConf
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				JobConf conf = new JobConf();
+				conf.set("my.filterPrefix", "Hello");
+				
+				DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
+				DataSet<Tuple2<IntWritable, Text>> hellos = ds.
+						flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new ConfigurableMapper(), conf));
+				
+				hellos.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"(2,Hello)\n" +
+				"(3,Hello world)\n" +
+				"(4,Hello world, how are you?)\n";
+			}
+			default: 
+				throw new IllegalArgumentException("Invalid program id");
+			}
+			
+		}
+	
+	}
+	
+	public static class NonPassingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
+		
+		@Override
+		public void map(final IntWritable k, final Text v, 
+				final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
+			if ( v.toString().contains("bananas") ) {
+				out.collect(k,v);
+			}
+		}
+		
+		@Override
+		public void configure(final JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+	
+	public static class DuplicatingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
+		
+		@Override
+		public void map(final IntWritable k, final Text v, 
+				final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
+			out.collect(k, v);
+			out.collect(k, new Text(v.toString().toUpperCase()));
+		}
+		
+		@Override
+		public void configure(final JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+	
+	public static class ConfigurableMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
+		private String filterPrefix;
+		
+		@Override
+		public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
+				throws IOException {
+			if(v.toString().startsWith(filterPrefix)) {
+				out.collect(k, v);
+			}
+		}
+		
+		@Override
+		public void configure(JobConf c) {
+			filterPrefix = c.get("my.filterPrefix");
+		}
+
+		@Override
+		public void close() throws IOException { }
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
index 01bfe09..f8c3bab 100644
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -31,7 +31,7 @@ public class HadoopMapredITCase extends JavaProgramTestBase {
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);
 		resultPath = getTempDirPath("result");
-		this.setDegreeOfParallelism(1);
+		this.setDegreeOfParallelism(4);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
new file mode 100644
index 0000000..e2c124d
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class HadoopReduceCombineFunctionITCase extends JavaProgramTestBase {
+
+	private static int NUM_PROGRAMS = 4;
+	
+	private int curProgId = config.getInteger("ProgramId", -1);
+	private String resultPath;
+	private String expectedResult;
+	
+	public HadoopReduceCombineFunctionITCase(Configuration config) {
+		super(config);	
+	}
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = ReducerProgs.runProgram(curProgId, resultPath);
+	}
+	
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+	
+	@Override
+	protected boolean skipCollectionExecution() {
+		if (this.curProgId == 3) {
+			return true;
+		}
+		return false;
+	}
+	
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+		
+		return toParameterList(tConfigs);
+	}
+	
+	public static class ReducerProgs {
+		
+		public static String runProgram(int progId, String resultPath) throws Exception {
+			
+			switch(progId) {
+			case 1: {
+				/*
+				 * Test standard counting with combiner
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
+						map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>>() {
+							private static final long serialVersionUID = 1L;
+							Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
+							@Override
+							public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
+									throws Exception {
+								outT.f0 = new IntWritable(v.f0.get() / 6);
+								outT.f1 = new IntWritable(1);
+								return outT;
+							}
+						});
+						
+				DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
+						groupBy(0).
+						reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
+								new SumReducer(), new SumReducer()));
+				
+				counts.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"(0,5)\n"+
+						"(1,6)\n" +
+						"(2,6)\n" +
+						"(3,4)\n";
+			}
+			case 2: {
+				/*
+				 * Test ungrouped Hadoop reducer
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
+						map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>>() {
+							private static final long serialVersionUID = 1L;
+							Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
+							@Override
+							public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
+									throws Exception {
+								outT.f0 = new IntWritable(0);
+								outT.f1 = v.f0;
+								return outT;
+							}
+						});
+						
+				DataSet<Tuple2<IntWritable, IntWritable>> sum = ds.
+						reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
+								new SumReducer(), new SumReducer()));
+				
+				sum.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"(0,231)\n";
+			}
+			case 3: {
+				/* Test combiner */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
+						map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>>() {
+							private static final long serialVersionUID = 1L;
+							Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
+							@Override
+							public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
+									throws Exception {
+								outT.f0 = v.f0;
+								outT.f1 = new IntWritable(1);
+								return outT;
+							}
+						});
+						
+				DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
+						groupBy(0).
+						reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
+								new SumReducer(), new KeyChangingReducer()));
+				
+				counts.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"(0,5)\n"+
+						"(1,6)\n" +
+						"(2,5)\n" +
+						"(3,5)\n";
+			}
+			case 4: {
+				/*
+				 * Test configuration via JobConf
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				JobConf conf = new JobConf();
+				conf.set("my.cntPrefix", "Hello");
+				
+				DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
+						map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>>() {
+							private static final long serialVersionUID = 1L;
+							@Override
+							public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
+									throws Exception {
+								v.f0 = new IntWritable(v.f0.get() % 5);
+								return v;
+							}
+						});
+						
+				DataSet<Tuple2<IntWritable, IntWritable>> hellos = ds.
+						groupBy(0).
+						reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
+								new ConfigurableCntReducer(), conf));
+				
+				hellos.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"(0,0)\n"+
+						"(1,0)\n" +
+						"(2,1)\n" +
+						"(3,1)\n" +
+						"(4,1)\n";
+			}
+			default: 
+				throw new IllegalArgumentException("Invalid program id");
+			}
+			
+		}
+	
+	}
+	
+	public static class SumReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+		@Override
+		public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
+				throws IOException {
+			
+			int sum = 0;
+			while(v.hasNext()) {
+				sum += v.next().get();
+			}
+			out.collect(k, new IntWritable(sum));
+		}
+		
+		@Override
+		public void configure(JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+	
+	public static class KeyChangingReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+		@Override
+		public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
+				throws IOException {
+			while(v.hasNext()) {
+				out.collect(new IntWritable(k.get() % 4), v.next());
+			}
+		}
+		
+		@Override
+		public void configure(JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+	
+	public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
+		private String countPrefix;
+		
+		@Override
+		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
+				throws IOException {
+			int commentCnt = 0;
+			while(vs.hasNext()) {
+				String v = vs.next().toString();
+				if(v.startsWith(this.countPrefix)) {
+					commentCnt++;
+				}
+			}
+			out.collect(k, new IntWritable(commentCnt));
+		}
+		
+		@Override
+		public void configure(final JobConf c) { 
+			this.countPrefix = c.get("my.cntPrefix");
+		}
+
+		@Override
+		public void close() throws IOException { }
+	}
+}
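
For reference, the core pattern these test programs exercise, wrapping one Hadoop Reducer as the Flink group-reduce function and another as its combiner, condenses to the sketch below. It reuses the SumReducer defined in the test above; the class name and output path are illustrative only and not part of the commit.

package org.apache.flink.test.hadoopcompatibility.mapred;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.hadoop.io.IntWritable;

public class HadoopReduceCombineSketch {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// A few (key, 1) pairs with keys 1 and 2.
		DataSet<Tuple2<IntWritable, IntWritable>> pairs = env.fromElements(
				new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(1)),
				new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(1)),
				new Tuple2<IntWritable, IntWritable>(new IntWritable(2), new IntWritable(1)));

		// The same Hadoop Reducer serves as the group-reduce function and as the
		// combiner, as in test program 1 above.
		DataSet<Tuple2<IntWritable, IntWritable>> counts = pairs
				.groupBy(0)
				.reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
						new HadoopReduceCombineFunctionITCase.SumReducer(),
						new HadoopReduceCombineFunctionITCase.SumReducer()));

		counts.writeAsText(args.length > 0 ? args[0] : "file:///tmp/sum-counts");
		env.execute();
	}
}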

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
new file mode 100644
index 0000000..605bf21
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class HadoopReduceFunctionITCase extends JavaProgramTestBase {
+
+	private static int NUM_PROGRAMS = 3;
+	
+	private int curProgId = config.getInteger("ProgramId", -1);
+	private String resultPath;
+	private String expectedResult;
+	
+	public HadoopReduceFunctionITCase(Configuration config) {
+		super(config);	
+	}
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = ReducerProgs.runProgram(curProgId, resultPath);
+	}
+	
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+	
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+		
+		return toParameterList(tConfigs);
+	}
+	
+	public static class ReducerProgs {
+		
+		public static String runProgram(int progId, String resultPath) throws Exception {
+			
+			switch(progId) {
+			case 1: {
+				/*
+				 * Test standard grouping
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
+						map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>>() {
+							private static final long serialVersionUID = 1L;
+							@Override
+							public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
+									throws Exception {
+								v.f0 = new IntWritable(v.f0.get() / 5);
+								return v;
+							}
+						});
+						
+				DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
+						groupBy(0).
+						reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new CommentCntReducer()));
+				
+				commentCnts.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"(0,0)\n"+
+						"(1,3)\n" +
+						"(2,5)\n" +
+						"(3,5)\n" +
+						"(4,2)\n";
+			}
+			case 2: {
+				/*
+				 * Test ungrouped Hadoop reducer
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
+						
+				DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
+						reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new AllCommentCntReducer()));
+				
+				commentCnts.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"(42,15)\n";
+			}
+			case 3: {
+				/*
+				 * Test configuration via JobConf
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				JobConf conf = new JobConf();
+				conf.set("my.cntPrefix", "Hello");
+				
+				DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
+						map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>>() {
+							private static final long serialVersionUID = 1L;
+							@Override
+							public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
+									throws Exception {
+								v.f0 = new IntWritable(v.f0.get() % 5);
+								return v;
+							}
+						});
+						
+				DataSet<Tuple2<IntWritable, IntWritable>> helloCnts = ds.
+						groupBy(0).
+						reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
+								new ConfigurableCntReducer(), conf));
+				
+				helloCnts.writeAsText(resultPath);
+				env.execute();
+				
+				// return expected result
+				return 	"(0,0)\n"+
+						"(1,0)\n" +
+						"(2,1)\n" +
+						"(3,1)\n" +
+						"(4,1)\n";
+			}
+			default: 
+				throw new IllegalArgumentException("Invalid program id");
+			}
+			
+		}
+	
+	}
+	
+	public static class CommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
+		
+		@Override
+		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
+				throws IOException {
+			int commentCnt = 0;
+			while(vs.hasNext()) {
+				String v = vs.next().toString();
+				if(v.startsWith("Comment")) {
+					commentCnt++;
+				}
+			}
+			out.collect(k, new IntWritable(commentCnt));
+		}
+		
+		@Override
+		public void configure(final JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+	
+	public static class AllCommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
+		
+		@Override
+		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
+				throws IOException {
+			int commentCnt = 0;
+			while(vs.hasNext()) {
+				String v = vs.next().toString();
+				if(v.startsWith("Comment")) {
+					commentCnt++;
+				}
+			}
+			out.collect(new IntWritable(42), new IntWritable(commentCnt));
+		}
+		
+		@Override
+		public void configure(final JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+	
+	public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
+		private String countPrefix;
+		
+		@Override
+		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
+				throws IOException {
+			int commentCnt = 0;
+			while(vs.hasNext()) {
+				String v = vs.next().toString();
+				if(v.startsWith(this.countPrefix)) {
+					commentCnt++;
+				}
+			}
+			out.collect(k, new IntWritable(commentCnt));
+		}
+		
+		@Override
+		public void configure(final JobConf c) { 
+			this.countPrefix = c.get("my.cntPrefix");
+		}
+
+		@Override
+		public void close() throws IOException { }
+	}
+}
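
The HadoopReduceFunction wrapper follows the same pattern without a combiner; a JobConf passed to its constructor is handed to the wrapped Reducer's configure() method. The sketch below mirrors test program 3, reusing HadoopTestData and the ConfigurableCntReducer defined above (class name and output path are illustrative, not part of the commit).

package org.apache.flink.test.hadoopcompatibility.mapred;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;

public class HadoopReduceFunctionSketch {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// The JobConf is forwarded to the wrapped Reducer's configure() method.
		JobConf conf = new JobConf();
		conf.set("my.cntPrefix", "Hello");

		DataSet<Tuple2<IntWritable, Text>> input = HadoopTestData.getKVPairDataSet(env);

		// Count per key how many values start with the configured prefix.
		DataSet<Tuple2<IntWritable, IntWritable>> helloCnts = input
				.groupBy(0)
				.reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
						new HadoopReduceFunctionITCase.ConfigurableCntReducer(), conf));

		helloCnts.writeAsText(args.length > 0 ? args[0] : "file:///tmp/hello-counts");
		env.execute();
	}
}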

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
new file mode 100644
index 0000000..eed6f8f
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+
+public class HadoopTestData {
+
+	public static DataSet<Tuple2<IntWritable, Text>> getKVPairDataSet(ExecutionEnvironment env) {
+		
+		List<Tuple2<IntWritable, Text>> data = new ArrayList<Tuple2<IntWritable, Text>>();
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(1),new Text("Hi")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(2),new Text("Hello")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(3),new Text("Hello world")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(4),new Text("Hello world, how are you?")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(5),new Text("I am fine.")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(6),new Text("Luke Skywalker")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(7),new Text("Comment#1")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(8),new Text("Comment#2")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(9),new Text("Comment#3")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(10),new Text("Comment#4")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(11),new Text("Comment#5")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(12),new Text("Comment#6")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(13),new Text("Comment#7")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(14),new Text("Comment#8")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(15),new Text("Comment#9")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(16),new Text("Comment#10")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(17),new Text("Comment#11")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(18),new Text("Comment#12")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(19),new Text("Comment#13")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(20),new Text("Comment#14")));
+		data.add(new Tuple2<IntWritable, Text>(new IntWritable(21),new Text("Comment#15")));
+		
+		Collections.shuffle(data);
+		
+		return env.fromCollection(data);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
new file mode 100644
index 0000000..2592b88
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred.wrapper;
+
+import java.util.ArrayList;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.hadoop.io.IntWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HadoopTupleUnwrappingIteratorTest {
+
+	@Test
+	public void testValueIterator() {
+		
+		HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt = 
+				new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(IntWritable.class);
+		
+		// many values
+		
+		ArrayList<Tuple2<IntWritable, IntWritable>> tList = new ArrayList<Tuple2<IntWritable, IntWritable>>();
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(1)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(2)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(3)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(4)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(5)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(6)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(7)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(8)));
+		
+		int expectedKey = 1;
+		int[] expectedValues = new int[] {1,2,3,4,5,6,7,8};
+		
+		valIt.set(tList.iterator());
+		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+		for(int expectedValue : expectedValues) {
+			Assert.assertTrue(valIt.hasNext());
+			Assert.assertTrue(valIt.hasNext());
+			Assert.assertTrue(valIt.next().get() == expectedValue);
+			Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+		}
+		Assert.assertFalse(valIt.hasNext());
+		Assert.assertFalse(valIt.hasNext());
+		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+		
+		// one value
+		
+		tList.clear();
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(2),new IntWritable(10)));
+		
+		expectedKey = 2;
+		expectedValues = new int[]{10};
+		
+		valIt.set(tList.iterator());
+		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+		for(int expectedValue : expectedValues) {
+			Assert.assertTrue(valIt.hasNext());
+			Assert.assertTrue(valIt.hasNext());
+			Assert.assertTrue(valIt.next().get() == expectedValue);
+			Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+		}
+		Assert.assertFalse(valIt.hasNext());
+		Assert.assertFalse(valIt.hasNext());
+		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+		
+		// more values
+		
+		tList.clear();
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(10)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(4)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(7)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(9)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(21)));
+		
+		expectedKey = 3;
+		expectedValues = new int[]{10,4,7,9,21};
+		
+		valIt.set(tList.iterator());
+		Assert.assertTrue(valIt.hasNext());
+		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+		for(int expectedValue : expectedValues) {
+			Assert.assertTrue(valIt.hasNext());
+			Assert.assertTrue(valIt.hasNext());
+			Assert.assertTrue(valIt.next().get() == expectedValue);
+			Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+		}
+		Assert.assertFalse(valIt.hasNext());
+		Assert.assertFalse(valIt.hasNext());
+		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+		
+		// no has next calls
+		
+		tList.clear();
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(5)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(8)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(42)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(-1)));
+		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(0)));
+		
+		expectedKey = 4;
+		expectedValues = new int[]{5,8,42,-1,0};
+		
+		valIt.set(tList.iterator());
+		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+		for(int expectedValue : expectedValues) {
+			Assert.assertTrue(valIt.next().get() == expectedValue);
+		}
+		try {
+			valIt.next();
+			Assert.fail();
+		} catch (NoSuchElementException nsee) {
+			// expected
+		}
+		Assert.assertFalse(valIt.hasNext());
+		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
+	}
+	
+}
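
The iterator tested here is the piece that presents a Flink key group, a sequence of Tuple2<KEY, VALUE> records sharing the same key, to a wrapped Hadoop Reducer as an Iterator over the values only. A minimal usage sketch of the API exercised above (class name and printed output are illustrative only):

package org.apache.flink.test.hadoopcompatibility.mapred.wrapper;

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
import org.apache.hadoop.io.IntWritable;

public class TupleUnwrappingSketch {

	public static void main(String[] args) {
		HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt =
				new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(IntWritable.class);

		// A key group as Flink delivers it: Tuple2(key, value) records with the same key.
		List<Tuple2<IntWritable, IntWritable>> group = Arrays.asList(
				new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(10)),
				new Tuple2<IntWritable, IntWritable>(new IntWritable(1), new IntWritable(20)));

		// set() reads the key from the first tuple; next() then yields only the values,
		// which is the Iterator<VALUE> shape a Hadoop Reducer expects.
		valIt.set(group.iterator());
		System.out.println("key = " + valIt.getCurrentKey());
		while (valIt.hasNext()) {
			System.out.println("value = " + valIt.next());
		}
	}
}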

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/aa6d1cf5/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 61e10d4..f75697b 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -87,6 +87,9 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 	
 	protected void postSubmit() throws Exception {}
 	
+	protected boolean skipCollectionExecution() {
+		return false;
+	};
 
 	// --------------------------------------------------------------------------------------------
 	//  Test entry point
@@ -141,6 +144,12 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 	
 	@Test
 	public void testJobCollectionExecution() throws Exception {
+		
+		// check if collection execution should be skipped.
+		if(this.skipCollectionExecution()) {
+			return;
+		}
+		
 		isCollectionExecution = true;
 		
 		// pre-submit
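
With this hook, a test subclass can opt out of the collection-execution run for individual parameterized programs, as HadoopReduceCombineFunctionITCase does for its combiner-only program. A condensed illustrative subclass (names and program ids are hypothetical, not part of the commit):

package org.apache.flink.test.hadoopcompatibility.mapred;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class SkipCollectionExampleITCase extends JavaProgramTestBase {

	private int curProgId = config.getInteger("ProgramId", -1);

	public SkipCollectionExampleITCase(Configuration config) {
		super(config);
	}

	@Override
	protected boolean skipCollectionExecution() {
		// Skip the collection-execution run for program 2, e.g. because it depends
		// on runtime behavior (such as a combiner being invoked) that the
		// collection-based executor does not provide.
		return this.curProgId == 2;
	}

	@Override
	protected void testProgram() throws Exception {
		// build and execute the Flink program for curProgId here
	}

	@Parameters
	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
		for (int i = 1; i <= 2; i++) {
			Configuration config = new Configuration();
			config.setInteger("ProgramId", i);
			tConfigs.add(config);
		}
		return toParameterList(tConfigs);
	}
}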

