flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [24/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:42:02 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
new file mode 100644
index 0000000..aa9f048
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
+ *
+ * <p>Both wrapped Hadoop Reducers follow the mapred lifecycle: they are configured with the
+ * {@link JobConf} in {@link #open(Configuration)} and closed in {@link #close()}.
+ * Only the classes of the Reducers and the JobConf are serialized; on deserialization the
+ * Reducers are re-instantiated from their classes, so state held by the original instances is not transferred.
+ */
+@SuppressWarnings("rawtypes")
+@org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
+public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+					extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
+					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The Hadoop Reducer that produces the final output. */
+	private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+	/** The Hadoop Reducer that is used as combiner. */
+	private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
+	/** The configuration passed to both Hadoop Reducers. */
+	private transient JobConf jobConf;
+
+	private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
+	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
+	private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
+	private transient Reporter reporter;
+
+	/**
+	 * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
+	 * Both Reducers are configured with a newly created {@link JobConf}.
+	 *
+	 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
+	 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
+	 * @throws NullPointerException if the reducer or the combiner is null.
+	 */
+	public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
+										Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) {
+		this(hadoopReducer, hadoopCombiner, new JobConf());
+	}
+
+	/**
+	 * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
+	 *
+	 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
+	 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
+	 * @param conf The JobConf that is used to configure both Hadoop Reducers.
+	 * @throws NullPointerException if any argument is null.
+	 */
+	public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
+								Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
+		if(hadoopReducer == null) {
+			throw new NullPointerException("Reducer may not be null.");
+		}
+		if(hadoopCombiner == null) {
+			throw new NullPointerException("Combiner may not be null.");
+		}
+		if(conf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
+
+		this.reducer = hadoopReducer;
+		this.combiner = hadoopCombiner;
+		this.jobConf = conf;
+	}
+
+	/**
+	 * Configures both Hadoop Reducers with the JobConf and creates the iterator, collectors,
+	 * and reporter that adapt Flink's interfaces to the mapred API.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.reducer.configure(jobConf);
+		this.combiner.configure(jobConf);
+
+		this.reporter = new HadoopDummyReporter();
+		// the input key class is taken from the reducer's generic signature (type parameter 0)
+		Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+		this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass);
+		this.combineCollector = new HadoopOutputCollector<KEYIN, VALUEIN>();
+		this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
+	}
+
+	/**
+	 * Closes both Hadoop Reducers. The mapred Reducer interface extends Closeable; this completes the
+	 * configure/reduce/close lifecycle started in {@link #open(Configuration)}. The combiner is
+	 * closed even if closing the reducer throws.
+	 */
+	@Override
+	public void close() throws Exception {
+		try {
+			this.reducer.close();
+		} finally {
+			this.combiner.close();
+		}
+		super.close();
+	}
+
+	/**
+	 * Hands one group of key-value pairs to the Hadoop Reducer. The key passed to the Reducer
+	 * is obtained from the unwrapping iterator via getCurrentKey().
+	 */
+	@Override
+	public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+			throws Exception {
+		reduceCollector.setFlinkCollector(out);
+		valueIterator.set(values.iterator());
+		reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
+	}
+
+	/**
+	 * Hands one group of key-value pairs to the Hadoop combiner, whose output keeps the input types.
+	 */
+	@Override
+	public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
+		combineCollector.setFlinkCollector(out);
+		valueIterator.set(values.iterator());
+		combiner.reduce(valueIterator.getCurrentKey(), valueIterator, combineCollector, reporter);
+	}
+
+	/**
+	 * Derives the produced type from the wrapped Hadoop Reducer's generic signature
+	 * (type parameters 2 and 3 are the output key and value classes).
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass(outValClass);
+		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
+	}
+
+	/**
+	 * Custom serialization: writes the classes of the reducer and the combiner, followed by the JobConf.
+	 * The Reducer instances themselves are not serialized.
+	 *
+	 * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">java.io.Serializable</a>
+	 */
+	private void writeObject(final ObjectOutputStream out) throws IOException {
+		out.writeObject(reducer.getClass());
+		out.writeObject(combiner.getClass());
+		jobConf.write(out);
+	}
+
+	/**
+	 * Custom deserialization: re-instantiates the reducer and the combiner from their classes and
+	 * reads the JobConf, in the order written by writeObject.
+	 */
+	@SuppressWarnings("unchecked")
+	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+		Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
+				(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>) in.readObject();
+		reducer = InstantiationUtil.instantiate(reducerClass);
+
+		Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass =
+				(Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>) in.readObject();
+		combiner = InstantiationUtil.instantiate(combinerClass);
+
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
new file mode 100644
index 0000000..d9797c3
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+ *
+ * <p>The wrapped Hadoop Reducer follows the mapred lifecycle: it is configured with the
+ * {@link JobConf} in {@link #open(Configuration)} and closed in {@link #close()}.
+ * Only the class of the Reducer and the JobConf are serialized; on deserialization the
+ * Reducer is re-instantiated from its class, so state held by the original instance is not transferred.
+ */
+@SuppressWarnings("rawtypes")
+public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
+					extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
+					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The wrapped Hadoop Reducer. */
+	private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+	/** The configuration passed to the Hadoop Reducer. */
+	private transient JobConf jobConf;
+
+	private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
+	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
+	private transient Reporter reporter;
+
+	/**
+	 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+	 * The Reducer is configured with a newly created {@link JobConf}.
+	 *
+	 * @param hadoopReducer The Hadoop Reducer to wrap.
+	 * @throws NullPointerException if the reducer is null.
+	 */
+	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer) {
+		this(hadoopReducer, new JobConf());
+	}
+
+	/**
+	 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+	 *
+	 * @param hadoopReducer The Hadoop Reducer to wrap.
+	 * @param conf The JobConf that is used to configure the Hadoop Reducer.
+	 * @throws NullPointerException if any argument is null.
+	 */
+	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
+		if(hadoopReducer == null) {
+			throw new NullPointerException("Reducer may not be null.");
+		}
+		if(conf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
+
+		this.reducer = hadoopReducer;
+		this.jobConf = conf;
+	}
+
+	/**
+	 * Configures the Hadoop Reducer with the JobConf and creates the iterator, collector,
+	 * and reporter that adapt Flink's interfaces to the mapred API.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.reducer.configure(jobConf);
+
+		this.reporter = new HadoopDummyReporter();
+		this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
+		// the input key class is taken from the reducer's generic signature (type parameter 0)
+		Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+		this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(inKeyClass);
+	}
+
+	/**
+	 * Closes the Hadoop Reducer. The mapred Reducer interface extends Closeable; this completes
+	 * the configure/reduce/close lifecycle started in {@link #open(Configuration)}.
+	 */
+	@Override
+	public void close() throws Exception {
+		this.reducer.close();
+		super.close();
+	}
+
+	/**
+	 * Hands one group of key-value pairs to the Hadoop Reducer. The key passed to the Reducer
+	 * is obtained from the unwrapping iterator via getCurrentKey().
+	 */
+	@Override
+	public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+			throws Exception {
+		reduceCollector.setFlinkCollector(out);
+		valueIterator.set(values.iterator());
+		reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
+	}
+
+	/**
+	 * Derives the produced type from the wrapped Hadoop Reducer's generic signature
+	 * (type parameters 2 and 3 are the output key and value classes).
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass(outValClass);
+		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
+	}
+
+	/**
+	 * Custom serialization: writes the class of the reducer, followed by the JobConf.
+	 * The Reducer instance itself is not serialized.
+	 *
+	 * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">java.io.Serializable</a>
+	 */
+	private void writeObject(final ObjectOutputStream out) throws IOException {
+		out.writeObject(reducer.getClass());
+		jobConf.write(out);
+	}
+
+	/**
+	 * Custom deserialization: re-instantiates the reducer from its class and reads the JobConf,
+	 * in the order written by writeObject.
+	 */
+	@SuppressWarnings("unchecked")
+	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+		Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
+				(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>) in.readObject();
+		reducer = InstantiationUtil.instantiate(reducerClass);
+
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
new file mode 100644
index 0000000..de20fab
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Locale;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * occurrences of each word in the file and writes the result back to disk.
+ *
+ * <p>This example shows how to read data with a Hadoop InputFormat, how to run a Hadoop
+ * Mapper and Reducer (mapred API) in a Flink job by wrapping them in a
+ * {@link HadoopMapFunction} and a {@link HadoopReduceCombineFunction} (the Reducer also
+ * serves as combiner), and how to write the result with a Hadoop OutputFormat.
+ *
+ * <p>Usage: {@code WordCount <input path> <result path>}
+ */
+public class HadoopMapredCompatWordCount {
+
+	public static void main(String[] args) throws Exception {
+		if (args.length < 2) {
+			System.err.println("Usage: WordCount <input path> <result path>");
+			return;
+		}
+
+		final String inputPath = args[0];
+		final String outputPath = args[1];
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// Set up the Hadoop Input Format
+		HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
+		TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath));
+
+		// Create a Flink job with it
+		DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
+
+		// Tokenize with the Hadoop Mapper, then sum the counts per word with the Hadoop Reducer,
+		// which is also used as combiner
+		DataSet<Tuple2<Text, LongWritable>> words =
+				text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
+					.groupBy(0).reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter()));
+
+		// Set up Hadoop Output Format
+		HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
+				new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
+		hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
+		TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath));
+
+		// Output & Execute
+		words.output(hadoopOutputFormat).setParallelism(1);
+		env.execute("Hadoop Compat WordCount");
+	}
+
+	/**
+	 * Hadoop Mapper that lower-cases each input line, splits it on non-word characters,
+	 * and emits a (word, 1) pair for every non-empty token.
+	 */
+	public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {
+
+		@Override
+		public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep)
+				throws IOException {
+			// normalize and split the line; Locale.ROOT keeps lower-casing independent of the default locale
+			String line = v.toString();
+			String[] tokens = line.toLowerCase(Locale.ROOT).split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Text(token), new LongWritable(1L));
+				}
+			}
+		}
+
+		@Override
+		public void configure(JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+
+	/**
+	 * Hadoop Reducer that sums the counts of a word. Its input and output types are identical,
+	 * so it is used both as reducer and as combiner.
+	 */
+	public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> {
+
+		@Override
+		public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep)
+				throws IOException {
+			long cnt = 0;
+			while(vs.hasNext()) {
+				cnt += vs.next().get();
+			}
+			out.collect(k, new LongWritable(cnt));
+		}
+
+		@Override
+		public void configure(JobConf arg0) { }
+
+		@Override
+		public void close() throws IOException { }
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
new file mode 100644
index 0000000..ff28a59
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.java.record.operators.GenericDataSink;
+import org.apache.flink.compiler.contextcheck.Validatable;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultFlinkTypeConverter;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter;
+import org.apache.flink.types.Record;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+
+/**
+ * The HadoopDataSink is a generic wrapper for all Hadoop OutputFormats.
+ *
+ * <p>Example usage:
+ * <pre>{@code
+ * HadoopDataSink out = new HadoopDataSink(new org.apache.hadoop.mapred.TextOutputFormat<Text, IntWritable>(), new JobConf(), "Hadoop TextOutputFormat", reducer, Text.class, IntWritable.class);
+ * org.apache.hadoop.mapred.TextOutputFormat.setOutputPath(out.getJobConf(), new Path(output));
+ * }</pre>
+ *
+ * <p>Note that it is possible to provide a custom data type converter ({@link FlinkTypeConverter}).
+ * The HadoopDataSink provides a default converter: {@link DefaultFlinkTypeConverter}.
+ * The constructors set the output key and value classes on the used {@link JobConf}.
+ */
+public class HadoopDataSink<K,V> extends GenericDataSink implements Validatable {
+
+	private static final String DEFAULT_NAME = "<Unnamed Hadoop Data Sink>";
+
+	private final JobConf jobConf;
+
+	/** Creates a named sink for a single input that uses a custom type converter. */
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, FlinkTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopFormat, jobConf, name, Collections.<Operator<Record>>singletonList(input), conv, keyClass, valueClass);
+	}
+
+	/** Creates a named sink for a single input that uses the {@link DefaultFlinkTypeConverter}. */
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopFormat, jobConf, name, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
+	}
+
+	/** Creates a sink with a default name for a single input that uses the {@link DefaultFlinkTypeConverter}. */
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopFormat, jobConf, DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
+	}
+
+	/** Creates a sink with a default name and a new {@link JobConf} for a single input that uses the {@link DefaultFlinkTypeConverter}. */
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopFormat, new JobConf(), DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
+	}
+
+	/**
+	 * Creates a sink that writes the records of the given inputs with a Hadoop OutputFormat.
+	 *
+	 * @param hadoopFormat The Hadoop OutputFormat to wrap.
+	 * @param jobConf The JobConf of the OutputFormat; its output key and value classes are set here.
+	 * @param name The name of the sink.
+	 * @param input The inputs of the sink.
+	 * @param conv The type converter handed to the wrapped {@link HadoopRecordOutputFormat}.
+	 * @param keyClass The output key class that is set on the JobConf.
+	 * @param valueClass The output value class that is set on the JobConf.
+	 * @throws NullPointerException if hadoopFormat or jobConf is null.
+	 */
+	@SuppressWarnings("deprecation")
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, FlinkTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
+		super(new HadoopRecordOutputFormat<K,V>(hadoopFormat, jobConf, conv),input, name);
+
+		// super(...) must be the first statement, so the arguments can only be checked afterwards
+		if (hadoopFormat == null) {
+			throw new NullPointerException("Hadoop OutputFormat may not be null.");
+		}
+		if (jobConf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
+
+		this.name = name;
+		this.jobConf = jobConf;
+		jobConf.setOutputKeyClass(keyClass);
+		jobConf.setOutputValueClass(valueClass);
+	}
+
+	/** Creates a named sink that uses the {@link DefaultFlinkTypeConverter}. */
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopFormat, jobConf, name, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
+	}
+
+	/** Creates a sink with a default name that uses the {@link DefaultFlinkTypeConverter}. */
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopFormat, jobConf, DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
+	}
+
+	/** Creates a sink with a default name and a new {@link JobConf} that uses the {@link DefaultFlinkTypeConverter}. */
+	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopFormat, new JobConf(), DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
+	}
+
+	/**
+	 * Returns the JobConf of this sink, e.g., to set the output path on it.
+	 */
+	public JobConf getJobConf() {
+		return this.jobConf;
+	}
+
+	/**
+	 * Validates that an output path has been set on the JobConf.
+	 *
+	 * @throws NullPointerException if the JobConf has no output path.
+	 */
+	@Override
+	public void check() {
+		// see for more details https://github.com/stratosphere/stratosphere/pull/531
+		if (FileOutputFormat.getOutputPath(jobConf) == null) {
+			throw new NullPointerException("The HadoopDataSink currently expects a correct outputPath.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
new file mode 100644
index 0000000..508f069
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.record;
+
+
+import org.apache.flink.api.java.record.operators.GenericDataSource;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultHadoopTypeConverter;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * The HadoopDataSource is a generic wrapper for all Hadoop InputFormats.
+ *
+ * <p>Example usage:
+ * <pre>{@code
+ * HadoopDataSource source = new HadoopDataSource(new org.apache.hadoop.mapred.TextInputFormat(), new JobConf(), "Input Lines");
+ * org.apache.hadoop.mapred.TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
+ * }</pre>
+ *
+ * <p>Note that it is possible to provide a custom data type converter ({@link HadoopTypeConverter}).
+ * If none is given, the {@link DefaultHadoopTypeConverter} is used.
+ *
+ * <p>The HadoopDataSource provides two different standard converters:
+ * <ul>
+ * <li>WritableWrapperConverter: Converts Hadoop types to a record that contains a WritableComparableWrapper (key) and a WritableWrapper.</li>
+ * <li>DefaultHadoopTypeConverter: Converts the standard Hadoop types (LongWritable, Text) to Flink's {@link org.apache.flink.types.Value} types.</li>
+ * </ul>
+ */
+public class HadoopDataSource<K,V> extends GenericDataSource<HadoopRecordInputFormat<K,V>> {
+
+	private static final String DEFAULT_NAME = "<Unnamed Hadoop Data Source>";
+
+	private final JobConf jobConf;
+
+	/**
+	 * Creates a data source that reads with the given Hadoop InputFormat.
+	 *
+	 * @param hadoopFormat Implementation of a Hadoop input format
+	 * @param jobConf JobConf object (Hadoop)
+	 * @param name Name of the DataSource
+	 * @param conv Definition of a custom type converter, e.g., {@link DefaultHadoopTypeConverter}.
+	 * @throws NullPointerException if hadoopFormat, jobConf, or conv is null.
+	 */
+	public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name, HadoopTypeConverter<K,V> conv) {
+		super(new HadoopRecordInputFormat<K,V>(hadoopFormat, jobConf, conv),name);
+
+		// super(...) must be the first statement, so the arguments can only be checked afterwards
+		if (hadoopFormat == null) {
+			throw new NullPointerException("Hadoop InputFormat may not be null.");
+		}
+		if (jobConf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
+		if (conv == null) {
+			throw new NullPointerException("Type converter may not be null.");
+		}
+
+		this.name = name;
+		this.jobConf = jobConf;
+	}
+
+	/** Creates a named data source that uses the {@link DefaultHadoopTypeConverter}. */
+	public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name) {
+		this(hadoopFormat, jobConf, name, new DefaultHadoopTypeConverter<K,V>());
+	}
+
+	/** Creates a data source with a default name that uses the {@link DefaultHadoopTypeConverter}. */
+	public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf) {
+		this(hadoopFormat, jobConf, DEFAULT_NAME);
+	}
+
+	/** Creates a data source with a default name and a new {@link JobConf} that uses the {@link DefaultHadoopTypeConverter}. */
+	public HadoopDataSource(InputFormat<K,V> hadoopFormat) {
+		this(hadoopFormat, new JobConf(), DEFAULT_NAME);
+	}
+
+	/**
+	 * Returns the JobConf of this data source, e.g., to add input paths to it.
+	 */
+	public JobConf getJobConf() {
+		return this.jobConf;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
new file mode 100644
index 0000000..275fd4c
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter;
+import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
+import org.apache.flink.types.Record;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Flink {@link InputFormat} that produces {@link Record}s from a Hadoop (mapred) input format.
+ * Each key/value pair read by the Hadoop {@link RecordReader} is written into a Record by the
+ * configured {@link HadoopTypeConverter}.
+ */
+public class HadoopRecordInputFormat<K, V> implements InputFormat<Record, HadoopInputSplit> {
+
+	private static final long serialVersionUID = 1L;
+
+	public org.apache.hadoop.mapred.InputFormat<K, V> hadoopInputFormat;
+	public HadoopTypeConverter<K,V> converter;
+	/** Class name of {@link #hadoopInputFormat}; serialized instead of the format itself and re-instantiated in readObject. */
+	private String hadoopInputFormatName;
+	public JobConf jobConf;
+	public transient K key;
+	public transient V value;
+	public RecordReader<K, V> recordReader;
+	/** True when recordReader.next() has been called for the upcoming record and its result is cached in {@link #hasNext}. */
+	private boolean fetched = false;
+	/** Result of the most recent recordReader.next() call. */
+	private boolean hasNext;
+		
+	/**
+	 * Creates an unconfigured format; the Hadoop input format and JobConf must be set before use.
+	 */
+	public HadoopRecordInputFormat() {
+		super();
+	}
+	
+	/**
+	 * @param hadoopInputFormat the Hadoop input format to wrap
+	 * @param job the Hadoop job configuration; passed through HadoopUtils.mergeHadoopConf before it is stored
+	 * @param conv converter that writes each Hadoop key/value pair into a Record
+	 */
+	public HadoopRecordInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> hadoopInputFormat, JobConf job, HadoopTypeConverter<K,V> conv) {
+		super();
+		this.hadoopInputFormat = hadoopInputFormat;
+		this.hadoopInputFormatName = hadoopInputFormat.getClass().getName();
+		this.converter = conv;
+		HadoopUtils.mergeHadoopConf(job);
+		this.jobConf = job;
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+		// No Flink configuration parameters are used by this format.
+	}
+
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+		// Statistics are not provided.
+		return null;
+	}
+
+	/**
+	 * Asks the Hadoop input format for its splits and wraps each one, numbered by its array position.
+	 */
+	@Override
+	public HadoopInputSplit[] createInputSplits(int minNumSplits)
+			throws IOException {
+		org.apache.hadoop.mapred.InputSplit[] splitArray = hadoopInputFormat.getSplits(jobConf, minNumSplits);
+		HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
+		for(int i=0;i<splitArray.length;i++){
+			hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf);
+		}
+		return hiSplit;
+	}
+
+	@Override
+	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
+		return new DefaultInputSplitAssigner(inputSplits);
+	}
+
+	/**
+	 * Opens a Hadoop RecordReader for the split and creates the reusable key/value holders.
+	 */
+	@Override
+	public void open(HadoopInputSplit split) throws IOException {
+		this.recordReader = this.hadoopInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
+		key = this.recordReader.createKey();
+		value = this.recordReader.createValue();
+		this.fetched = false;
+	}
+
+	/** Reads the next pair into key/value and caches whether one was available. */
+	private void fetchNext() throws IOException {
+		hasNext = this.recordReader.next(key, value);
+		fetched = true;
+	}
+	
+	@Override
+	public boolean reachedEnd() throws IOException {
+		// Look ahead one pair; nextRecord() consumes the cached result.
+		if(!fetched) {
+			fetchNext();
+		}
+		return !hasNext;
+	}
+
+	/**
+	 * Converts the next Hadoop pair into {@code record}.
+	 *
+	 * @return the filled record, or null if the split is exhausted
+	 */
+	@Override
+	public Record nextRecord(Record record) throws IOException {
+		if(!fetched) {
+			fetchNext();
+		}
+		if(!hasNext) {
+			return null;
+		}
+		converter.convert(record, key, value);
+		// Mark the cached pair as consumed so the next call reads a new one.
+		fetched = false;
+		return record;
+	}
+
+	@Override
+	public void close() throws IOException {
+		// open() may never have been called (e.g. a failure before the first split was opened).
+		if (this.recordReader != null) {
+			this.recordReader.close();
+		}
+	}
+	
+	/**
+	 * Custom serialization methods.
+	 * The Hadoop input format is written as its class name, followed by the JobConf and the converter.
+	 *  @see "http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html"
+	 */
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeUTF(hadoopInputFormatName);
+		jobConf.write(out);
+		out.writeObject(converter);
+	}
+
+	/**
+	 * Reads the state written by writeObject and re-instantiates the Hadoop input format by
+	 * class name, configuring it with the deserialized JobConf.
+	 */
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		hadoopInputFormatName = in.readUTF();
+		if(jobConf == null) {
+			jobConf = new JobConf();
+		}
+		jobConf.readFields(in);
+		try {
+			this.hadoopInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(this.hadoopInputFormatName).newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
+		}
+		ReflectionUtils.setConf(hadoopInputFormat, jobConf);
+		converter = (HadoopTypeConverter<K,V>) in.readObject();
+	}
+	
+	public void setJobConf(JobConf job) {
+		this.jobConf = job;
+	}
+		
+
+	public org.apache.hadoop.mapred.InputFormat<K,V> getHadoopInputFormat() {
+		return hadoopInputFormat;
+	}
+	
+	/**
+	 * Replaces the wrapped Hadoop input format.
+	 */
+	public void setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> hadoopInputFormat) {
+		this.hadoopInputFormat = hadoopInputFormat;
+		// Keep the class name in sync: it is what writeObject serializes and readObject re-instantiates.
+		this.hadoopInputFormatName = hadoopInputFormat == null ? null : hadoopInputFormat.getClass().getName();
+	}
+	
+	public JobConf getJobConf() {
+		return jobConf;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
new file mode 100644
index 0000000..74118a3
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopFileOutputCommitter;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter;
+import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable;
+import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
+import org.apache.flink.types.Record;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.util.ReflectionUtils;
+
+
+public class HadoopRecordOutputFormat<K,V> implements OutputFormat<Record> {
+
+	private static final long serialVersionUID = 1L;
+
+	public JobConf jobConf;
+
+	public org.apache.hadoop.mapred.OutputFormat<K,V> hadoopOutputFormat;
+
+	private String hadoopOutputFormatName;
+
+	public RecordWriter<K,V> recordWriter;
+
+	public FlinkTypeConverter<K,V> converter;
+
+	public HadoopFileOutputCommitter fileOutputCommitterWrapper;
+
+	public HadoopRecordOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> hadoopFormat, JobConf job, FlinkTypeConverter<K,V> conv) {
+		super();
+		this.hadoopOutputFormat = hadoopFormat;
+		this.hadoopOutputFormatName = hadoopFormat.getClass().getName();
+		this.converter = conv;
+		this.fileOutputCommitterWrapper = new HadoopFileOutputCommitter();
+		HadoopUtils.mergeHadoopConf(job);
+		this.jobConf = job;
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+	}
+
+	/**
+	 * create the temporary output file for hadoop RecordWriter.
+	 * @param taskNumber The number of the parallel instance.
+	 * @param numTasks The number of parallel tasks.
+	 * @throws IOException
+	 */
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		this.fileOutputCommitterWrapper.setupJob(this.jobConf);
+		if (Integer.toString(taskNumber + 1).length() <= 6) {
+			this.jobConf.set("mapred.task.id", "attempt__0000_r_" + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") + Integer.toString(taskNumber + 1) + "_0");
+			//compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
+			this.jobConf.set("mapreduce.task.output.dir", this.fileOutputCommitterWrapper.getTempTaskOutputPath(this.jobConf,TaskAttemptID.forName(this.jobConf.get("mapred.task.id"))).toString());
+		} else {
+			throw new IOException("task id too large");
+		}
+		this.recordWriter = this.hadoopOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
+	}
+
+
+	@Override
+	public void writeRecord(Record record) throws IOException {
+		K key = this.converter.convertKey(record);
+		V value = this.converter.convertValue(record);
+		this.recordWriter.write(key, value);
+	}
+
+	/**
+	 * commit the task by moving the output file out from the temporary directory.
+	 * @throws IOException
+	 */
+	@Override
+	public void close() throws IOException {
+		this.recordWriter.close(new HadoopDummyReporter());
+		if (this.fileOutputCommitterWrapper.needsTaskCommit(this.jobConf, TaskAttemptID.forName(this.jobConf.get("mapred.task.id")))) {
+			this.fileOutputCommitterWrapper.commitTask(this.jobConf, TaskAttemptID.forName(this.jobConf.get("mapred.task.id")));
+		}
+	//TODO: commitjob when all the tasks are finished
+	}
+
+
+	/**
+	 * Custom serialization methods.
+	 *  @see "http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html"
+	 */
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeUTF(hadoopOutputFormatName);
+		jobConf.write(out);
+		out.writeObject(converter);
+		out.writeObject(fileOutputCommitterWrapper);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		hadoopOutputFormatName = in.readUTF();
+		if(jobConf == null) {
+			jobConf = new JobConf();
+		}
+		jobConf.readFields(in);
+		try {
+			this.hadoopOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) Class.forName(this.hadoopOutputFormatName).newInstance();
+		} catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
+		}
+		ReflectionUtils.setConf(hadoopOutputFormat, jobConf);
+		converter = (FlinkTypeConverter<K,V>) in.readObject();
+		fileOutputCommitterWrapper = (HadoopFileOutputCommitter) in.readObject();
+	}
+
+
+	public void setJobConf(JobConf job) {
+		this.jobConf = job;
+	}
+
+	public JobConf getJobConf() {
+		return jobConf;
+	}
+
+	public org.apache.hadoop.mapred.OutputFormat<K,V> getHadoopOutputFormat() {
+		return hadoopOutputFormat;
+	}
+
+	public void setHadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> hadoopOutputFormat) {
+		this.hadoopOutputFormat = hadoopOutputFormat;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java
new file mode 100644
index 0000000..9d37988
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Convert Flink Record into the default hadoop writables.
+ * The key is taken from field 0 and the value from field 1 of the Record.
+ */
+public class DefaultFlinkTypeConverter<K,V> implements FlinkTypeConverter<K,V> {
+	private static final long serialVersionUID = 1L;
+
+	private final Class<K> keyClass;
+	private final Class<V> valueClass;
+
+	/**
+	 * @param keyClass Hadoop writable class to produce for the key (Record field 0)
+	 * @param valueClass Hadoop writable class to produce for the value (Record field 1)
+	 */
+	public DefaultFlinkTypeConverter(Class<K> keyClass, Class<V> valueClass) {
+		this.keyClass= keyClass;
+		this.valueClass = valueClass;
+	}
+
+	/**
+	 * @return the converted field 0, or null if the record has no fields
+	 */
+	@Override
+	public K convertKey(Record flinkRecord) {
+		if(flinkRecord.getNumFields() > 0) {
+			return convert(flinkRecord, 0, this.keyClass);
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * @return the converted field 1, or null if the record has fewer than two fields
+	 */
+	@Override
+	public V convertValue(Record flinkRecord) {
+		if(flinkRecord.getNumFields() > 1) {
+			return convert(flinkRecord, 1, this.valueClass);
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Reads field {@code pos} as the Flink value type matching {@code hadoopType} and wraps it
+	 * in a new instance of that Hadoop writable.
+	 *
+	 * @throws RuntimeException if {@code hadoopType} is not a supported writable class
+	 */
+	@SuppressWarnings("unchecked")
+	private<T> T convert(Record flinkType, int pos, Class<T> hadoopType) {
+		if(hadoopType == LongWritable.class ) {
+			return (T) new LongWritable((flinkType.getField(pos, LongValue.class)).getValue());
+		}
+		if(hadoopType == org.apache.hadoop.io.Text.class) {
+			return (T) new Text((flinkType.getField(pos, StringValue.class)).getValue());
+		}
+		if(hadoopType == org.apache.hadoop.io.IntWritable.class) {
+			return (T) new IntWritable((flinkType.getField(pos, IntValue.class)).getValue());
+		}
+		if(hadoopType == org.apache.hadoop.io.FloatWritable.class) {
+			return (T) new FloatWritable((flinkType.getField(pos, FloatValue.class)).getValue());
+		}
+		if(hadoopType == org.apache.hadoop.io.DoubleWritable.class) {
+			return (T) new DoubleWritable((flinkType.getField(pos, DoubleValue.class)).getValue());
+		}
+		if(hadoopType == org.apache.hadoop.io.BooleanWritable.class) {
+			return (T) new BooleanWritable((flinkType.getField(pos, BooleanValue.class)).getValue());
+		}
+		if(hadoopType == org.apache.hadoop.io.ByteWritable.class) {
+			return (T) new ByteWritable((flinkType.getField(pos, ByteValue.class)).getValue());
+		}
+		if(hadoopType == org.apache.hadoop.io.NullWritable.class) {
+			// NullWritable carries no data, so the field is not read. This mirrors the
+			// NullWritable -> NullValue mapping of DefaultHadoopTypeConverter.
+			return (T) org.apache.hadoop.io.NullWritable.get();
+		}
+
+		// Report the unsupported target class; flinkType.getClass() would always be Record.
+		throw new RuntimeException("Unable to convert Flink type at position " + pos + " to Hadoop type ("
+			+ (hadoopType == null ? "null" : hadoopType.getCanonicalName()) + ").");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java
new file mode 100644
index 0000000..6ed670a
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * Converter for the default hadoop writables.
+ * Key will be in field 0, Value in field 1 of a Record.
+ */
+public class DefaultHadoopTypeConverter<K, V> implements HadoopTypeConverter<K, V> {
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void convert(Record flinkRecord, K hadoopKey, V hadoopValue) {
+		flinkRecord.setField(0, convert(hadoopKey));
+		flinkRecord.setField(1, convert(hadoopValue));
+	}
+	
+	protected Value convert(Object hadoopType) {
+		if(hadoopType instanceof org.apache.hadoop.io.LongWritable ) {
+			return new LongValue(((LongWritable)hadoopType).get());
+		}
+		if(hadoopType instanceof org.apache.hadoop.io.Text) {
+			return new StringValue(((Text)hadoopType).toString());
+		}
+		if(hadoopType instanceof org.apache.hadoop.io.IntWritable) {
+			return new IntValue(((IntWritable)hadoopType).get());
+		}
+		if(hadoopType instanceof org.apache.hadoop.io.FloatWritable) {
+			return new FloatValue(((FloatWritable)hadoopType).get());
+		}
+		if(hadoopType instanceof org.apache.hadoop.io.DoubleWritable) {
+			return new DoubleValue(((DoubleWritable)hadoopType).get());
+		}
+		if(hadoopType instanceof org.apache.hadoop.io.BooleanWritable) {
+			return new BooleanValue(((BooleanWritable)hadoopType).get());
+		}
+		if(hadoopType instanceof org.apache.hadoop.io.ByteWritable) {
+			return new ByteValue(((ByteWritable)hadoopType).get());
+		}
+		if (hadoopType instanceof NullWritable) {
+			return NullValue.getInstance();
+		}
+		
+		throw new RuntimeException("Unable to convert Hadoop type ("+hadoopType.getClass().getCanonicalName()+") to a Flink data type.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java
new file mode 100644
index 0000000..3c14a86
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import java.io.Serializable;
+
+import org.apache.flink.types.Record;
+
+/**
+ * Converts Flink {@link Record}s into Hadoop key and value objects.
+ *
+ * Implementations must be Serializable.
+ *
+ * Flink ships a {@code DefaultFlinkTypeConverter}; custom implementations
+ * should chain the type converters.
+ */
+public interface FlinkTypeConverter<K,V> extends Serializable {
+
+	/**
+	 * Produces the Hadoop key for the given Flink record.
+	 *
+	 * @param record the Flink record to read from
+	 * @return the Hadoop key
+	 */
+	K convertKey(Record record);
+
+	/**
+	 * Produces the Hadoop value for the given Flink record.
+	 *
+	 * @param record the Flink record to read from
+	 * @return the Hadoop value
+	 */
+	V convertValue(Record record);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
new file mode 100644
index 0000000..ce4955c
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Hadoop 1.2.1 {@link org.apache.hadoop.mapred.FileOutputCommitter} takes a {@link org.apache.hadoop.mapred.JobContext}
+ * as input parameter. However, the JobContext class is package private in Hadoop 1.2.1, while it is public in Hadoop 2.2.0.
+ * This class takes a {@link org.apache.hadoop.mapred.JobConf} instead of a JobContext in order to set up and commit tasks.
+ */
+public class HadoopFileOutputCommitter extends FileOutputCommitter implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+	
+	/** JobConf key controlling whether commitJob writes the success marker file (defaults to true). */
+	static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+		"mapreduce.fileoutputcommitter.marksuccessfuljobs";
+
+	/**
+	 * Creates the temporary directory under the job's output path, if an output path is set.
+	 * A failed mkdirs is only logged.
+	 */
+	public void setupJob(JobConf conf) throws IOException {
+		Path outputPath = FileOutputFormat.getOutputPath(conf);
+		if (outputPath != null) {
+			Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+			FileSystem fileSys = tmpDir.getFileSystem(conf);
+			if (!fileSys.mkdirs(tmpDir)) {
+				LOG.error("Mkdirs failed to create " + tmpDir.toString());
+			}
+		}
+	}
+
+	/** Whether the success marker file should be written on job commit. */
+	private static boolean getOutputDirMarking(JobConf conf) {
+		return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,  true);
+	}
+
+	/** Creates an empty SUCCEEDED_FILE_NAME file in the output directory, if that directory exists. */
+	private void markSuccessfulOutputDir(JobConf conf)
+		throws IOException {
+		Path outputPath = FileOutputFormat.getOutputPath(conf);
+		if (outputPath != null) {
+			FileSystem fileSys = outputPath.getFileSystem(conf);
+			// create a file in the folder to mark it
+			if (fileSys.exists(outputPath)) {
+				Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+				fileSys.create(filePath).close();
+			}
+		}
+	}
+
+	/**
+	 * Maps a file/directory inside the task's temporary output directory to its location
+	 * under the job output directory.
+	 *
+	 * @throws IOException if {@code taskOutput} is not located below {@code taskOutputPath}
+	 */
+	private Path getFinalPath(Path jobOutputDir, Path taskOutput,
+							Path taskOutputPath) throws IOException {
+		URI taskOutputUri = taskOutput.toUri();
+		URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
+		// Identity comparison is intentional: URI.relativize returns its argument unchanged
+		// when the URIs cannot be relativized.
+		if (taskOutputUri == relativePath) {//taskOutputPath is not a parent of taskOutput
+			throw new IOException("Can not get the relative path: base = " +
+				taskOutputPath + " child = " + taskOutput);
+		}
+		if (relativePath.getPath().length() > 0) {
+			return new Path(jobOutputDir, relativePath.getPath());
+		} else {
+			return jobOutputDir;
+		}
+	}
+
+	/**
+	 * Recursively moves {@code taskOutput} (a file, or a directory and its contents) to its final
+	 * location under {@code jobOutputDir}. If a rename fails, any existing target is deleted and
+	 * the rename is retried once.
+	 */
+	private void moveTaskOutputs(JobConf conf, TaskAttemptID taskAttemptID,
+								FileSystem fs,
+								Path jobOutputDir,
+								Path taskOutput)
+		throws IOException {
+		if (fs.isFile(taskOutput)) {
+			Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
+				getTempTaskOutputPath(conf, taskAttemptID));
+			if (!fs.rename(taskOutput, finalOutputPath)) {
+				if (!fs.delete(finalOutputPath, true)) {
+					throw new IOException("Failed to delete earlier output of task: " +
+						taskAttemptID);
+				}
+				if (!fs.rename(taskOutput, finalOutputPath)) {
+					throw new IOException("Failed to save output of task: " +
+						taskAttemptID);
+				}
+			}
+			LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
+		} else if(fs.getFileStatus(taskOutput).isDir()) {
+			FileStatus[] paths = fs.listStatus(taskOutput);
+			Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
+				getTempTaskOutputPath(conf, taskAttemptID));
+			fs.mkdirs(finalOutputPath);
+			if (paths != null) {
+				for (FileStatus path : paths) {
+					moveTaskOutputs(conf,taskAttemptID, fs, jobOutputDir, path.getPath());
+				}
+			}
+		}
+	}
+
+	/**
+	 * Moves the attempt's temporary output into the job output directory and then deletes the
+	 * temporary directory. Does nothing if the temporary directory does not exist.
+	 */
+	public void commitTask(JobConf conf, TaskAttemptID taskAttemptID)
+		throws IOException {
+		Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
+		if (taskOutputPath != null) {
+			FileSystem fs = taskOutputPath.getFileSystem(conf);
+			if (fs.exists(taskOutputPath)) {
+				// <output>/<TEMP_DIR_NAME>/_<attempt> -> <output>
+				Path jobOutputPath = taskOutputPath.getParent().getParent();
+				// Move the task outputs to their final place
+				moveTaskOutputs(conf,taskAttemptID, fs, jobOutputPath, taskOutputPath);
+				// Delete the temporary task-specific output directory
+				if (!fs.delete(taskOutputPath, true)) {
+					LOG.info("Failed to delete the temporary output" +
+						" directory of task: " + taskAttemptID + " - " + taskOutputPath);
+				}
+				LOG.info("Saved output of task '" + taskAttemptID + "' to " +
+					jobOutputPath);
+			}
+		}
+	}
+
+	/**
+	 * @return true if the attempt's temporary output directory exists and therefore must be committed
+	 */
+	public boolean needsTaskCommit(JobConf conf, TaskAttemptID taskAttemptID)
+		throws IOException {
+		Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
+		if (taskOutputPath == null) {
+			return false;
+		}
+		// Get the file-system for the task output directory.
+		// Since the task output path is created on demand, a task needs a commit if it exists.
+		FileSystem fs = taskOutputPath.getFileSystem(conf);
+		return fs.exists(taskOutputPath);
+	}
+
+	/**
+	 * @return {@code <output>/<TEMP_DIR_NAME>/_<attempt id>}, qualified with its file system when
+	 *         possible (unqualified if the file-system lookup fails, after logging a warning),
+	 *         or null if the JobConf has no output path
+	 */
+	public Path getTempTaskOutputPath(JobConf conf, TaskAttemptID taskAttemptID) {
+		Path outputPath = FileOutputFormat.getOutputPath(conf);
+		if (outputPath != null) {
+			Path p = new Path(outputPath,
+				(FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+					"_" + taskAttemptID.toString()));
+			try {
+				FileSystem fs = p.getFileSystem(conf);
+				return p.makeQualified(fs);
+			} catch (IOException ie) {
+				LOG.warn(StringUtils.stringifyException(ie));
+				return p;
+			}
+		}
+		return null;
+	}
+
+	/**
+	 * Recursively deletes the temporary directory under the job's output path.
+	 */
+	public void cleanupJob(JobConf conf) throws IOException {
+		// do the clean up of temporary directory
+		Path outputPath = FileOutputFormat.getOutputPath(conf);
+		if (outputPath != null) {
+			Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+			FileSystem fileSys = tmpDir.getFileSystem(conf);
+			if (fileSys.exists(tmpDir)) {
+				fileSys.delete(tmpDir, true);
+			}
+		} else {
+			LOG.warn("Output path is null in cleanup");
+		}
+	}
+
+	/**
+	 * Cleans up the temporary directory and, unless disabled via
+	 * {@link #SUCCESSFUL_JOB_OUTPUT_DIR_MARKER}, writes the success marker file.
+	 */
+	public void commitJob(JobConf conf) throws IOException {
+		cleanupJob(conf);
+		if (getOutputDirMarking(conf)) {
+			markSuccessfulOutputDir(conf);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java
new file mode 100644
index 0000000..6bbf077
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import java.io.Serializable;
+
+import org.apache.flink.types.Record;
+
+
+/**
+ * Converts Hadoop key/value pairs into Flink's {@link Record} model.
+ *
+ * <p>Implementations are {@link Serializable}, because this interface extends it.
+ * Flink provides a {@code DefaultHadoopTypeConverter}. Custom implementations should
+ * chain the type converters.
+ *
+ * @param <K> the Hadoop key type
+ * @param <V> the Hadoop value type
+ */
+public interface HadoopTypeConverter<K, V> extends Serializable {
+
+	/**
+	 * Converts a Hadoop key/value pair to Flink types and stores them in the given record.
+	 *
+	 * @param record the record that receives the converted fields
+	 * @param hadoopKey the Hadoop key
+	 * @param hadoopValue the Hadoop value
+	 */
+	void convert(Record record, K hadoopKey, V hadoopValue);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java
new file mode 100644
index 0000000..0519ac9
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import org.apache.flink.types.Key;
+import org.apache.hadoop.io.WritableComparable;
+
+public class WritableComparableWrapper<T extends WritableComparable<T>> extends WritableWrapper<T> implements Key<WritableComparableWrapper<T>> {
+	private static final long serialVersionUID = 1L;
+	
+	public WritableComparableWrapper() {
+		super();
+	}
+	
+	public WritableComparableWrapper(T toWrap) {
+		super(toWrap);
+	}
+
+	@Override
+	public int compareTo(WritableComparableWrapper<T> o) {
+		return super.value().compareTo(o.value());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java
new file mode 100644
index 0000000..6369bb7
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Wraps a Hadoop {@link Writable} so it can be stored as a {@link Value} in a Flink Record.
+ *
+ * <p>Serialized form: the wrapped object's class name (via {@code writeUTF}), followed by
+ * the object's own {@code Writable} serialization. On read, the class is loaded through the
+ * thread context class loader, and a new instance is created and then filled via
+ * {@code readFields}.
+ *
+ * @param <T> the wrapped Hadoop type
+ */
+public class WritableWrapper<T extends Writable> implements Value {
+	private static final long serialVersionUID = 2L;
+	
+	private T wrapped;
+	private String wrappedType;
+	// ClassLoader is not Serializable and must not take part in Java serialization of this
+	// Value. It is re-acquired lazily in read().
+	private transient ClassLoader cl;
+	
+	public WritableWrapper() {
+	}
+	
+	public WritableWrapper(T toWrap) {
+		wrapped = toWrap;
+		// Store the binary name, which is what Class.forName() in read() expects.
+		// getCanonicalName() returns "Outer.Inner" for nested classes (not loadable) and
+		// null for anonymous/local classes.
+		wrappedType = toWrap.getClass().getName();
+	}
+
+	/**
+	 * @return the wrapped Hadoop object, or {@code null} if nothing was wrapped or read yet
+	 */
+	public T value() {
+		return wrapped;
+	}
+	
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeUTF(wrappedType);
+		wrapped.write(out);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		if(cl == null) {
+			cl = Thread.currentThread().getContextClassLoader();
+		}
+		wrappedType = in.readUTF();
+		try {
+			@SuppressWarnings("unchecked")
+			Class<T> wrClass = (Class<T>) Class.forName(wrappedType, true, cl).asSubclass(Writable.class);
+			wrapped = InstantiationUtil.instantiate(wrClass, Writable.class);
+		} catch (ClassNotFoundException e) {
+			throw new RuntimeException("Error creating the WritableWrapper", e);
+		}
+		wrapped.readFields(in);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java
new file mode 100644
index 0000000..6bb13d1
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
+
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A {@link HadoopTypeConverter} that wraps the Hadoop types instead of translating them:
+ * <ul>
+ * <li>the key is stored in field 0 as a {@link WritableComparableWrapper};</li>
+ * <li>the value is stored in field 1 as a {@link WritableWrapper}.</li>
+ * </ul>
+ *
+ * @param <K> the Hadoop key type
+ * @param <V> the Hadoop value type
+ */
+@SuppressWarnings("rawtypes")
+public class WritableWrapperConverter<K extends WritableComparable, V extends Writable> implements HadoopTypeConverter<K,V> {
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public void convert(Record flinkRecord, K hadoopKey, V hadoopValue) {
+		final Value keyField = convertKey(hadoopKey);
+		flinkRecord.setField(0, keyField);
+		final Value valueField = convertValue(hadoopValue);
+		flinkRecord.setField(1, valueField);
+	}
+	
+	// K has a raw WritableComparable bound, so the self-typed
+	// WritableComparableWrapper<T extends WritableComparable<T>> can only be instantiated raw.
+	@SuppressWarnings("unchecked")
+	private Value convertKey(K key) {
+		return new WritableComparableWrapper(key);
+	}
+	
+	private Value convertValue(V value) {
+		return new WritableWrapper<V>(value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
new file mode 100644
index 0000000..b167080
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.record.example;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.Program;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.java.record.functions.MapFunction;
+import org.apache.flink.api.java.record.functions.ReduceFunction;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
+import org.apache.flink.api.java.record.io.CsvOutputFormat;
+import org.apache.flink.api.java.record.operators.FileDataSink;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
+import org.apache.flink.client.LocalExecutor;
+import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSource;
+import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.WritableWrapperConverter;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * the occurrences of each word in the file. 
+ * 
+ * <br /><br />
+ * 
+ * <b>Note</b>: This example uses the out-dated Record API.
+ * It is recommended to use the new Java API.
+ * 
+ * @see org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount
+ */
+public class WordCount implements Program, ProgramDescription {
+	
+	private static final long serialVersionUID = 1L;
+
+
+	/**
+	 * Splits the text line stored in field 1 of the input record into words.
+	 * Before tokenizing on whitespace, the line is prepared in two steps:
+	 * <ul>
+	 * <li>runs of non-word characters are replaced by a single space;</li>
+	 * <li>the line is lower-cased.</li>
+	 * </ul>
+	 * For each token a new record is emitted, with the word (StringValue) in field 0
+	 * and IntValue(1) in field 1.
+	 */
+	public static class TokenizeLine extends MapFunction implements Serializable {
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public void map(Record record, Collector<Record> collector) {
+			// get the second field (index 1, as type StringValue), which holds the text line
+			String line = record.getField(1, StringValue.class).getValue();
+			// normalize the line
+			line = line.replaceAll("\\W+", " ").toLowerCase();
+			
+			// tokenize the line
+			StringTokenizer tokenizer = new StringTokenizer(line);
+			while (tokenizer.hasMoreTokens()) {
+				String word = tokenizer.nextToken();
+				
+				// we emit a (word, 1) pair 
+				collector.collect(new Record(new StringValue(word), new IntValue(1)));
+			}
+		}
+	}
+
+	/**
+	 * Sums up the counts for a certain given key. The counts are assumed to be at position <code>1</code>
+	 * in the record. The sum is written into the last record of the group; its other fields are not modified.
+	 */
+	@Combinable
+	@ConstantFields(0)
+	public static class CountWords extends ReduceFunction implements Serializable {
+		
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
+			Record element = null;
+			int sum = 0;
+			
+			while (records.hasNext()) {
+				element = records.next();
+				int cnt = element.getField(1, IntValue.class).getValue();
+				sum += cnt;
+			}
+
+			// an empty group has nothing to emit (and would otherwise throw a NullPointerException)
+			if (element == null) {
+				return;
+			}
+
+			element.setField(1, new IntValue(sum));
+			out.collect(element);
+		}
+		
+		@Override
+		public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
+			// the logic is the same as in the reduce function, so simply call the reduce method
+			reduce(records, out);
+		}
+	}
+
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public Plan getPlan(String... args) {
+		// parse job parameters
+		int numSubTasks   = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
+		String dataInput = (args.length > 1 ? args[1] : "");
+		String output    = (args.length > 2 ? args[2] : "");
+		
+		
+		HadoopDataSource source = new HadoopDataSource(new TextInputFormat(), new JobConf(), "Input Lines");
+		TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
+		
+		// Example with Wrapper Converter. This source only illustrates how to configure the
+		// WritableWrapperConverter. It is not part of the plan, because TokenizeLine expects a
+		// StringValue (not a WritableWrapper) in field 1.
+		HadoopDataSource<LongWritable,Text> sourceHadoopType = new HadoopDataSource<LongWritable, Text>(
+				new TextInputFormat(), new JobConf(), "Input Lines", new WritableWrapperConverter<LongWritable, Text>());
+		// Configure the example source's own JobConf. Adding the path to 'source' again would
+		// append a duplicate input path, so the input would be read twice and every count doubled.
+		TextInputFormat.addInputPath(sourceHadoopType.getJobConf(), new Path(dataInput));
+		
+		MapOperator mapper = MapOperator.builder(new TokenizeLine())
+			.input(source)
+			.name("Tokenize Lines")
+			.build();
+		ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
+			.input(mapper)
+			.name("Count Words")
+			.build();
+		FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, reducer, "Word Counts");
+		CsvOutputFormat.configureRecordFormat(out)
+			.recordDelimiter('\n')
+			.fieldDelimiter(' ')
+			.field(StringValue.class, 0)
+			.field(IntValue.class, 1);
+		
+		Plan plan = new Plan(out, "WordCount Example");
+		plan.setDefaultParallelism(numSubTasks);
+		return plan;
+	}
+
+
+	@Override
+	public String getDescription() {
+		return "Parameters: [numSubTasks] [input] [output]";
+	}
+
+	
+	public static void main(String[] args) throws Exception {
+		WordCount wc = new WordCount();
+		
+		if (args.length < 3) {
+			System.err.println(wc.getDescription());
+			System.exit(1);
+		}
+		
+		Plan plan = wc.getPlan(args);
+		
+		// This will execute the word-count embedded in a local context. replace this line by the commented
+		// succeeding line to send the job to a local installation or to a cluster for execution
+		LocalExecutor.execute(plan);
+//		PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
+//		ex.executePlan(plan);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
new file mode 100644
index 0000000..a838215
--- /dev/null
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.record.example;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.Program;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.java.record.functions.MapFunction;
+import org.apache.flink.api.java.record.functions.ReduceFunction;
+import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
+import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
+import org.apache.flink.client.LocalExecutor;
+import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSink;
+import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSource;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+/**
+ * Implements a word count which takes the input file and counts the number of
+ * the occurrences of each word in the file. The result is written through a
+ * HadoopDataSink backed by Hadoop's TextOutputFormat.
+ * 
+ * <br /><br />
+ * 
+ * <b>Note</b>: This example uses the out-dated Record API.
+ * It is recommended to use the new Java API.
+ * 
+ * @see WordCount
+ */
+@SuppressWarnings("serial")
+public class WordCountWithOutputFormat implements Program, ProgramDescription {
+
+	/**
+	 * Splits the text line stored in field 1 of the input record into words.
+	 * Before tokenizing on whitespace, the line is prepared in two steps:
+	 * <ul>
+	 * <li>runs of non-word characters are replaced by a single space;</li>
+	 * <li>the line is lower-cased.</li>
+	 * </ul>
+	 * For each token a new record is emitted, with the word (StringValue) in field 0
+	 * and IntValue(1) in field 1.
+	 */
+	public static class TokenizeLine extends MapFunction implements Serializable {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void map(Record record, Collector<Record> collector) {
+			// get the second field (index 1, as type StringValue), which holds the text line
+			String line = record.getField(1, StringValue.class).getValue();
+			// normalize the line
+			line = line.replaceAll("\\W+", " ").toLowerCase();
+
+			// tokenize the line
+			StringTokenizer tokenizer = new StringTokenizer(line);
+			while (tokenizer.hasMoreTokens()) {
+				String word = tokenizer.nextToken();
+
+				// we emit a (word, 1) pair 
+				collector.collect(new Record(new StringValue(word), new IntValue(1)));
+			}
+		}
+	}
+
+	/**
+	 * Sums up the counts for a certain given key. The counts are assumed to be at position <code>1</code>
+	 * in the record. The sum is written into the last record of the group; its other fields are not modified.
+	 */
+	@Combinable
+	@ConstantFields(0)
+	public static class CountWords extends ReduceFunction implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
+			Record element = null;
+			int sum = 0;
+			
+			while (records.hasNext()) {
+				element = records.next();
+				int cnt = element.getField(1, IntValue.class).getValue();
+				sum += cnt;
+			}
+
+			// an empty group has nothing to emit (and would otherwise throw a NullPointerException)
+			if (element == null) {
+				return;
+			}
+
+			element.setField(1, new IntValue(sum));
+			out.collect(element);
+		}
+
+		@Override
+		public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
+			// the logic is the same as in the reduce function, so simply call the reduce method
+			reduce(records, out);
+		}
+	}
+
+
+	@Override
+	public Plan getPlan(String... args) {
+		// parse job parameters
+		int numSubTasks   = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
+		String dataInput = (args.length > 1 ? args[1] : "");
+		String output    = (args.length > 2 ? args[2] : "");
+
+		HadoopDataSource<LongWritable, Text> source = new HadoopDataSource<LongWritable, Text>(
+				new TextInputFormat(), new JobConf(), "Input Lines");
+		TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
+
+
+		MapOperator mapper = MapOperator.builder(new TokenizeLine())
+				.input(source)
+				.name("Tokenize Lines")
+				.build();
+		ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
+				.input(mapper)
+				.name("Count Words")
+				.build();
+		// write the (word, count) records through Hadoop's TextOutputFormat as (Text, IntWritable) pairs
+		HadoopDataSink<Text, IntWritable> out = new HadoopDataSink<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(),new JobConf(), "Hadoop TextOutputFormat", reducer, Text.class, IntWritable.class);
+		TextOutputFormat.setOutputPath(out.getJobConf(), new Path(output));
+
+		Plan plan = new Plan(out, "Hadoop OutputFormat Example");
+		plan.setDefaultParallelism(numSubTasks);
+		return plan;
+	}
+
+
+	@Override
+	public String getDescription() {
+		return "Parameters: [numSubTasks] [input] [output]";
+	}
+
+
+	public static void main(String[] args) throws Exception {
+		WordCountWithOutputFormat wc = new WordCountWithOutputFormat();
+
+		if (args.length < 3) {
+			System.err.println(wc.getDescription());
+			System.exit(1);
+		}
+
+		Plan plan = wc.getPlan(args);
+
+		// This will execute the word-count embedded in a local context. replace this line by the commented
+		// succeeding line to send the job to a local installation or to a cluster for execution
+		LocalExecutor.execute(plan);
+//		PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
+//		ex.executePlan(plan);
+	}
+}


Mime
View raw message