flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [62/73] [abbrv] prefix all projects in addons and quickstarts with flink-
Date Sat, 12 Jul 2014 12:48:41 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java
deleted file mode 100644
index 2a0c4d3..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-@SuppressWarnings("rawtypes")
-public class WritableWrapperConverter<K extends WritableComparable, V extends Writable> implements HadoopTypeConverter<K,V> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void convert(Record flinkRecord, K hadoopKey, V hadoopValue) {
-		flinkRecord.setField(0, convertKey(hadoopKey));
-		flinkRecord.setField(1, convertValue(hadoopValue));
-	}
-	
-	@SuppressWarnings("unchecked")
-	private final Value convertKey(K in) {
-		return new WritableComparableWrapper(in);
-	}
-	
-	private final Value convertValue(V in) {
-		return new WritableWrapper<V>(in);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
deleted file mode 100644
index 25caf0c..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.example;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.StringTokenizer;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSource;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.WritableWrapperConverter;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * the occurrences of each word in the file. 
- * 
- * <br /><br />
- * 
- * <b>Note</b>: This example uses the outdated Record API.
- * It is recommended to use the new Java API.
- * 
- * @see org.apache.flink.hadoopcompatibility.mapred.example.WordCount
- */
-public class WordCount implements Program, ProgramDescription {
-	
-	private static final long serialVersionUID = 1L;
-
-
-	/**
-	 * Converts a Record containing one string into multiple string/integer pairs.
-	 * The string is tokenized on whitespace. For each token a new record is emitted,
-	 * where the token is the first field and an Integer(1) is the second field.
-	 */
-	public static class TokenizeLine extends MapFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void map(Record record, Collector<Record> collector) {
-			// get the field at index 1 (as type StringValue) from the record
-			String line = record.getField(1, StringValue.class).getValue();
-			// normalize the line
-			line = line.replaceAll("\\W+", " ").toLowerCase();
-			
-			// tokenize the line
-			StringTokenizer tokenizer = new StringTokenizer(line);
-			while (tokenizer.hasMoreTokens()) {
-				String word = tokenizer.nextToken();
-				
-				// we emit a (word, 1) pair 
-				collector.collect(new Record(new StringValue(word), new IntValue(1)));
-			}
-		}
-	}
-
-	/**
-	 * Sums up the counts for a certain given key. The counts are assumed to be at position <code>1</code>
-	 * in the record. The other fields are not modified.
-	 */
-	@Combinable
-	@ConstantFields(0)
-	public static class CountWords extends ReduceFunction implements Serializable {
-		
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-			Record element = null;
-			int sum = 0;
-			while (records.hasNext()) {
-				element = records.next();
-				int cnt = element.getField(1, IntValue.class).getValue();
-				sum += cnt;
-			}
-
-			element.setField(1, new IntValue(sum));
-			out.collect(element);
-		}
-		
-		@Override
-		public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
-			// the logic is the same as in the reduce function, so simply call the reduce method
-			reduce(records, out);
-		}
-	}
-
-
-	@SuppressWarnings({ "rawtypes", "unchecked", "unused" })
-	@Override
-	public Plan getPlan(String... args) {
-		// parse job parameters
-		int numSubTasks   = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		String dataInput = (args.length > 1 ? args[1] : "");
-		String output    = (args.length > 2 ? args[2] : "");
-		
-		
-		HadoopDataSource source = new HadoopDataSource(new TextInputFormat(), new JobConf(), "Input Lines");
-		TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
-		
-		// Example with Wrapper Converter
-		HadoopDataSource<LongWritable,Text> sourceHadoopType = new HadoopDataSource<LongWritable, Text>(
-				new TextInputFormat(), new JobConf(), "Input Lines", new WritableWrapperConverter<LongWritable, Text>());
-		TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
-		
-		MapOperator mapper = MapOperator.builder(new TokenizeLine())
-			.input(source)
-			.name("Tokenize Lines")
-			.build();
-		ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
-			.input(mapper)
-			.name("Count Words")
-			.build();
-		FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, reducer, "Word Counts");
-		CsvOutputFormat.configureRecordFormat(out)
-			.recordDelimiter('\n')
-			.fieldDelimiter(' ')
-			.field(StringValue.class, 0)
-			.field(IntValue.class, 1);
-		
-		Plan plan = new Plan(out, "WordCount Example");
-		plan.setDefaultParallelism(numSubTasks);
-		return plan;
-	}
-
-
-	@Override
-	public String getDescription() {
-		return "Parameters: [numSubStasks] [input] [output]";
-	}
-
-	
-	public static void main(String[] args) throws Exception {
-		WordCount wc = new WordCount();
-		
-		if (args.length < 3) {
-			System.err.println(wc.getDescription());
-			System.exit(1);
-		}
-		
-		Plan plan = wc.getPlan(args);
-		
-		// This will execute the word-count embedded in a local context. Replace this line with the
-		// commented-out lines below to send the job to a local installation or to a cluster for execution.
-		LocalExecutor.execute(plan);
-//		PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
-//		ex.executePlan(plan);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
deleted file mode 100644
index a3cd3d5..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.example;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.StringTokenizer;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSink;
-import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSource;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * the occurrences of each word in the file.
- * 
- * <br /><br />
- * 
- * <b>Note</b>: This example uses the outdated Record API.
- * It is recommended to use the new Java API.
- * 
- * @see WordCount
- */
-@SuppressWarnings("serial")
-public class WordCountWithOutputFormat implements Program, ProgramDescription {
-
-	/**
-	 * Converts a Record containing one string into multiple string/integer pairs.
-	 * The string is tokenized on whitespace. For each token a new record is emitted,
-	 * where the token is the first field and an Integer(1) is the second field.
-	 */
-	public static class TokenizeLine extends MapFunction implements Serializable {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void map(Record record, Collector<Record> collector) {
-			// get the field at index 1 (as type StringValue) from the record
-			String line = record.getField(1, StringValue.class).getValue();
-			// normalize the line
-			line = line.replaceAll("\\W+", " ").toLowerCase();
-
-			// tokenize the line
-			StringTokenizer tokenizer = new StringTokenizer(line);
-			while (tokenizer.hasMoreTokens()) {
-				String word = tokenizer.nextToken();
-
-				// we emit a (word, 1) pair 
-				collector.collect(new Record(new StringValue(word), new IntValue(1)));
-			}
-		}
-	}
-
-	/**
-	 * Sums up the counts for a certain given key. The counts are assumed to be at position <code>1</code>
-	 * in the record. The other fields are not modified.
-	 */
-	@Combinable
-	@ConstantFields(0)
-	public static class CountWords extends ReduceFunction implements Serializable {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-			Record element = null;
-			int sum = 0;
-			while (records.hasNext()) {
-				element = records.next();
-				int cnt = element.getField(1, IntValue.class).getValue();
-				sum += cnt;
-			}
-
-			element.setField(1, new IntValue(sum));
-			out.collect(element);
-		}
-
-		@Override
-		public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
-			// the logic is the same as in the reduce function, so simply call the reduce method
-			reduce(records, out);
-		}
-	}
-
-
-	@Override
-	public Plan getPlan(String... args) {
-		// parse job parameters
-		int numSubTasks   = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-		String dataInput = (args.length > 1 ? args[1] : "");
-		String output    = (args.length > 2 ? args[2] : "");
-
-		HadoopDataSource<LongWritable, Text> source = new HadoopDataSource<LongWritable, Text>(
-				new TextInputFormat(), new JobConf(), "Input Lines");
-		TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
-
-
-		MapOperator mapper = MapOperator.builder(new TokenizeLine())
-				.input(source)
-				.name("Tokenize Lines")
-				.build();
-		ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
-				.input(mapper)
-				.name("Count Words")
-				.build();
-		HadoopDataSink<Text, IntWritable> out = new HadoopDataSink<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(),new JobConf(), "Hadoop TextOutputFormat", reducer, Text.class, IntWritable.class);
-		TextOutputFormat.setOutputPath(out.getJobConf(), new Path(output));
-
-		Plan plan = new Plan(out, "Hadoop OutputFormat Example");
-		plan.setDefaultParallelism(numSubTasks);
-		return plan;
-	}
-
-
-	@Override
-	public String getDescription() {
-		return "Parameters: [numSubStasks] [input] [output]";
-	}
-
-
-	public static void main(String[] args) throws Exception {
-		WordCountWithOutputFormat wc = new WordCountWithOutputFormat();
-
-		if (args.length < 3) {
-			System.err.println(wc.getDescription());
-			System.exit(1);
-		}
-
-		Plan plan = wc.getPlan(args);
-
-		// This will execute the word-count embedded in a local context. Replace this line with the
-		// commented-out lines below to send the job to a local installation or to a cluster for execution.
-		LocalExecutor.execute(plan);
-//		PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
-//		ex.executePlan(plan);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
deleted file mode 100644
index c679733..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/utils/HadoopUtils.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.utils;
-
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-import org.apache.flink.runtime.fs.hdfs.DistributedFileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskAttemptID;
-
-
-public class HadoopUtils {
-	
-	/**
-	 * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
-	 */
-	public static void mergeHadoopConf(JobConf jobConf) {
-		org.apache.hadoop.conf.Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
-		for (Map.Entry<String, String> e : hadoopConf) {
-			jobConf.set(e.getKey(), e.getValue());
-		}
-	}
-	
-	public static JobContext instantiateJobContext(JobConf jobConf, JobID jobId) throws Exception {
-		try {
-			// for Hadoop 1.xx
-			Class<?> clazz = null;
-			if(!TaskAttemptContext.class.isInterface()) { 
-				clazz = Class.forName("org.apache.hadoop.mapred.JobContext", true, Thread.currentThread().getContextClassLoader());
-			}
-			// for Hadoop 2.xx
-			else {
-				clazz = Class.forName("org.apache.hadoop.mapred.JobContextImpl", true, Thread.currentThread().getContextClassLoader());
-			}
-			Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, org.apache.hadoop.mapreduce.JobID.class);
-			// for Hadoop 1.xx
-			constructor.setAccessible(true);
-			JobContext context = (JobContext) constructor.newInstance(jobConf, jobId);
-			
-			return context;
-		} catch(Exception e) {
-			throw new Exception("Could not create instance of JobContext.", e);
-		}
-	}
-	
-	public static TaskAttemptContext instantiateTaskAttemptContext(JobConf jobConf,  TaskAttemptID taskAttemptID) throws Exception {
-		try {
-			// for Hadoop 1.xx
-			Class<?> clazz = null;
-			if(!TaskAttemptContext.class.isInterface()) { 
-				clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContext", true, Thread.currentThread().getContextClassLoader());
-			}
-			// for Hadoop 2.xx
-			else {
-				clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContextImpl", true, Thread.currentThread().getContextClassLoader());
-			}
-			Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, TaskAttemptID.class);
-			// for Hadoop 1.xx
-			constructor.setAccessible(true);
-			TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(jobConf, taskAttemptID);
-			return context;
-		} catch(Exception e) {
-			throw new Exception("Could not create instance of TaskAttemptContext.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
deleted file mode 100644
index 058a60f..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import org.apache.hadoop.util.Progressable;
-
-/**
- * This is a dummy Progressable whose progress() method does nothing.
- *
- */
-public class HadoopDummyProgressable implements Progressable {
-	@Override
-	public void progress() {
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
deleted file mode 100644
index 87a6014..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This is a dummy progress monitor / reporter
- *
- */
-public class HadoopDummyReporter implements Reporter {
-
-	@Override
-	public void progress() {
-	}
-
-	@Override
-	public void setStatus(String status) {
-
-	}
-
-	@Override
-	public Counter getCounter(Enum<?> name) {
-		return null;
-	}
-
-	@Override
-	public Counter getCounter(String group, String name) {
-		return null;
-	}
-
-	@Override
-	public void incrCounter(Enum<?> key, long amount) {
-
-	}
-
-	@Override
-	public void incrCounter(String group, String counter, long amount) {
-
-	}
-
-	@Override
-	public InputSplit getInputSplit() throws UnsupportedOperationException {
-		return null;
-	}
-
-	@Override
-	public float getProgress() {
-		return 0;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
deleted file mode 100644
index da46690..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.wrapper;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.mapred.JobConf;
-
-
-public class HadoopInputSplit implements InputSplit {
-
-	private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
-	private JobConf jobConf;
-	private int splitNumber;
-	private String hadoopInputSplitTypeName;
-	
-	
-	public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
-		return hadoopInputSplit;
-	}
-	
-	
-	public HadoopInputSplit() {
-		super();
-	}
-	
-	
-	public HadoopInputSplit(org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
-		this.hadoopInputSplit = hInputSplit;
-		this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
-		this.jobConf = jobconf;
-	}
-	
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(splitNumber);
-		out.writeUTF(hadoopInputSplitTypeName);
-		hadoopInputSplit.write(out);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.splitNumber=in.readInt();
-		this.hadoopInputSplitTypeName = in.readUTF();
-		if(hadoopInputSplit == null) {
-			try {
-				Class<? extends org.apache.hadoop.io.Writable> inputSplit = 
-						Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
-				this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit );
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Unable to create InputSplit", e);
-			}
-		}
-		this.hadoopInputSplit.readFields(in);
-	}
-
-	@Override
-	public int getSplitNumber() {
-		return this.splitNumber;
-	}
-
-	public void setSplitNumber(int splitNumber) {
-		this.splitNumber = splitNumber;
-	}
-	
-	public void setHadoopInputSplit(
-			org.apache.hadoop.mapred.InputSplit hadoopInputSplit) {
-		this.hadoopInputSplit = hadoopInputSplit;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
deleted file mode 100644
index cf12cae..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapreduce;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
-import org.apache.flink.hadoopcompatibility.mapreduce.wrapper.HadoopInputSplit;
-import org.apache.flink.types.TypeInformation;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private static final Log LOG = LogFactory.getLog(HadoopInputFormat.class);
-	
-	private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
-	private Class<K> keyClass;
-	private Class<V> valueClass;
-	private org.apache.hadoop.conf.Configuration configuration;
-	
-	private transient RecordReader<K, V> recordReader;
-	private boolean fetched = false;
-	private boolean hasNext;
-	
-	public HadoopInputFormat() {
-		super();
-	}
-	
-	public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
-		super();
-		this.mapreduceInputFormat = mapreduceInputFormat;
-		this.keyClass = key;
-		this.valueClass = value;
-		this.configuration = job.getConfiguration();
-		HadoopUtils.mergeHadoopConf(configuration);
-	}
-	
-	public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) {
-		this.configuration = configuration;
-	}
-	
-	public org.apache.hadoop.mapreduce.InputFormat<K,V> getHadoopInputFormat() {
-		return this.mapreduceInputFormat;
-	}
-	
-	public void setHadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat) {
-		this.mapreduceInputFormat = mapreduceInputFormat;
-	}
-	
-	public org.apache.hadoop.conf.Configuration getConfiguration() {
-		return this.configuration;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  InputFormat
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void configure(Configuration parameters) {
-		// nothing to do
-	}
-	
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
-		// only gather base statistics for FileInputFormats
-		if(!(mapreduceInputFormat instanceof FileInputFormat)) {
-			return null;
-		}
-		
-		JobContext jobContext = null;
-		try {
-			jobContext = HadoopUtils.instantiateJobContext(configuration, null);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-		
-		final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
-				(FileBaseStatistics) cachedStats : null;
-				
-				try {
-					final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext);
-					return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
-				} catch (IOException ioex) {
-					if (LOG.isWarnEnabled()) {
-						LOG.warn("Could not determine statistics due to an io error: "
-								+ ioex.getMessage());
-					}
-				} catch (Throwable t) {
-					if (LOG.isErrorEnabled()) {
-						LOG.error("Unexpected problem while getting the file statistics: "
-								+ t.getMessage(), t);
-					}
-				}
-				
-				// no statistics available
-				return null;
-	}
-	
-	@Override
-	public HadoopInputSplit[] createInputSplits(int minNumSplits)
-			throws IOException {
-		configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
-		
-		JobContext jobContext = null;
-		try {
-			jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-		
-		List<org.apache.hadoop.mapreduce.InputSplit> splits;
-		try {
-			splits = this.mapreduceInputFormat.getSplits(jobContext);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not get Splits.", e);
-		}
-		HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
-		
-		for(int i = 0; i < hadoopInputSplits.length; i++){
-			hadoopInputSplits[i] = new HadoopInputSplit(splits.get(i), jobContext);
-		}
-		return hadoopInputSplits;
-	}
-	
-	@Override
-	public Class<? extends HadoopInputSplit> getInputSplitType() {
-		return HadoopInputSplit.class;
-	}
-	
-	@Override
-	public void open(HadoopInputSplit split) throws IOException {
-		TaskAttemptContext context = null;
-		try {
-			context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
-		} catch(Exception e) {
-			throw new RuntimeException(e);
-		}
-		
-		try {
-			this.recordReader = this.mapreduceInputFormat
-					.createRecordReader(split.getHadoopInputSplit(), context);
-			this.recordReader.initialize(split.getHadoopInputSplit(), context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not create RecordReader.", e);
-		} finally {
-			this.fetched = false;
-		}
-	}
-	
-	@Override
-	public boolean reachedEnd() throws IOException {
-		if(!this.fetched) {
-			fetchNext();
-		}
-		return !this.hasNext;
-	}
-	
-	private void fetchNext() throws IOException {
-		try {
-			this.hasNext = this.recordReader.nextKeyValue();
-		} catch (InterruptedException e) {
-			throw new IOException("Could not fetch next KeyValue pair.", e);
-		} finally {
-			this.fetched = true;
-		}
-	}
-	
-	@Override
-	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
-		if(!this.fetched) {
-			fetchNext();
-		}
-		if(!this.hasNext) {
-			return null;
-		}
-		try {
-			record.f0 = this.recordReader.getCurrentKey();
-			record.f1 = this.recordReader.getCurrentValue();
-		} catch (InterruptedException e) {
-			throw new IOException("Could not get KeyValue pair.", e);
-		}
-		this.fetched = false;
-		
-		return record;
-	}
-	
-	@Override
-	public void close() throws IOException {
-		this.recordReader.close();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Helper methods
-	// --------------------------------------------------------------------------------------------
-	
-	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
-			ArrayList<FileStatus> files) throws IOException {
-		
-		long latestModTime = 0L;
-		
-		// get the file info and check whether the cached statistics are still valid.
-		for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
-			
-			final Path filePath = new Path(hadoopPath.toUri());
-			final FileSystem fs = FileSystem.get(filePath.toUri());
-			
-			final FileStatus file = fs.getFileStatus(filePath);
-			latestModTime = Math.max(latestModTime, file.getModificationTime());
-			
-			// enumerate all files and check their modification time stamp.
-			if (file.isDir()) {
-				FileStatus[] fss = fs.listStatus(filePath);
-				files.ensureCapacity(files.size() + fss.length);
-				
-				for (FileStatus s : fss) {
-					if (!s.isDir()) {
-						files.add(s);
-						latestModTime = Math.max(s.getModificationTime(), latestModTime);
-					}
-				}
-			} else {
-				files.add(file);
-			}
-		}
-		
-		// check whether the cached statistics are still valid, if we have any
-		if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
-			return cachedStats;
-		}
-		
-		// calculate the whole length
-		long len = 0;
-		for (FileStatus s : files) {
-			len += s.getLen();
-		}
-		
-		// sanity check
-		if (len <= 0) {
-			len = BaseStatistics.SIZE_UNKNOWN;
-		}
-		
-		return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Custom serialization methods
-	// --------------------------------------------------------------------------------------------
-	
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeUTF(this.mapreduceInputFormat.getClass().getName());
-		out.writeUTF(this.keyClass.getName());
-		out.writeUTF(this.valueClass.getName());
-		this.configuration.write(out);
-	}
-	
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		String hadoopInputFormatClassName = in.readUTF();
-		String keyClassName = in.readUTF();
-		String valueClassName = in.readUTF();
-		
-		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
-		configuration.readFields(in);
-		
-		if(this.configuration == null) {
-			this.configuration = configuration;
-		}
-		
-		try {
-			this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
-		}
-		try {
-			this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to find key class.", e);
-		}
-		try {
-			this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to find value class.", e);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  ResultTypeQueryable
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public TypeInformation<Tuple2<K,V>> getProducedType() {
-		return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>((Class<K>) keyClass), new WritableTypeInfo<V>((Class<V>) valueClass));
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
deleted file mode 100644
index 9eabc03..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapreduce;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-
-
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private org.apache.hadoop.conf.Configuration configuration;
-	private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat;
-	private transient RecordWriter<K,V> recordWriter;
-	private transient FileOutputCommitter fileOutputCommitter;
-	private transient TaskAttemptContext context;
-	
-	public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat, Job job) {
-		super();
-		this.mapreduceOutputFormat = mapreduceOutputFormat;
-		this.configuration = job.getConfiguration();
-		HadoopUtils.mergeHadoopConf(configuration);
-	}
-	
-	public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) {
-		this.configuration = configuration;
-	}
-	
-	public org.apache.hadoop.conf.Configuration getConfiguration() {
-		return this.configuration;
-	}
-	
-	public org.apache.hadoop.mapreduce.OutputFormat<K,V> getHadoopOutputFormat() {
-		return this.mapreduceOutputFormat;
-	}
-	
-	public void setHadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat) {
-		this.mapreduceOutputFormat = mapreduceOutputFormat;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  OutputFormat
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void configure(Configuration parameters) {
-		// nothing to do
-	}
-	
-	/**
-	 * create the temporary output file for hadoop RecordWriter.
-	 * @param taskNumber The number of the parallel instance.
-	 * @param numTasks The number of parallel tasks.
-	 * @throws IOException
-	 */
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		if (Integer.toString(taskNumber + 1).length() > 6) {
-			throw new IOException("Task id too large.");
-		}
-		
-		// for hadoop 2.2
-		this.configuration.set("mapreduce.output.basename", "tmp");
-		
-		TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" 
-				+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") 
-				+ Integer.toString(taskNumber + 1) 
-				+ "_0");
-		
-		try {
-			this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-		this.configuration.set("mapred.task.id", taskAttemptID.toString());
-		// for hadoop 2.2
-		this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString());
-		
-		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
-		
-		try {
-			this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID()));
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-		
-		// compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
-		this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString());
-		
-		try {
-			this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not create RecordWriter.", e);
-		}
-	}
-	
-	
-	@Override
-	public void writeRecord(Tuple2<K, V> record) throws IOException {
-		try {
-			this.recordWriter.write(record.f0, record.f1);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not write Record.", e);
-		}
-	}
-	
-	/**
-	 * commit the task by moving the output file out from the temporary directory.
-	 * @throws IOException
-	 */
-	@Override
-	public void close() throws IOException {
-		try {
-			this.recordWriter.close(this.context);
-		} catch (InterruptedException e) {
-			throw new IOException("Could not close RecordReader.", e);
-		}
-		
-		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
-			this.fileOutputCommitter.commitTask(this.context);
-		}
-		this.fileOutputCommitter.commitJob(this.context);
-		
-		// rename tmp-* files to final name
-		FileSystem fs = FileSystem.get(this.configuration);
-		
-		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
-
-		final Pattern p = Pattern.compile("tmp-(.)-([0-9]+)");
-		
-		// isDirectory does not work in hadoop 1
-		if(fs.getFileStatus(outputPath).isDir()) {
-			FileStatus[] files = fs.listStatus(outputPath);
-			
-			for(FileStatus f : files) {
-				Matcher m = p.matcher(f.getPath().getName());
-				if(m.matches()) {
-					int part = Integer.valueOf(m.group(2));
-					fs.rename(f.getPath(), new Path(outputPath.toString()+"/"+part));
-				}
-			}
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Custom serialization methods
-	// --------------------------------------------------------------------------------------------
-	
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeUTF(this.mapreduceOutputFormat.getClass().getName());
-		this.configuration.write(out);
-	}
-	
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		String hadoopOutputFormatClassName = in.readUTF();
-		
-		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
-		configuration.readFields(in);
-		
-		if(this.configuration == null) {
-			this.configuration = configuration;
-		}
-		
-		try {
-			this.mapreduceOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat<K,V>) Class.forName(hadoopOutputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
deleted file mode 100644
index 36ea378..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapreduce.example;
-
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
-import org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat;
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * occurrences of each word in the file and writes the result back to disk.
- * 
- * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to 
- * common Java types for better usage in a Flink job and how to use Hadoop Output Formats.
- */
-@SuppressWarnings("serial")
-public class WordCount {
-	
-	public static void main(String[] args) throws Exception {
-		if (args.length < 2) {
-			System.err.println("Usage: WordCount <input path> <result path>");
-			return;
-		}
-		
-		final String inputPath = args[0];
-		final String outputPath = args[1];
-		
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(1);
-		
-		// Set up the Hadoop Input Format
-		Job job = Job.getInstance();
-		HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
-		TextInputFormat.addInputPath(job, new Path(inputPath));
-		
-		// Create a Flink job with it
-		DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
-		
-		// Tokenize the line and convert from Writable "Text" to String for better handling
-		DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());
-		
-		// Sum up the words
-		DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);
-		
-		// Convert String back to Writable "Text" for use with Hadoop Output Format
-		DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());
-		
-		// Set up Hadoop Output Format
-		HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job);
-		hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
-		hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " "); // set the value for both, since this test
-		// is being executed with both types (hadoop1 and hadoop2 profile)
-		TextOutputFormat.setOutputPath(job, new Path(outputPath));
-		
-		// Output & Execute
-		hadoopResult.output(hadoopOutputFormat);
-		env.execute("Word Count");
-	}
-	
-	/**
-	 * Splits a line into words and converts Hadoop Writables into normal Java data types.
-	 */
-	public static final class Tokenizer extends FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
-		
-		@Override
-		public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
-			// normalize and split the line
-			String line = value.f1.toString();
-			String[] tokens = line.toLowerCase().split("\\W+");
-			
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
-	
-	/**
-	 * Converts Java data types to Hadoop Writables.
-	 */
-	public static final class HadoopDatatypeMapper extends MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
-		
-		@Override
-		public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {
-			return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1));
-		}
-		
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
deleted file mode 100644
index eadbd0b..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapreduce.utils;
-
-import java.lang.reflect.Constructor;
-import java.util.Map;
-
-import org.apache.flink.runtime.fs.hdfs.DistributedFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-public class HadoopUtils {
-	
-	/**
-	 * Merge HadoopConfiguration into Configuration. This is necessary for the HDFS configuration.
-	 */
-	public static void mergeHadoopConf(Configuration configuration) {
-		Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
-		
-		for (Map.Entry<String, String> e : hadoopConf) {
-			configuration.set(e.getKey(), e.getValue());
-		}
-	}
-	
-	public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception {
-		try {
-			Class<?> clazz;
-			// for Hadoop 1.xx
-			if(JobContext.class.isInterface()) {
-				clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader());
-			}
-			// for Hadoop 2.xx
-			else {
-				clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader());
-			}
-			Constructor<?> constructor = clazz.getConstructor(Configuration.class, JobID.class);
-			JobContext context = (JobContext) constructor.newInstance(configuration, jobId);
-			
-			return context;
-		} catch(Exception e) {
-			throw new Exception("Could not create instance of JobContext.");
-		}
-	}
-	
-	public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration,  TaskAttemptID taskAttemptID) throws Exception {
-		try {
-			Class<?> clazz;
-			// for Hadoop 1.xx
-			if(JobContext.class.isInterface()) {
-				clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
-			}
-			// for Hadoop 2.xx
-			else {
-				clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
-			}
-			Constructor<?> constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class);
-			TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID);
-			
-			return context;
-		} catch(Exception e) {
-			throw new Exception("Could not create instance of TaskAttemptContext.");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
deleted file mode 100644
index b53fc9f..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapreduce.wrapper;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.mapreduce.JobContext;
-
-
-public class HadoopInputSplit implements InputSplit {
-	
-	public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit;
-	public transient JobContext jobContext;
-	
-	private int splitNumber;	
-	
-	public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() {
-		return mapreduceInputSplit;
-	}
-	
-	
-	public HadoopInputSplit() {
-		super();
-	}
-	
-	
-	public HadoopInputSplit(org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
-		if(!(mapreduceInputSplit instanceof Writable)) {
-			throw new IllegalArgumentException("InputSplit must implement Writable interface.");
-		}
-		this.mapreduceInputSplit = mapreduceInputSplit;
-		this.jobContext = jobContext;
-	}
-	
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(this.splitNumber);
-		out.writeUTF(this.mapreduceInputSplit.getClass().getName());
-		Writable w = (Writable) this.mapreduceInputSplit;
-		w.write(out);
-	}
-	
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.splitNumber=in.readInt();
-		String className = in.readUTF();
-		
-		if(this.mapreduceInputSplit == null) {
-			try {
-				Class<? extends org.apache.hadoop.io.Writable> inputSplit = 
-						Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
-				this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
-			} catch (Exception e) {
-				throw new RuntimeException("Unable to create InputSplit", e);
-			}
-		}
-		((Writable)this.mapreduceInputSplit).readFields(in);
-	}
-	
-	@Override
-	public int getSplitNumber() {
-		return this.splitNumber;
-	}
-	
-	public void setSplitNumber(int splitNumber) {
-		this.splitNumber = splitNumber;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java b/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java
deleted file mode 100644
index d13d0f2..0000000
--- a/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import org.apache.flink.hadoopcompatibility.mapred.example.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class HadoopInputOutputITCase extends JavaProgramTestBase {
-	
-	protected String textPath;
-	protected String resultPath;
-	
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("text.txt", WordCountData.TEXT);
-		resultPath = getTempDirPath("result");
-	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
-	}
-	
-	@Override
-	protected void testProgram() throws Exception {
-		WordCount.main(new String[] { textPath, resultPath });
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java b/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
deleted file mode 100644
index 547ea60..0000000
--- a/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.hadoopcompatibility.mapred.record;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.hadoopcompatibility.mapred.record.example.WordCountWithOutputFormat;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-/**
- * test the hadoop inputformat and outputformat
- */
-public class HadoopRecordInputOutputITCase extends RecordAPITestBase {
-	protected String textPath;
-	protected String resultPath;
-	protected String counts;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("text.txt", WordCountData.TEXT);
-		resultPath = getTempDirPath("result");
-		counts = WordCountData.COUNTS.replaceAll(" ", "\t");
-	}
-
-	@Override
-	protected Plan getTestJob() {
-		// WordCountWithOutputFormat reads its input with Hadoop's TextInputFormat and writes its output with Hadoop's TextOutputFormat
-		WordCountWithOutputFormat wc = new WordCountWithOutputFormat();
-		return wc.getPlan("1", textPath, resultPath);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		// Test results, append /1 to resultPath due to the generated _temporary file.
-		compareResultsByLinesInMemory(counts, resultPath + "/1");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
deleted file mode 100644
index 10dab3f..0000000
--- a/flink-addons/hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapreduce;
-
-import org.apache.flink.hadoopcompatibility.mapreduce.example.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class HadoopInputOutputITCase extends JavaProgramTestBase {
-	
-	protected String textPath;
-	protected String resultPath;
-	
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("text.txt", WordCountData.TEXT);
-		resultPath = getTempDirPath("result");
-	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1");
-	}
-	
-	@Override
-	protected void testProgram() throws Exception {
-		WordCount.main(new String[] { textPath, resultPath });
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/pom.xml b/flink-addons/hbase/pom.xml
deleted file mode 100644
index 4c309f9..0000000
--- a/flink-addons/hbase/pom.xml
+++ /dev/null
@@ -1,111 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<artifactId>flink-addons</artifactId>
-		<groupId>org.apache.flink</groupId>
-		<version>0.6-incubating-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-	
-	<repositories>
-		<repository>
-			<id>cloudera-releases</id>
-			<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
-			<releases>
-				<enabled>true</enabled>
-			</releases>
-			<snapshots>
-				<enabled>false</enabled>
-			</snapshots>
-		</repository>
-	</repositories>
-
-	<properties>
- 		<hbase.version>0.96.0-hadoop2</hbase.version>
-	</properties>
-
-	<artifactId>hbase</artifactId>
-	<name>hbase</name>
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.hbase</groupId>
-			<artifactId>hbase</artifactId>
-			<version>0.94.2-cdh4.2.1</version>
-			<exclusions>
-				<!-- jruby is used for the hbase shell. -->
-				<exclusion>
-					<groupId>org.jruby</groupId>
-					<artifactId>jruby-complete</artifactId>
-					</exclusion>
-				</exclusions> 
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-client</artifactId>
-			<version>${hadoop.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>asm</groupId>
-					<artifactId>asm</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-	</dependencies>
-		<!-- <dependency>
-			<groupId>org.apache.hbase</groupId>
-			<artifactId>hbase-server</artifactId>
-			<version>${hbase.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hbase</groupId>
-			<artifactId>hbase-client</artifactId>
-			<version>${hbase.version}</version>
-		</dependency>
-		 -->
-
-	<!-- hadoop-client is available for yarn and non-yarn, so there is no need
-		to use profiles. See ticket https://issues.apache.org/jira/browse/HADOOP-8009
-		for a description of hadoop-client. -->
-
-	<reporting>
-		<plugins>
-		</plugins>
-	</reporting>
-
-	<build>
-		<plugins>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
deleted file mode 100644
index 9029030..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/GenericTableOutputFormat.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
-
-public abstract class GenericTableOutputFormat implements OutputFormat<Record> {
-
-	private static final long serialVersionUID = 1L;
-
-	public static final String JT_ID_KEY = "pact.hbase.jtkey";
-
-	public static final String JOB_ID_KEY = "pact.job.id";
-
-	private RecordWriter<ImmutableBytesWritable, KeyValue> writer;
-
-	private Configuration config;
-
-	private org.apache.hadoop.conf.Configuration hadoopConfig;
-
-	private TaskAttemptContext context;
-
-	private String jtID;
-
-	private int jobId;
-
-
-	@Override
-	public void configure(Configuration parameters) {
-		this.config = parameters;
-
-		// get the ID parameters
-		this.jtID = parameters.getString(JT_ID_KEY, null);
-		if (this.jtID == null) {
-			throw new RuntimeException("Missing JT_ID entry in hbase config.");
-		}
-		this.jobId = parameters.getInteger(JOB_ID_KEY, -1);
-		if (this.jobId < 0) {
-			throw new RuntimeException("Missing or invalid job id in input config.");
-		}
-	}
-
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		this.hadoopConfig = getHadoopConfig(this.config);
-		
-		/**
-		 * PLEASE NOTE:
-		 * If you are an Eclipse+Maven Integration user and you have two (or more) warnings here, please
-		 * close the pact-hbase project OR set the maven profile to hadoop_yarn
-		 * 
-		 * pact-hbase requires hadoop_yarn, but Eclipse is not able to parse maven profiles properly. Therefore,
-		 * it imports the pact-hbase project even if it is not included in the standard profile (hadoop_v1)
-		 */
-		final TaskAttemptID attemptId = new TaskAttemptID(this.jtID, this.jobId, TaskType.MAP, taskNumber - 1, 0);
-
-		this.context = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(this.hadoopConfig, attemptId);
-		final HFileOutputFormat outFormat = new HFileOutputFormat();
-		try {
-			this.writer = outFormat.getRecordWriter(this.context);
-		} catch (InterruptedException iex) {
-			throw new IOException("Opening the writer was interrupted.", iex);
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		final RecordWriter<ImmutableBytesWritable, KeyValue> writer = this.writer;
-		this.writer = null;
-		if (writer != null) {
-			try {
-				writer.close(this.context);
-			} catch (InterruptedException iex) {
-				throw new IOException("Closing was interrupted.", iex);
-			}
-		}
-	}
-
-	public void collectKeyValue(KeyValue kv) throws IOException {
-		try {
-			this.writer.write(null, kv);
-		} catch (InterruptedException iex) {
-			throw new IOException("Write request was interrupted.", iex);
-		}
-	}
-
-	public abstract org.apache.hadoop.conf.Configuration getHadoopConfig(Configuration config);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java b/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
deleted file mode 100644
index ac41927..0000000
--- a/flink-addons/hbase/src/main/java/org/apache/flink/addons/hbase/HBaseDataSink.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase;
-
-import java.util.Random;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-
-/**
- * A sink for writing to HBase
- */
-public class HBaseDataSink extends GenericDataSink {
-	
-	private static final int IDENTIFYIER_LEN = 16;
-	
-	public HBaseDataSink(GenericTableOutputFormat f, Operator input, String name) {
-		super(f, input, name);
-		
-		// generate a random unique identifier string
-		final Random rnd = new Random();
-		final StringBuilder bld = new StringBuilder();
-		for (int i = 0; i < IDENTIFYIER_LEN; i++) {
-			bld.append((char) (rnd.nextInt(26) + 'a'));
-		}
-		
-		setParameter(GenericTableOutputFormat.JT_ID_KEY, bld.toString());
-		setParameter(GenericTableOutputFormat.JOB_ID_KEY, rnd.nextInt());
-	}
-}


Mime
View raw message