flink-commits mailing list archives

From rmetz...@apache.org
Subject [23/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:42:01 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
new file mode 100644
index 0000000..2d2f518
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.utils;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+
+
+public class HadoopUtils {
+	
+	/**
+	 * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
+	 */
+	public static void mergeHadoopConf(JobConf jobConf) {
+		org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
+		for (Map.Entry<String, String> e : hadoopConf) {
+			jobConf.set(e.getKey(), e.getValue());
+		}
+	}
+	
+	public static JobContext instantiateJobContext(JobConf jobConf, JobID jobId) throws Exception {
+		try {
+			// for Hadoop 1.xx
+			Class<?> clazz = null;
+			if(!TaskAttemptContext.class.isInterface()) { 
+				clazz = Class.forName("org.apache.hadoop.mapred.JobContext", true, Thread.currentThread().getContextClassLoader());
+			}
+			// for Hadoop 2.xx
+			else {
+				clazz = Class.forName("org.apache.hadoop.mapred.JobContextImpl", true, Thread.currentThread().getContextClassLoader());
+			}
+			Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, org.apache.hadoop.mapreduce.JobID.class);
+			// the constructor may not be public, so make it accessible
+			constructor.setAccessible(true);
+			JobContext context = (JobContext) constructor.newInstance(jobConf, jobId);
+			
+			return context;
+		} catch(Exception e) {
+			throw new Exception("Could not create instance of JobContext.", e);
+		}
+	}
+	
+	public static TaskAttemptContext instantiateTaskAttemptContext(JobConf jobConf,  TaskAttemptID taskAttemptID) throws Exception {
+		try {
+			// for Hadoop 1.xx
+			Class<?> clazz = null;
+			if(!TaskAttemptContext.class.isInterface()) { 
+				clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContext", true, Thread.currentThread().getContextClassLoader());
+			}
+			// for Hadoop 2.xx
+			else {
+				clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContextImpl", true, Thread.currentThread().getContextClassLoader());
+			}
+			Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, TaskAttemptID.class);
+			// the constructor may not be public, so make it accessible
+			constructor.setAccessible(true);
+			TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(jobConf, taskAttemptID);
+			return context;
+		} catch(Exception e) {
+			throw new Exception("Could not create instance of TaskAttemptContext.", e);
+		}
+	}
+}
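
For orientation, here is a minimal sketch (not part of this commit) of how mergeHadoopConf and instantiateJobContext might be exercised; the class name HadoopUtilsSketch, the "sketch" job identifier and the printed configuration key are illustrative assumptions:

import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobID;

public class HadoopUtilsSketch {
	public static void main(String[] args) throws Exception {
		// Copy the HDFS-related configuration that Flink picked up into a fresh JobConf.
		JobConf jobConf = new JobConf();
		HadoopUtils.mergeHadoopConf(jobConf);

		// Reflectively obtain a mapred JobContext that works on both Hadoop 1.x and 2.x.
		JobContext context = HadoopUtils.instantiateJobContext(jobConf, new JobID("sketch", 0));
		System.out.println("default FS: " + context.getJobConf().get("fs.default.name"));
	}
}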

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
new file mode 100644
index 0000000..483dd2f
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * This is a dummy Progressable that does nothing when progress is reported.
+ *
+ */
+public class HadoopDummyProgressable implements Progressable {
+	@Override
+	public void progress() { 
+		
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
new file mode 100644
index 0000000..84a1e9e
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This is a dummy progress monitor / reporter that ignores all status updates and returns no counters.
+ *
+ */
+public class HadoopDummyReporter implements Reporter {
+
+	@Override
+	public void progress() {
+	}
+
+	@Override
+	public void setStatus(String status) {
+
+	}
+
+	@Override
+	public Counter getCounter(Enum<?> name) {
+		return null;
+	}
+
+	@Override
+	public Counter getCounter(String group, String name) {
+		return null;
+	}
+
+	@Override
+	public void incrCounter(Enum<?> key, long amount) {
+
+	}
+
+	@Override
+	public void incrCounter(String group, String counter, long amount) {
+
+	}
+
+	@Override
+	public InputSplit getInputSplit() throws UnsupportedOperationException {
+		return null;
+	}
+	// There should be an @Override, but some CDH4 dependency does not contain this method
+	public float getProgress() {
+		return 0;
+	}
+
+}
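
As a brief illustration (not part of this commit), the dummy reporter can stand in wherever the mapred API requires a Reporter; the class name DummyReporterSketch and the sample status and counter values are made up:

import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;

public class DummyReporterSketch {
	public static void main(String[] args) {
		// A null-object Reporter: all updates are ignored, progress is always 0.
		HadoopDummyReporter reporter = new HadoopDummyReporter();
		reporter.setStatus("ignored");
		reporter.incrCounter("group", "counter", 1L);
		System.out.println("progress: " + reporter.getProgress());
	}
}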

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
new file mode 100644
index 0000000..aa2155d
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.mapred.JobConf;
+
+
+public class HadoopInputSplit extends LocatableInputSplit {
+
+	private static final long serialVersionUID = 1L;
+
+	
+	private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
+	
+	private JobConf jobConf;
+	
+	private int splitNumber;
+	private String hadoopInputSplitTypeName;
+
+
+	public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
+		return hadoopInputSplit;
+	}
+
+	public HadoopInputSplit() {
+		super();
+	}
+
+	public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
+
+		this.splitNumber = splitNumber;
+		this.hadoopInputSplit = hInputSplit;
+		this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
+		this.jobConf = jobconf;
+
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(splitNumber);
+		out.writeUTF(hadoopInputSplitTypeName);
+		jobConf.write(out);
+		hadoopInputSplit.write(out);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.splitNumber = in.readInt();
+		this.hadoopInputSplitTypeName = in.readUTF();
+		if(hadoopInputSplit == null) {
+			try {
+				Class<? extends org.apache.hadoop.io.Writable> inputSplit =
+						Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
+				this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit );
+			}
+			catch (Exception e) {
+				throw new RuntimeException("Unable to create InputSplit", e);
+			}
+		}
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+		if (this.hadoopInputSplit instanceof Configurable) {
+			((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
+		}
+		this.hadoopInputSplit.readFields(in);
+
+	}
+
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeInt(splitNumber);
+		out.writeUTF(hadoopInputSplitTypeName);
+		jobConf.write(out);
+		hadoopInputSplit.write(out);
+
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		this.splitNumber=in.readInt();
+		this.hadoopInputSplitTypeName = in.readUTF();
+		if(hadoopInputSplit == null) {
+			try {
+				Class<? extends org.apache.hadoop.io.Writable> inputSplit =
+						Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
+				this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit );
+			}
+			catch (Exception e) {
+				throw new RuntimeException("Unable to create InputSplit", e);
+			}
+		}
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+		if (this.hadoopInputSplit instanceof Configurable) {
+			((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
+		}
+		this.hadoopInputSplit.readFields(in);
+	}
+
+	@Override
+	public int getSplitNumber() {
+		return this.splitNumber;
+	}
+
+	@Override
+	public String[] getHostnames() {
+		try {
+			return this.hadoopInputSplit.getLocations();
+		} catch(IOException ioe) {
+			return new String[0];
+		}
+	}
+}
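
As a rough illustration (not part of this commit) of how this class wraps a mapred split, consider the following sketch; the input path, split length and host name are made-up values:

import java.util.Arrays;

import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;

public class HadoopInputSplitSketch {
	public static void main(String[] args) throws Exception {
		// Wrap a plain mapred FileSplit together with the JobConf it belongs to.
		FileSplit fileSplit = new FileSplit(new Path("/tmp/input.txt"), 0L, 1024L, new String[] { "host1" });
		HadoopInputSplit split = new HadoopInputSplit(0, fileSplit, new JobConf());

		// getHostnames() delegates to the wrapped split's getLocations().
		System.out.println("split " + split.getSplitNumber()
				+ " on " + Arrays.toString(split.getHostnames()));
	}
}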

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
new file mode 100644
index 0000000..fcb6841
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import java.io.IOException;
+
+/**
+ * A Hadoop OutputCollector that wraps a Flink Collector.
+ * On each call of collect(), the data is forwarded to the wrapped Flink collector.
+ * 
+ */
+@SuppressWarnings("rawtypes")
+public final class HadoopOutputCollector<KEY,VALUE>
+		implements OutputCollector<KEY,VALUE> {
+
+	private Collector<Tuple2<KEY,VALUE>> flinkCollector;
+	
+	private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>();
+
+	/**
+	 * Set the wrapped Flink collector.
+	 * 
+	 * @param flinkCollector The Flink Collector to wrap and forward records to.
+	 */
+	public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector) {
+		this.flinkCollector = flinkCollector;
+	}
+	
+	/**
+	 * Use the wrapped Flink collector to collect a key-value pair for Flink. 
+	 * 
+	 * @param key the key to collect
+	 * @param val the value to collect
+	 * @throws IOException if the key-value pair cannot be forwarded to the wrapped Flink collector.
+	 */
+	@Override
+	public void collect(final KEY key, final VALUE val) throws IOException {
+
+		this.outTuple.f0 = key;
+		this.outTuple.f1 = val;
+		this.flinkCollector.collect(outTuple);
+	}
+	
+}
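
To illustrate the forwarding (a sketch, not part of this commit; the in-memory Collector and the sample record are assumptions made for the example):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class HadoopOutputCollectorSketch {
	public static void main(String[] args) throws Exception {
		final List<Tuple2<Text, IntWritable>> collected = new ArrayList<Tuple2<Text, IntWritable>>();

		// A trivial Flink Collector that stores the forwarded tuples.
		Collector<Tuple2<Text, IntWritable>> flinkCollector = new Collector<Tuple2<Text, IntWritable>>() {
			@Override
			public void collect(Tuple2<Text, IntWritable> record) {
				// Copy the tuple because the wrapper reuses a single Tuple2 instance.
				collected.add(new Tuple2<Text, IntWritable>(record.f0, record.f1));
			}
			@Override
			public void close() {}
		};

		HadoopOutputCollector<Text, IntWritable> hadoopCollector = new HadoopOutputCollector<Text, IntWritable>();
		hadoopCollector.setFlinkCollector(flinkCollector);

		// Each collect() call fills the reused tuple and forwards it to the Flink collector.
		hadoopCollector.collect(new Text("flink"), new IntWritable(1));
		System.out.println(collected);
	}
}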

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
new file mode 100644
index 0000000..5ecac2e
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field.
+ */
+@SuppressWarnings("rawtypes")
+public class HadoopTupleUnwrappingIterator<KEY,VALUE> 
+									extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+	
+	private Iterator<Tuple2<KEY,VALUE>> iterator;
+	
+	private final TypeSerializer<KEY> keySerializer;
+	
+	private boolean atFirst = false;
+	private KEY curKey = null;
+	private VALUE firstValue = null;
+	
+	public HadoopTupleUnwrappingIterator(Class<KEY> keyClass) {
+		this.keySerializer = TypeExtractor.getForClass((Class<KEY>) keyClass).createSerializer();
+	}
+	
+	/**
+	* Set the Flink iterator to wrap.
+	* 
+	* @param iterator The Flink iterator to wrap.
+	*/
+	@Override
+	public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) {
+		this.iterator = iterator;
+		if(this.hasNext()) {
+			final Tuple2<KEY, VALUE> tuple = iterator.next();
+			this.curKey = keySerializer.copy(tuple.f0);
+			this.firstValue = tuple.f1;
+			this.atFirst = true;
+		} else {
+			this.atFirst = false;
+		}
+	}
+	
+	@Override
+	public boolean hasNext() {
+		if(this.atFirst) {
+			return true;
+		}
+		return iterator.hasNext();
+	}
+	
+	@Override
+	public VALUE next() {
+		if(this.atFirst) {
+			this.atFirst = false;
+			return firstValue;
+		}
+		
+		final Tuple2<KEY, VALUE> tuple = iterator.next();
+		return tuple.f1;
+	}
+	
+	public KEY getCurrentKey() {
+		return this.curKey;
+	}
+	
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+}
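
A small sketch (not part of this commit) of how the iterator behaves; the key/value pairs are made-up sample data:

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;

public class TupleUnwrappingSketch {
	public static void main(String[] args) {
		List<Tuple2<String, Integer>> tuples = Arrays.asList(
				new Tuple2<String, Integer>("a", 1),
				new Tuple2<String, Integer>("a", 2),
				new Tuple2<String, Integer>("a", 3));

		HadoopTupleUnwrappingIterator<String, Integer> values =
				new HadoopTupleUnwrappingIterator<String, Integer>(String.class);
		values.set(tuples.iterator());

		// The key of the first tuple is kept; iteration yields only the values: 1, 2, 3.
		System.out.println("key = " + values.getCurrentKey());
		while (values.hasNext()) {
			System.out.println("value = " + values.next());
		}
	}
}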

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
new file mode 100644
index 0000000..20006b8
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapreduce;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
+import org.apache.flink.hadoopcompatibility.mapreduce.wrapper.HadoopInputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+public class HadoopInputFormat<K, V> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormat.class);
+
+	private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
+	private Class<K> keyClass;
+	private Class<V> valueClass;
+	private org.apache.hadoop.conf.Configuration configuration;
+
+	private transient RecordReader<K, V> recordReader;
+	private boolean fetched = false;
+	private boolean hasNext;
+
+	public HadoopInputFormat() {
+		super();
+	}
+
+	public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
+		super();
+		this.mapreduceInputFormat = mapreduceInputFormat;
+		this.keyClass = key;
+		this.valueClass = value;
+		this.configuration = job.getConfiguration();
+		HadoopUtils.mergeHadoopConf(configuration);
+	}
+
+	public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) {
+		this.configuration = configuration;
+	}
+
+	public org.apache.hadoop.mapreduce.InputFormat<K,V> getHadoopInputFormat() {
+		return this.mapreduceInputFormat;
+	}
+
+	public void setHadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat) {
+		this.mapreduceInputFormat = mapreduceInputFormat;
+	}
+
+	public org.apache.hadoop.conf.Configuration getConfiguration() {
+		return this.configuration;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  InputFormat
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void configure(Configuration parameters) {
+		// nothing to do
+	}
+
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
+		// only gather base statistics for FileInputFormats
+		if(!(mapreduceInputFormat instanceof FileInputFormat)) {
+			return null;
+		}
+
+		JobContext jobContext = null;
+		try {
+			jobContext = HadoopUtils.instantiateJobContext(configuration, null);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
+				(FileBaseStatistics) cachedStats : null;
+
+		try {
+			final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext);
+			return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
+		} catch (IOException ioex) {
+			if (LOG.isWarnEnabled()) {
+				LOG.warn("Could not determine statistics due to an io error: "
+						+ ioex.getMessage());
+			}
+		} catch (Throwable t) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Unexpected problem while getting the file statistics: "
+						+ t.getMessage(), t);
+			}
+		}
+
+		// no statistics available
+		return null;
+	}
+
+	@Override
+	public HadoopInputSplit[] createInputSplits(int minNumSplits)
+			throws IOException {
+		configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
+
+		JobContext jobContext = null;
+		try {
+			jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		List<org.apache.hadoop.mapreduce.InputSplit> splits;
+		try {
+			splits = this.mapreduceInputFormat.getSplits(jobContext);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not get Splits.", e);
+		}
+		HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
+
+		for(int i = 0; i < hadoopInputSplits.length; i++){
+			hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
+		}
+		return hadoopInputSplits;
+	}
+
+	@Override
+	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
+		return new LocatableInputSplitAssigner(inputSplits);
+	}
+
+	@Override
+	public void open(HadoopInputSplit split) throws IOException {
+		TaskAttemptContext context = null;
+		try {
+			context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
+		} catch(Exception e) {
+			throw new RuntimeException(e);
+		}
+
+		try {
+			this.recordReader = this.mapreduceInputFormat
+					.createRecordReader(split.getHadoopInputSplit(), context);
+			this.recordReader.initialize(split.getHadoopInputSplit(), context);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not create RecordReader.", e);
+		} finally {
+			this.fetched = false;
+		}
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		if(!this.fetched) {
+			fetchNext();
+		}
+		return !this.hasNext;
+	}
+
+	private void fetchNext() throws IOException {
+		try {
+			this.hasNext = this.recordReader.nextKeyValue();
+		} catch (InterruptedException e) {
+			throw new IOException("Could not fetch next KeyValue pair.", e);
+		} finally {
+			this.fetched = true;
+		}
+	}
+
+	@Override
+	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
+		if(!this.fetched) {
+			fetchNext();
+		}
+		if(!this.hasNext) {
+			return null;
+		}
+		try {
+			record.f0 = this.recordReader.getCurrentKey();
+			record.f1 = this.recordReader.getCurrentValue();
+		} catch (InterruptedException e) {
+			throw new IOException("Could not get KeyValue pair.", e);
+		}
+		this.fetched = false;
+
+		return record;
+	}
+
+	@Override
+	public void close() throws IOException {
+		this.recordReader.close();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Helper methods
+	// --------------------------------------------------------------------------------------------
+
+	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths, 
+											ArrayList<FileStatus> files) throws IOException {
+
+		long latestModTime = 0L;
+
+		// get the file info and check whether the cached statistics are still valid.
+		for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
+
+			final Path filePath = new Path(hadoopPath.toUri());
+			final FileSystem fs = FileSystem.get(filePath.toUri());
+
+			final FileStatus file = fs.getFileStatus(filePath);
+			latestModTime = Math.max(latestModTime, file.getModificationTime());
+
+			// enumerate all files and check their modification time stamp.
+			if (file.isDir()) {
+				FileStatus[] fss = fs.listStatus(filePath);
+				files.ensureCapacity(files.size() + fss.length);
+
+				for (FileStatus s : fss) {
+					if (!s.isDir()) {
+						files.add(s);
+						latestModTime = Math.max(s.getModificationTime(), latestModTime);
+					}
+				}
+			} else {
+				files.add(file);
+			}
+		}
+
+		// check whether the cached statistics are still valid, if we have any
+		if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
+			return cachedStats;
+		}
+
+		// calculate the whole length
+		long len = 0;
+		for (FileStatus s : files) {
+			len += s.getLen();
+		}
+
+		// sanity check
+		if (len <= 0) {
+			len = BaseStatistics.SIZE_UNKNOWN;
+		}
+
+		return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Custom serialization methods
+	// --------------------------------------------------------------------------------------------
+
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeUTF(this.mapreduceInputFormat.getClass().getName());
+		out.writeUTF(this.keyClass.getName());
+		out.writeUTF(this.valueClass.getName());
+		this.configuration.write(out);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		String hadoopInputFormatClassName = in.readUTF();
+		String keyClassName = in.readUTF();
+		String valueClassName = in.readUTF();
+
+		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+		configuration.readFields(in);
+
+		if(this.configuration == null) {
+			this.configuration = configuration;
+		}
+
+		try {
+			this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
+		}
+		try {
+			this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to find key class.", e);
+		}
+		try {
+			this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to find value class.", e);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  ResultTypeQueryable
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public TypeInformation<Tuple2<K,V>> getProducedType() {
+		return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
new file mode 100644
index 0000000..696e1be
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapreduce;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+
+
+public class HadoopOutputFormat<K,V> implements OutputFormat<Tuple2<K,V>>, FinalizeOnMaster {
+	
+	private static final long serialVersionUID = 1L;
+	
+	private org.apache.hadoop.conf.Configuration configuration;
+	private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat;
+	private transient RecordWriter<K,V> recordWriter;
+	private transient FileOutputCommitter fileOutputCommitter;
+	private transient TaskAttemptContext context;
+	private transient int taskNumber;
+	
+	public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat, Job job) {
+		super();
+		this.mapreduceOutputFormat = mapreduceOutputFormat;
+		this.configuration = job.getConfiguration();
+		HadoopUtils.mergeHadoopConf(configuration);
+	}
+	
+	public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) {
+		this.configuration = configuration;
+	}
+	
+	public org.apache.hadoop.conf.Configuration getConfiguration() {
+		return this.configuration;
+	}
+	
+	public org.apache.hadoop.mapreduce.OutputFormat<K,V> getHadoopOutputFormat() {
+		return this.mapreduceOutputFormat;
+	}
+	
+	public void setHadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat) {
+		this.mapreduceOutputFormat = mapreduceOutputFormat;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  OutputFormat
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void configure(Configuration parameters) {
+		// nothing to do
+	}
+	
+	/**
+	 * Creates the temporary output file for the Hadoop RecordWriter.
+	 * @param taskNumber The number of the parallel instance.
+	 * @param numTasks The number of parallel tasks.
+	 * @throws IOException
+	 */
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		if (Integer.toString(taskNumber + 1).length() > 6) {
+			throw new IOException("Task id too large.");
+		}
+		
+		this.taskNumber = taskNumber+1;
+		
+		// for hadoop 2.2
+		this.configuration.set("mapreduce.output.basename", "tmp");
+		
+		TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" 
+				+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") 
+				+ Integer.toString(taskNumber + 1) 
+				+ "_0");
+		
+		this.configuration.set("mapred.task.id", taskAttemptID.toString());
+		this.configuration.setInt("mapred.task.partition", taskNumber + 1);
+		// for hadoop 2.2
+		this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
+		this.configuration.setInt("mapreduce.task.partition", taskNumber + 1);
+		
+		try {
+			this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+		
+		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
+		
+		try {
+			this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+		
+		// compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
+		this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString());
+		
+		try {
+			this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not create RecordWriter.", e);
+		}
+	}
+	
+	
+	@Override
+	public void writeRecord(Tuple2<K, V> record) throws IOException {
+		try {
+			this.recordWriter.write(record.f0, record.f1);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not write Record.", e);
+		}
+	}
+	
+	/**
+	 * Commits the task by moving the output file out of the temporary directory.
+	 * @throws IOException
+	 */
+	@Override
+	public void close() throws IOException {
+		try {
+			this.recordWriter.close(this.context);
+		} catch (InterruptedException e) {
+			throw new IOException("Could not close RecordWriter.", e);
+		}
+		
+		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
+			this.fileOutputCommitter.commitTask(this.context);
+		}
+		
+		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
+		
+		// rename tmp-file to final name
+		FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
+		
+		String taskNumberStr = Integer.toString(this.taskNumber);
+		String tmpFileTemplate = "tmp-r-00000";
+		String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr;
+		
+		if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) {
+			fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
+		}
+	}
+	
+	@Override
+	public void finalizeGlobal(int parallelism) throws IOException {
+
+		JobContext jobContext;
+		TaskAttemptContext taskContext;
+		try {
+			
+			TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" 
+					+ String.format("%" + (6 - Integer.toString(1).length()) + "s"," ").replace(" ", "0") 
+					+ Integer.toString(1) 
+					+ "_0");
+			
+			jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID());
+			taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext);
+		
+		// finalize HDFS output format
+		this.fileOutputCommitter.commitJob(jobContext);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Custom serialization methods
+	// --------------------------------------------------------------------------------------------
+	
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeUTF(this.mapreduceOutputFormat.getClass().getName());
+		this.configuration.write(out);
+	}
+	
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		String hadoopOutputFormatClassName = in.readUTF();
+		
+		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
+		configuration.readFields(in);
+		
+		if(this.configuration == null) {
+			this.configuration = configuration;
+		}
+		
+		try {
+			this.mapreduceOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat<K,V>) Class.forName(hadoopOutputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
new file mode 100644
index 0000000..2b99fd2
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapreduce.example;
+
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
+import org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat;
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * occurrences of each word in the file and writes the result back to disk.
+ * 
+ * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to 
+ * common Java types for better usage in a Flink job and how to use Hadoop Output Formats.
+ */
+@SuppressWarnings("serial")
+public class WordCount {
+	
+	public static void main(String[] args) throws Exception {
+		if (args.length < 2) {
+			System.err.println("Usage: WordCount <input path> <result path>");
+			return;
+		}
+		
+		final String inputPath = args[0];
+		final String outputPath = args[1];
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// Set up the Hadoop Input Format
+		Job job = Job.getInstance();
+		HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
+		TextInputFormat.addInputPath(job, new Path(inputPath));
+		
+		// Create a Flink job with it
+		DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
+		
+		// Tokenize the line and convert from Writable "Text" to String for better handling
+		DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());
+		
+		// Sum up the words
+		DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);
+		
+		// Convert String back to Writable "Text" for use with Hadoop Output Format
+		DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());
+		
+		// Set up Hadoop Output Format
+		HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job);
+		hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+		hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " "); // set the value for both, since this test
+		// is being executed with both types (hadoop1 and hadoop2 profile)
+		TextOutputFormat.setOutputPath(job, new Path(outputPath));
+		
+		// Output & Execute
+		hadoopResult.output(hadoopOutputFormat);
+		env.execute("Word Count");
+	}
+	
+	/**
+	 * Splits a line into words and converts Hadoop Writables into normal Java data types.
+	 */
+	public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
+		
+		@Override
+		public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
+			// normalize and split the line
+			String line = value.f1.toString();
+			String[] tokens = line.toLowerCase().split("\\W+");
+			
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+	
+	/**
+	 * Converts Java data types to Hadoop Writables.
+	 */
+	public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
+		
+		@Override
+		public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {
+			return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1));
+		}
+		
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
new file mode 100644
index 0000000..86b730f
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapreduce.utils;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+public class HadoopUtils {
+	
+	/**
+	 * Merge HadoopConfiguration into Configuration. This is necessary for the HDFS configuration.
+	 */
+	public static void mergeHadoopConf(Configuration configuration) {
+		Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
+		
+		for (Map.Entry<String, String> e : hadoopConf) {
+			configuration.set(e.getKey(), e.getValue());
+		}
+	}
+	
+	public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception {
+		try {
+			Class<?> clazz;
+			// for Hadoop 2.xx
+			if(JobContext.class.isInterface()) {
+				clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader());
+			}
+			// for Hadoop 1.xx
+			else {
+				clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader());
+			}
+			Constructor<?> constructor = clazz.getConstructor(Configuration.class, JobID.class);
+			JobContext context = (JobContext) constructor.newInstance(configuration, jobId);
+			
+			return context;
+		} catch(Exception e) {
+			throw new Exception("Could not create instance of JobContext.", e);
+		}
+	}
+	
+	public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration,  TaskAttemptID taskAttemptID) throws Exception {
+		try {
+			Class<?> clazz;
+			// for Hadoop 2.xx
+			if(JobContext.class.isInterface()) {
+				clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+			}
+			// for Hadoop 1.xx
+			else {
+				clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
+			}
+			Constructor<?> constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class);
+			TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID);
+			
+			return context;
+		} catch(Exception e) {
+			throw new Exception("Could not create instance of TaskAttemptContext.", e);
+		}
+	}
+}
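
For orientation (an illustrative sketch, not part of this commit): the reflective lookups above rely on org.apache.hadoop.mapreduce.JobContext being a class in Hadoop 1.x but an interface (implemented by the *Impl classes) in Hadoop 2.x; the class name HadoopVersionProbe is made up:

import org.apache.hadoop.mapreduce.JobContext;

public class HadoopVersionProbe {
	public static void main(String[] args) {
		// Prints true on a Hadoop 2.x classpath, false on Hadoop 1.x.
		System.out.println("JobContext is an interface: " + JobContext.class.isInterface());
	}
}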

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
new file mode 100644
index 0000000..7477c28
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapreduce.wrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.mapreduce.JobContext;
+
+
+public class HadoopInputSplit extends LocatableInputSplit {
+	
+	public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit;
+	public transient JobContext jobContext;
+	
+	private int splitNumber;
+	
+	public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() {
+		return mapreduceInputSplit;
+	}
+	
+	
+	public HadoopInputSplit() {
+		super();
+	}
+	
+	
+	public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
+		this.splitNumber = splitNumber;
+		if(!(mapreduceInputSplit instanceof Writable)) {
+			throw new IllegalArgumentException("InputSplit must implement Writable interface.");
+		}
+		this.mapreduceInputSplit = mapreduceInputSplit;
+		this.jobContext = jobContext;
+	}
+	
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(this.splitNumber);
+		out.writeUTF(this.mapreduceInputSplit.getClass().getName());
+		Writable w = (Writable) this.mapreduceInputSplit;
+		w.write(out);
+	}
+	
+	@Override
+	public void read(DataInputView in) throws IOException {
+		this.splitNumber = in.readInt();
+		String className = in.readUTF();
+		
+		if(this.mapreduceInputSplit == null) {
+			try {
+				Class<? extends org.apache.hadoop.io.Writable> inputSplit = 
+						Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
+				this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
+			} catch (Exception e) {
+				throw new RuntimeException("Unable to create InputSplit", e);
+			}
+		}
+		((Writable)this.mapreduceInputSplit).readFields(in);
+	}
+
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeInt(this.splitNumber);
+		out.writeUTF(this.mapreduceInputSplit.getClass().getName());
+		Writable w = (Writable) this.mapreduceInputSplit;
+		w.write(out);
+
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		this.splitNumber=in.readInt();
+		String className = in.readUTF();
+
+		if(this.mapreduceInputSplit == null) {
+			try {
+				Class<? extends org.apache.hadoop.io.Writable> inputSplit =
+						Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
+				this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
+			} catch (Exception e) {
+				throw new RuntimeException("Unable to create InputSplit", e);
+			}
+		}
+		((Writable)this.mapreduceInputSplit).readFields(in);
+	}
+	
+	@Override
+	public int getSplitNumber() {
+		return this.splitNumber;
+	}
+	
+	@Override
+	public String[] getHostnames() {
+		try {
+			return this.mapreduceInputSplit.getLocations();
+		} catch (IOException e) {
+			return new String[0];
+		} catch (InterruptedException e) {
+			return new String[0];
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
new file mode 100644
index 0000000..32396b8
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+
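+/**
+ * Integration tests that read Hadoop SequenceFiles through the mapred {@link HadoopInputFormat} wrapper.
+ */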
+@RunWith(Parameterized.class)
+public class HadoopIOFormatsITCase extends JavaProgramTestBase {
+
+	private static final int NUM_PROGRAMS = 2;
+
+	private int curProgId = config.getInteger("ProgramId", -1);
+	private String[] resultPath;
+	private String[] expectedResult;
+	private String sequenceFileInPath;
+	private String sequenceFileInPathNull;
+
+	public HadoopIOFormatsITCase(Configuration config) {
+		super(config);	
+	}
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = new String[] {getTempDirPath("result0"), getTempDirPath("result1") };
+
+		File sequenceFile = createAndRegisterTempFile("seqFile");
+		sequenceFileInPath = sequenceFile.toURI().toString();
+
+		// Create a sequence file
+		org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+		FileSystem fs = FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf);
+		Path path = new Path(sequenceFile.getAbsolutePath());
+
+		//  ------------------ Long / Text Key Value pair: ------------
+		int kvCount = 4;
+
+		LongWritable key = new LongWritable();
+		Text value = new Text();
+		SequenceFile.Writer writer = null;
+		try {
+			writer = SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass());
+			for (int i = 0; i < kvCount; i++) {
+				if (i == 1) {
+					// write key = 1 a bit more often.
+					for (int a = 0; a < 15; a++) {
+						key.set(i);
+						value.set(i + " - somestring");
+						writer.append(key, value);
+					}
+				}
+				}
+				key.set(i);
+				value.set(i+" - somestring");
+				writer.append(key, value);
+			}
+		} finally {
+			IOUtils.closeStream(writer);
+		}
+
+
+		//  ------------------ NullWritable / LongWritable Key Value pair: ------------
+
+		File sequenceFileNull = createAndRegisterTempFile("seqFileNullKey");
+		sequenceFileInPathNull = sequenceFileNull.toURI().toString();
+		path = new Path(sequenceFileInPathNull);
+
+		LongWritable value1 = new LongWritable();
+		SequenceFile.Writer writer1 = null;
+		try {
+			writer1 = SequenceFile.createWriter( fs, conf, path, NullWritable.class, value1.getClass());
+			for (int i = 0; i < kvCount; i ++) {
+				value1.set(i);
+				writer1.append(NullWritable.get(), value1);
+			}
+		} finally {
+			IOUtils.closeStream(writer1);
+		}
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, resultPath, sequenceFileInPath, sequenceFileInPathNull);
+	}
+	
+	@Override
+	protected void postSubmit() throws Exception {
+		for(int i = 0; i < resultPath.length; i++) {
+			compareResultsByLinesInMemory(expectedResult[i], resultPath[i]);
+		}
+	}
+	
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+		
+		return toParameterList(tConfigs);
+	}
+	
+	public static class HadoopIOFormatPrograms {
+		
+		public static String[] runProgram(int progId, String[] resultPath, String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
+			
+			switch(progId) {
+			case 1: {
+				/**
+				 * Test sequence file, including a key access.
+				 */
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				SequenceFileInputFormat<LongWritable, Text> sfif = new SequenceFileInputFormat<LongWritable, Text>();
+				JobConf hdconf = new JobConf();
+				SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPath));
+				HadoopInputFormat<LongWritable, Text> hif = new HadoopInputFormat<LongWritable, Text>(sfif, LongWritable.class, Text.class, hdconf);
+				DataSet<Tuple2<LongWritable, Text>> ds = env.createInput(hif);
+				DataSet<Tuple2<Long, Text>> summed = ds.map(new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>() {
+					@Override
+					public Tuple2<Long, Text> map(Tuple2<LongWritable, Text> value) throws Exception {
+						return new Tuple2<Long, Text>(value.f0.get(), value.f1);
+					}
+				}).sum(0);
+				summed.writeAsText(resultPath[0]);
+				DataSet<String> res = ds.distinct(0).map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
+					@Override
+					public String map(Tuple2<LongWritable, Text> value) throws Exception {
+						return value.f1 + " - " + value.f0.get();
+					}
+				});
+				res.writeAsText(resultPath[1]);
+				env.execute();
+				
+				// return expected result
+				return new String[] {"(21,3 - somestring)", "0 - somestring - 0\n" +
+						"1 - somestring - 1\n" +
+						"2 - somestring - 2\n" +
+						"3 - somestring - 3\n"};
+
+			}
+			case 2: {
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				SequenceFileInputFormat<NullWritable, LongWritable> sfif = new SequenceFileInputFormat<NullWritable, LongWritable>();
+				JobConf hdconf = new JobConf();
+				SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPathNull));
+				HadoopInputFormat<NullWritable, LongWritable> hif = new HadoopInputFormat<NullWritable, LongWritable>(sfif, NullWritable.class, LongWritable.class, hdconf);
+				DataSet<Tuple2<NullWritable, LongWritable>> ds = env.createInput(hif);
+				DataSet<Tuple2<Void, Long>> res = ds.map(new MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>() {
+					@Override
+					public Tuple2<Void, Long> map(Tuple2<NullWritable, LongWritable> value) throws Exception {
+						return new Tuple2<Void, Long>(null, value.f1.get());
+					}
+				});
+				DataSet<Tuple2<Void, Long>> res1 = res.groupBy(1).sum(1);
+				res1.writeAsText(resultPath[1]);
+				res.writeAsText(resultPath[0]);
+				env.execute();
+
+				// return expected result
+				return new String[] {"(null,2)\n" +
+						"(null,0)\n" +
+						"(null,1)\n" +
+						"(null,3)",
+						"(null,0)\n" +
+						"(null,1)\n" +
+						"(null,2)\n" +
+						"(null,3)"};
+			}
+			default:
+				throw new IllegalArgumentException("Invalid program id");
+			}
+			
+		}
+	
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
new file mode 100644
index 0000000..00fd1f9
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+
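+/**
+ * Tests that {@link HadoopInputFormat} derives the expected {@code Tuple2} type information
+ * from the given key and value classes.
+ */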
+public class HadoopInputFormatTest {
+
+
+	public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
+
+		public DummyVoidKeyInputFormat() {
+		}
+
+		@Override
+		public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
+			return null;
+		}
+	}
+	
+	
+	@Test
+	public void checkTypeInformation() {
+		try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			// Set up the Hadoop Input Format
+			Job job = Job.getInstance();
+			HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>(new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, new JobConf());
+
+			TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
+			TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+			
+			if(tupleType.isTupleType()) {
+				if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
+					fail("Tuple type information was not set correctly!");
+				}
+			} else {
+				fail("Type information was not set to tuple type information!");
+			}
+
+		}
+		catch (Exception ex) {
+			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+		}
+
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
new file mode 100644
index 0000000..850e799
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
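+/**
+ * Integration tests for wrapping Hadoop mapred {@link Mapper}s with {@link HadoopMapFunction}.
+ */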
+@RunWith(Parameterized.class)
+public class HadoopMapFunctionITCase extends MultipleProgramsTestBase {
+
+	public HadoopMapFunctionITCase(ExecutionMode mode){
+		super(mode);
+	}
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	public void testNonPassingMapper() throws Exception{
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
+		DataSet<Tuple2<IntWritable, Text>> nonPassingFlatMapDs = ds.
+				flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new NonPassingMapper()));
+
+		String resultPath = tempFolder.newFile().toURI().toString();
+
+		nonPassingFlatMapDs.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+		env.execute();
+
+		compareResultsByLinesInMemory("\n", resultPath);
+	}
+
+	@Test
+	public void testDataDuplicatingMapper() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
+		DataSet<Tuple2<IntWritable, Text>> duplicatingFlatMapDs = ds.
+				flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new DuplicatingMapper()));
+
+		String resultPath = tempFolder.newFile().toURI().toString();
+
+		duplicatingFlatMapDs.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+		env.execute();
+
+		String expected = "(1,Hi)\n" + "(1,HI)\n" +
+				"(2,Hello)\n" + "(2,HELLO)\n" +
+				"(3,Hello world)\n" + "(3,HELLO WORLD)\n" +
+				"(4,Hello world, how are you?)\n" + "(4,HELLO WORLD, HOW ARE YOU?)\n" +
+				"(5,I am fine.)\n" + "(5,I AM FINE.)\n" +
+				"(6,Luke Skywalker)\n" + "(6,LUKE SKYWALKER)\n" +
+				"(7,Comment#1)\n" + "(7,COMMENT#1)\n" +
+				"(8,Comment#2)\n" + "(8,COMMENT#2)\n" +
+				"(9,Comment#3)\n" + "(9,COMMENT#3)\n" +
+				"(10,Comment#4)\n" + "(10,COMMENT#4)\n" +
+				"(11,Comment#5)\n" + "(11,COMMENT#5)\n" +
+				"(12,Comment#6)\n" + "(12,COMMENT#6)\n" +
+				"(13,Comment#7)\n" + "(13,COMMENT#7)\n" +
+				"(14,Comment#8)\n" + "(14,COMMENT#8)\n" +
+				"(15,Comment#9)\n" + "(15,COMMENT#9)\n" +
+				"(16,Comment#10)\n" + "(16,COMMENT#10)\n" +
+				"(17,Comment#11)\n" + "(17,COMMENT#11)\n" +
+				"(18,Comment#12)\n" + "(18,COMMENT#12)\n" +
+				"(19,Comment#13)\n" + "(19,COMMENT#13)\n" +
+				"(20,Comment#14)\n" + "(20,COMMENT#14)\n" +
+				"(21,Comment#15)\n" + "(21,COMMENT#15)\n";
+
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test
+	public void testConfigurableMapper() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		JobConf conf = new JobConf();
+		conf.set("my.filterPrefix", "Hello");
+
+		DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
+		DataSet<Tuple2<IntWritable, Text>> hellos = ds.
+				flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new ConfigurableMapper(), conf));
+
+		String resultPath = tempFolder.newFile().toURI().toString();
+
+		hellos.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+		env.execute();
+
+		String expected = "(2,Hello)\n" +
+				"(3,Hello world)\n" +
+				"(4,Hello world, how are you?)\n";
+
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+	
+
+	
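+	/** A mapper that emits nothing for the test data (no value contains "bananas"). */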
+	public static class NonPassingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
+		
+		@Override
+		public void map(final IntWritable k, final Text v, 
+				final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
+			if ( v.toString().contains("bananas") ) {
+				out.collect(k,v);
+			}
+		}
+		
+		@Override
+		public void configure(final JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+	
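+	/** A mapper that emits every record twice: once unchanged and once with the value upper-cased. */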
+	public static class DuplicatingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
+		
+		@Override
+		public void map(final IntWritable k, final Text v, 
+				final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
+			out.collect(k, v);
+			out.collect(k, new Text(v.toString().toUpperCase()));
+		}
+		
+		@Override
+		public void configure(final JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+	
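+	/** A mapper that only passes on records whose value starts with the prefix configured via "my.filterPrefix". */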
+	public static class ConfigurableMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
+		private String filterPrefix;
+		
+		@Override
+		public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
+				throws IOException {
+			if(v.toString().startsWith(filterPrefix)) {
+				out.collect(k, v);
+			}
+		}
+		
+		@Override
+		public void configure(JobConf c) {
+			filterPrefix = c.get("my.filterPrefix");
+		}
+
+		@Override
+		public void close() throws IOException { }
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
new file mode 100644
index 0000000..b6650d2
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
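+/**
+ * Integration test that runs the {@link HadoopMapredCompatWordCount} example and verifies its output.
+ */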
+public class HadoopMapredITCase extends JavaProgramTestBase {
+	
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+		this.setDegreeOfParallelism(4);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"});
+	}
+	
+	@Override
+	protected void testProgram() throws Exception {
+		HadoopMapredCompatWordCount.main(new String[] { textPath, resultPath });
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
new file mode 100644
index 0000000..92b0dc3
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.hamcrest.core.IsEqual;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
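+/**
+ * Integration tests for {@link HadoopReduceCombineFunction} and {@link HadoopReduceFunction},
+ * wrapping Hadoop mapred {@link Reducer}s as Flink group-reduce functions.
+ */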
+@RunWith(Parameterized.class)
+public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase {
+
+	public HadoopReduceCombineFunctionITCase(ExecutionMode mode){
+		super(mode);
+	}
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	public void testStandardCountingWithCombiner() throws Exception{
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
+				map(new Mapper1());
+
+		DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
+				groupBy(0).
+				reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
+						new SumReducer(), new SumReducer()));
+
+		String resultPath = tempFolder.newFile().toURI().toString();
+
+		counts.writeAsText(resultPath);
+		env.execute();
+
+		String expected = "(0,5)\n"+
+				"(1,6)\n" +
+				"(2,6)\n" +
+				"(3,4)\n";
+
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test
+	public void testUngroupedHadoopReducer() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
+				map(new Mapper2());
+
+		DataSet<Tuple2<IntWritable, IntWritable>> sum = ds.
+				reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
+						new SumReducer(), new SumReducer()));
+
+		String resultPath = tempFolder.newFile().toURI().toString();
+
+		sum.writeAsText(resultPath);
+		env.execute();
+
+		String expected = "(0,231)\n";
+
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test
+	public void testCombiner() throws Exception {
+		org.junit.Assume.assumeThat(mode, new IsEqual<ExecutionMode>(ExecutionMode.CLUSTER));
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
+				map(new Mapper3());
+
+		DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
+				groupBy(0).
+				reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
+						new SumReducer(), new KeyChangingReducer()));
+
+		String resultPath = tempFolder.newFile().toURI().toString();
+
+		counts.writeAsText(resultPath);
+		env.execute();
+
+		String expected = "(0,5)\n"+
+				"(1,6)\n" +
+				"(2,5)\n" +
+				"(3,5)\n";
+
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test
+	public void testConfigurationViaJobConf() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		JobConf conf = new JobConf();
+		conf.set("my.cntPrefix", "Hello");
+
+		DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
+				map(new Mapper4());
+
+		DataSet<Tuple2<IntWritable, IntWritable>> hellos = ds.
+				groupBy(0).
+				reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
+						new ConfigurableCntReducer(), conf));
+
+		String resultPath = tempFolder.newFile().toURI().toString();
+
+		hellos.writeAsText(resultPath);
+		env.execute();
+
+		// return expected result
+		String expected = "(0,0)\n"+
+				"(1,0)\n" +
+				"(2,1)\n" +
+				"(3,1)\n" +
+				"(4,1)\n";
+
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+	
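+	/** Sums all values of a group and emits the total under the group's key. */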
+	public static class SumReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+		@Override
+		public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
+				throws IOException {
+			
+			int sum = 0;
+			while(v.hasNext()) {
+				sum += v.next().get();
+			}
+			out.collect(k, new IntWritable(sum));
+		}
+		
+		@Override
+		public void configure(JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+	
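+	/** Used as a combiner: re-keys each value to (key % 4) without aggregating. */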
+	public static class KeyChangingReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
+
+		@Override
+		public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
+				throws IOException {
+			while(v.hasNext()) {
+				out.collect(new IntWritable(k.get() % 4), v.next());
+			}
+		}
+		
+		@Override
+		public void configure(JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+	
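+	/** Counts per group how many values start with the prefix configured via "my.cntPrefix". */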
+	public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
+		private String countPrefix;
+		
+		@Override
+		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
+				throws IOException {
+			int commentCnt = 0;
+			while(vs.hasNext()) {
+				String v = vs.next().toString();
+				if(v.startsWith(this.countPrefix)) {
+					commentCnt++;
+				}
+			}
+			out.collect(k, new IntWritable(commentCnt));
+		}
+		
+		@Override
+		public void configure(final JobConf c) { 
+			this.countPrefix = c.get("my.cntPrefix");
+		}
+
+		@Override
+		public void close() throws IOException { }
+	}
+
+	public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable,
+			IntWritable>> {
+		private static final long serialVersionUID = 1L;
+		Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
+		@Override
+		public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
+		throws Exception {
+			outT.f0 = new IntWritable(v.f0.get() / 6);
+			outT.f1 = new IntWritable(1);
+			return outT;
+		}
+	}
+
+	public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable,
+			IntWritable>> {
+		private static final long serialVersionUID = 1L;
+		Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
+		@Override
+		public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
+		throws Exception {
+			outT.f0 = new IntWritable(0);
+			outT.f1 = v.f0;
+			return outT;
+		}
+	}
+
+	public static class Mapper3 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>> {
+		private static final long serialVersionUID = 1L;
+		Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
+		@Override
+		public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
+		throws Exception {
+			outT.f0 = v.f0;
+			outT.f1 = new IntWritable(1);
+			return outT;
+		}
+	}
+
+	public static class Mapper4 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
+		throws Exception {
+			v.f0 = new IntWritable(v.f0.get() % 5);
+			return v;
+		}
+	}
+}

