flink-commits mailing list archives

From: rmetz...@apache.org
Subject: [47/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date: Mon, 02 Feb 2015 18:42:25 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
deleted file mode 100644
index 00fd1f9..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.fail;
-
-
-public class HadoopInputFormatTest {
-
-
-	public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
-
-		public DummyVoidKeyInputFormat() {
-		}
-
-		@Override
-		public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
-			return null;
-		}
-	}
-	
-	
-	@Test
-	public void checkTypeInformation() {
-		try {
-			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-			// Set up the Hadoop Input Format
-			Job job = Job.getInstance();
-			HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, Long.class, new JobConf());
-
-			TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
-			TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
-			
-			if(tupleType.isTupleType()) {
-				if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
-					fail("Tuple type information was not set correctly!");
-				}
-			} else {
-				fail("Type information was not set to tuple type information!");
-			}
-
-		}
-		catch (Exception ex) {
-			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-
-	}
-	
-}
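
For context on the wrapper the deleted test above exercises: HadoopInputFormat (mapred API) adapts a Hadoop InputFormat so its key/value pairs arrive in a Flink DataSet as Tuple2 records. The following is a minimal, hypothetical usage sketch, not part of this commit; it assumes the Flink 0.9-era ExecutionEnvironment#createInput API plus the four-argument constructor shown in the test, and the class name and input/output paths are placeholders.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class HadoopInputFormatUsageSketch {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Configure the Hadoop (mapred) input format through a JobConf, as the deleted test does.
		JobConf jobConf = new JobConf();
		FileInputFormat.addInputPath(jobConf, new Path("hdfs:///tmp/input"));   // placeholder path

		// Same constructor shape as in the test: (inputFormat, keyClass, valueClass, jobConf).
		HadoopInputFormat<LongWritable, Text> input = new HadoopInputFormat<LongWritable, Text>(
				new TextInputFormat(), LongWritable.class, Text.class, jobConf);

		// The wrapper produces Tuple2<key, value> records, matching the type the test checks.
		DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(input);
		lines.writeAsText("file:///tmp/lines");   // placeholder output path
		env.execute();
	}
}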

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
deleted file mode 100644
index 850e799..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class HadoopMapFunctionITCase extends MultipleProgramsTestBase {
-
-	public HadoopMapFunctionITCase(ExecutionMode mode){
-		super(mode);
-	}
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Test
-	public void testNonPassingMapper() throws Exception{
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
-		DataSet<Tuple2<IntWritable, Text>> nonPassingFlatMapDs = ds.
-				flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new NonPassingMapper()));
-
-		String resultPath = tempFolder.newFile().toURI().toString();
-
-		nonPassingFlatMapDs.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-		env.execute();
-
-		compareResultsByLinesInMemory("\n", resultPath);
-	}
-
-	@Test
-	public void testDataDuplicatingMapper() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
-		DataSet<Tuple2<IntWritable, Text>> duplicatingFlatMapDs = ds.
-				flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new DuplicatingMapper()));
-
-		String resultPath = tempFolder.newFile().toURI().toString();
-
-		duplicatingFlatMapDs.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-		env.execute();
-
-		String expected = "(1,Hi)\n" + "(1,HI)\n" +
-				"(2,Hello)\n" + "(2,HELLO)\n" +
-				"(3,Hello world)\n" + "(3,HELLO WORLD)\n" +
-				"(4,Hello world, how are you?)\n" + "(4,HELLO WORLD, HOW ARE YOU?)\n" +
-				"(5,I am fine.)\n" + "(5,I AM FINE.)\n" +
-				"(6,Luke Skywalker)\n" + "(6,LUKE SKYWALKER)\n" +
-				"(7,Comment#1)\n" + "(7,COMMENT#1)\n" +
-				"(8,Comment#2)\n" + "(8,COMMENT#2)\n" +
-				"(9,Comment#3)\n" + "(9,COMMENT#3)\n" +
-				"(10,Comment#4)\n" + "(10,COMMENT#4)\n" +
-				"(11,Comment#5)\n" + "(11,COMMENT#5)\n" +
-				"(12,Comment#6)\n" + "(12,COMMENT#6)\n" +
-				"(13,Comment#7)\n" + "(13,COMMENT#7)\n" +
-				"(14,Comment#8)\n" + "(14,COMMENT#8)\n" +
-				"(15,Comment#9)\n" + "(15,COMMENT#9)\n" +
-				"(16,Comment#10)\n" + "(16,COMMENT#10)\n" +
-				"(17,Comment#11)\n" + "(17,COMMENT#11)\n" +
-				"(18,Comment#12)\n" + "(18,COMMENT#12)\n" +
-				"(19,Comment#13)\n" + "(19,COMMENT#13)\n" +
-				"(20,Comment#14)\n" + "(20,COMMENT#14)\n" +
-				"(21,Comment#15)\n" + "(21,COMMENT#15)\n";
-
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
-	@Test
-	public void testConfigurableMapper() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		JobConf conf = new JobConf();
-		conf.set("my.filterPrefix", "Hello");
-
-		DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
-		DataSet<Tuple2<IntWritable, Text>> hellos = ds.
-				flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new ConfigurableMapper(), conf));
-
-		String resultPath = tempFolder.newFile().toURI().toString();
-
-		hellos.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-		env.execute();
-
-		String expected = "(2,Hello)\n" +
-				"(3,Hello world)\n" +
-				"(4,Hello world, how are you?)\n";
-
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-	
-
-	
-	public static class NonPassingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
-		
-		@Override
-		public void map(final IntWritable k, final Text v, 
-				final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
-			if ( v.toString().contains("bananas") ) {
-				out.collect(k,v);
-			}
-		}
-		
-		@Override
-		public void configure(final JobConf arg0) { }
-
-		@Override
-		public void close() throws IOException { }
-	}
-	
-	public static class DuplicatingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
-		
-		@Override
-		public void map(final IntWritable k, final Text v, 
-				final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException {
-			out.collect(k, v);
-			out.collect(k, new Text(v.toString().toUpperCase()));
-		}
-		
-		@Override
-		public void configure(final JobConf arg0) { }
-
-		@Override
-		public void close() throws IOException { }
-	}
-	
-	public static class ConfigurableMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
-		private String filterPrefix;
-		
-		@Override
-		public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r)
-				throws IOException {
-			if(v.toString().startsWith(filterPrefix)) {
-				out.collect(k, v);
-			}
-		}
-		
-		@Override
-		public void configure(JobConf c) {
-			filterPrefix = c.get("my.filterPrefix");
-		}
-
-		@Override
-		public void close() throws IOException { }
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
deleted file mode 100644
index b6650d2..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class HadoopMapredITCase extends JavaProgramTestBase {
-	
-	protected String textPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("text.txt", WordCountData.TEXT);
-		resultPath = getTempDirPath("result");
-		this.setDegreeOfParallelism(4);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"});
-	}
-	
-	@Override
-	protected void testProgram() throws Exception {
-		HadoopMapredCompatWordCount.main(new String[] { textPath, resultPath });
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
deleted file mode 100644
index 92b0dc3..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.hamcrest.core.IsEqual;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase {
-
-	public HadoopReduceCombineFunctionITCase(ExecutionMode mode){
-		super(mode);
-	}
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Test
-	public void testStandardCountingWithCombiner() throws Exception{
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
-				map(new Mapper1());
-
-		DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
-				groupBy(0).
-				reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
-						new SumReducer(), new SumReducer()));
-
-		String resultPath = tempFolder.newFile().toURI().toString();
-
-		counts.writeAsText(resultPath);
-		env.execute();
-
-		String expected = "(0,5)\n"+
-				"(1,6)\n" +
-				"(2,6)\n" +
-				"(3,4)\n";
-
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
-	@Test
-	public void testUngroupedHadoopReducer() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
-				map(new Mapper2());
-
-		DataSet<Tuple2<IntWritable, IntWritable>> sum = ds.
-				reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
-						new SumReducer(), new SumReducer()));
-
-		String resultPath = tempFolder.newFile().toURI().toString();
-
-		sum.writeAsText(resultPath);
-		env.execute();
-
-		String expected = "(0,231)\n";
-
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
-	@Test
-	public void testCombiner() throws Exception {
-		org.junit.Assume.assumeThat(mode, new IsEqual<ExecutionMode>(ExecutionMode.CLUSTER));
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env).
-				map(new Mapper3());
-
-		DataSet<Tuple2<IntWritable, IntWritable>> counts = ds.
-				groupBy(0).
-				reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
-						new SumReducer(), new KeyChangingReducer()));
-
-		String resultPath = tempFolder.newFile().toURI().toString();
-
-		counts.writeAsText(resultPath);
-		env.execute();
-
-		String expected = "(0,5)\n"+
-				"(1,6)\n" +
-				"(2,5)\n" +
-				"(3,5)\n";
-
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
-	@Test
-	public void testConfigurationViaJobConf() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		JobConf conf = new JobConf();
-		conf.set("my.cntPrefix", "Hello");
-
-		DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
-				map(new Mapper4());
-
-		DataSet<Tuple2<IntWritable, IntWritable>> hellos = ds.
-				groupBy(0).
-				reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
-						new ConfigurableCntReducer(), conf));
-
-		String resultPath = tempFolder.newFile().toURI().toString();
-
-		hellos.writeAsText(resultPath);
-		env.execute();
-
-		// return expected result
-		String expected = "(0,0)\n"+
-				"(1,0)\n" +
-				"(2,1)\n" +
-				"(3,1)\n" +
-				"(4,1)\n";
-
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-	
-	public static class SumReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
-
-		@Override
-		public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
-				throws IOException {
-			
-			int sum = 0;
-			while(v.hasNext()) {
-				sum += v.next().get();
-			}
-			out.collect(k, new IntWritable(sum));
-		}
-		
-		@Override
-		public void configure(JobConf arg0) { }
-
-		@Override
-		public void close() throws IOException { }
-	}
-	
-	public static class KeyChangingReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
-
-		@Override
-		public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r)
-				throws IOException {
-			while(v.hasNext()) {
-				out.collect(new IntWritable(k.get() % 4), v.next());
-			}
-		}
-		
-		@Override
-		public void configure(JobConf arg0) { }
-
-		@Override
-		public void close() throws IOException { }
-	}
-	
-	public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
-		private String countPrefix;
-		
-		@Override
-		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
-				throws IOException {
-			int commentCnt = 0;
-			while(vs.hasNext()) {
-				String v = vs.next().toString();
-				if(v.startsWith(this.countPrefix)) {
-					commentCnt++;
-				}
-			}
-			out.collect(k, new IntWritable(commentCnt));
-		}
-		
-		@Override
-		public void configure(final JobConf c) { 
-			this.countPrefix = c.get("my.cntPrefix");
-		}
-
-		@Override
-		public void close() throws IOException { }
-	}
-
-	public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable,
-			IntWritable>> {
-		private static final long serialVersionUID = 1L;
-		Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
-		@Override
-		public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
-		throws Exception {
-			outT.f0 = new IntWritable(v.f0.get() / 6);
-			outT.f1 = new IntWritable(1);
-			return outT;
-		}
-	}
-
-	public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable,
-			IntWritable>> {
-		private static final long serialVersionUID = 1L;
-		Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
-		@Override
-		public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
-		throws Exception {
-			outT.f0 = new IntWritable(0);
-			outT.f1 = v.f0;
-			return outT;
-		}
-	}
-
-	public static class Mapper3 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>> {
-		private static final long serialVersionUID = 1L;
-		Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>();
-		@Override
-		public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v)
-		throws Exception {
-			outT.f0 = v.f0;
-			outT.f1 = new IntWritable(1);
-			return outT;
-		}
-	}
-
-	public static class Mapper4 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
-		private static final long serialVersionUID = 1L;
-		@Override
-		public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
-		throws Exception {
-			v.f0 = new IntWritable(v.f0.get() % 5);
-			return v;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
deleted file mode 100644
index 1801c3e..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase {
-
-	public HadoopReduceFunctionITCase(ExecutionMode mode){
-		super(mode);
-	}
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Test
-	public void testStandardGrouping() throws Exception{
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
-				map(new Mapper1());
-
-		DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
-				groupBy(0).
-				reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new CommentCntReducer()));
-
-		String resultPath = tempFolder.newFile().toURI().toString();
-
-		commentCnts.writeAsText(resultPath);
-		env.execute();
-
-		String expected = "(0,0)\n"+
-				"(1,3)\n" +
-				"(2,5)\n" +
-				"(3,5)\n" +
-				"(4,2)\n";
-
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
-	@Test
-	public void testUngroupedHadoopReducer() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
-
-		DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
-				reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new AllCommentCntReducer()));
-
-		String resultPath = tempFolder.newFile().toURI().toString();
-
-		commentCnts.writeAsText(resultPath);
-		env.execute();
-
-		String expected = "(42,15)\n";
-
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
-	@Test
-	public void testConfigurationViaJobConf() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		JobConf conf = new JobConf();
-		conf.set("my.cntPrefix", "Hello");
-
-		DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
-				map(new Mapper2());
-
-		DataSet<Tuple2<IntWritable, IntWritable>> helloCnts = ds.
-				groupBy(0).
-				reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
-						new ConfigurableCntReducer(), conf));
-
-		String resultPath = tempFolder.newFile().toURI().toString();
-
-		helloCnts.writeAsText(resultPath);
-		env.execute();
-
-		String expected = "(0,0)\n"+
-				"(1,0)\n" +
-				"(2,1)\n" +
-				"(3,1)\n" +
-				"(4,1)\n";
-
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-	
-	public static class CommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
-		
-		@Override
-		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
-				throws IOException {
-			int commentCnt = 0;
-			while(vs.hasNext()) {
-				String v = vs.next().toString();
-				if(v.startsWith("Comment")) {
-					commentCnt++;
-				}
-			}
-			out.collect(k, new IntWritable(commentCnt));
-		}
-		
-		@Override
-		public void configure(final JobConf arg0) { }
-
-		@Override
-		public void close() throws IOException { }
-	}
-	
-	public static class AllCommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
-		
-		@Override
-		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
-				throws IOException {
-			int commentCnt = 0;
-			while(vs.hasNext()) {
-				String v = vs.next().toString();
-				if(v.startsWith("Comment")) {
-					commentCnt++;
-				}
-			}
-			out.collect(new IntWritable(42), new IntWritable(commentCnt));
-		}
-		
-		@Override
-		public void configure(final JobConf arg0) { }
-
-		@Override
-		public void close() throws IOException { }
-	}
-	
-	public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
-		private String countPrefix;
-		
-		@Override
-		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
-				throws IOException {
-			int commentCnt = 0;
-			while(vs.hasNext()) {
-				String v = vs.next().toString();
-				if(v.startsWith(this.countPrefix)) {
-					commentCnt++;
-				}
-			}
-			out.collect(k, new IntWritable(commentCnt));
-		}
-		
-		@Override
-		public void configure(final JobConf c) { 
-			this.countPrefix = c.get("my.cntPrefix");
-		}
-
-		@Override
-		public void close() throws IOException { }
-	}
-
-	public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
-		private static final long serialVersionUID = 1L;
-		@Override
-		public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
-		throws Exception {
-			v.f0 = new IntWritable(v.f0.get() / 5);
-			return v;
-		}
-	}
-
-	public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
-		private static final long serialVersionUID = 1L;
-		@Override
-		public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
-		throws Exception {
-			v.f0 = new IntWritable(v.f0.get() % 5);
-			return v;
-		}
-	}
-}
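
The ITCases above exercise HadoopMapFunction and HadoopReduceFunction separately. As an illustration of how the wrappers compose in one pipeline, here is a minimal, hypothetical sketch (not part of this commit); the class names, the tiny in-memory data set, and the output path are placeholders, and only the single-argument constructors already shown in the deleted tests are used.

import java.io.IOException;
import java.util.Iterator;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class HadoopFunctionPipelineSketch {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Small in-memory key/value data set, standing in for HadoopTestData.getKVPairDataSet(env).
		DataSet<Tuple2<IntWritable, Text>> kv = env.fromElements(
				new Tuple2<IntWritable, Text>(new IntWritable(1), new Text("Hello")),
				new Tuple2<IntWritable, Text>(new IntWritable(1), new Text("Hello world")),
				new Tuple2<IntWritable, Text>(new IntWritable(2), new Text("Hi")));

		// A Hadoop Mapper wrapped as a Flink flatMap, followed by a Hadoop Reducer on the grouped result.
		DataSet<Tuple2<IntWritable, IntWritable>> helloCounts = kv
				.flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new PassThroughMapper()))
				.groupBy(0)
				.reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new HelloCountReducer()));

		helloCounts.writeAsText("file:///tmp/hello-counts");   // placeholder output path
		env.execute();
	}

	/** Hadoop mapper that forwards every record unchanged. */
	public static class PassThroughMapper implements Mapper<IntWritable, Text, IntWritable, Text> {
		@Override
		public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r) throws IOException {
			out.collect(k, v);
		}
		@Override
		public void configure(JobConf conf) { }
		@Override
		public void close() throws IOException { }
	}

	/** Hadoop reducer that counts, per key, the values starting with "Hello". */
	public static class HelloCountReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
		@Override
		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r) throws IOException {
			int cnt = 0;
			while (vs.hasNext()) {
				if (vs.next().toString().startsWith("Hello")) {
					cnt++;
				}
			}
			out.collect(k, new IntWritable(cnt));
		}
		@Override
		public void configure(JobConf conf) { }
		@Override
		public void close() throws IOException { }
	}
}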

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
deleted file mode 100644
index eed6f8f..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-
-public class HadoopTestData {
-
-	public static DataSet<Tuple2<IntWritable, Text>> getKVPairDataSet(ExecutionEnvironment env) {
-		
-		List<Tuple2<IntWritable, Text>> data = new ArrayList<Tuple2<IntWritable, Text>>();
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(1),new Text("Hi")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(2),new Text("Hello")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(3),new Text("Hello world")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(4),new Text("Hello world, how are you?")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(5),new Text("I am fine.")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(6),new Text("Luke Skywalker")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(7),new Text("Comment#1")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(8),new Text("Comment#2")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(9),new Text("Comment#3")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(10),new Text("Comment#4")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(11),new Text("Comment#5")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(12),new Text("Comment#6")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(13),new Text("Comment#7")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(14),new Text("Comment#8")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(15),new Text("Comment#9")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(16),new Text("Comment#10")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(17),new Text("Comment#11")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(18),new Text("Comment#12")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(19),new Text("Comment#13")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(20),new Text("Comment#14")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(21),new Text("Comment#15")));
-		
-		Collections.shuffle(data);
-		
-		return env.fromCollection(data);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
deleted file mode 100644
index fe7ea8e..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.hadoopcompatibility.mapred.record;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.hadoopcompatibility.mapred.record.example.WordCountWithOutputFormat;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-/**
- * Tests the Hadoop input format and output format.
- */
-public class HadoopRecordInputOutputITCase extends RecordAPITestBase {
-	protected String textPath;
-	protected String resultPath;
-	protected String counts;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("text.txt", WordCountData.TEXT);
-		resultPath = getTempDirPath("result");
-		counts = WordCountData.COUNTS.replaceAll(" ", "\t");
-	}
-
-	@Override
-	protected Plan getTestJob() {
-		// WordCountWithOutputFormat reads its input with Hadoop's TextInputFormat and writes its output with Hadoop's TextOutputFormat.
-		WordCountWithOutputFormat wc = new WordCountWithOutputFormat();
-		return wc.getPlan("1", textPath, resultPath);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		// Test results; append "/1" to resultPath because of the generated _temporary file.
-		compareResultsByLinesInMemory(counts, resultPath + "/1");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
deleted file mode 100644
index 2592b88..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred.wrapper;
-
-import java.util.ArrayList;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
-import org.apache.hadoop.io.IntWritable;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class HadoopTupleUnwrappingIteratorTest {
-
-	@Test
-	public void testValueIterator() {
-		
-		HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt = 
-				new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(IntWritable.class);
-		
-		// many values
-		
-		ArrayList<Tuple2<IntWritable, IntWritable>> tList = new ArrayList<Tuple2<IntWritable, IntWritable>>();
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(1)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(2)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(3)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(4)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(5)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(6)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(7)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(8)));
-		
-		int expectedKey = 1;
-		int[] expectedValues = new int[] {1,2,3,4,5,6,7,8};
-		
-		valIt.set(tList.iterator());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		for(int expectedValue : expectedValues) {
-			Assert.assertTrue(valIt.hasNext());
-			Assert.assertTrue(valIt.hasNext());
-			Assert.assertTrue(valIt.next().get() == expectedValue);
-			Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		}
-		Assert.assertFalse(valIt.hasNext());
-		Assert.assertFalse(valIt.hasNext());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		
-		// one value
-		
-		tList.clear();
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(2),new IntWritable(10)));
-		
-		expectedKey = 2;
-		expectedValues = new int[]{10};
-		
-		valIt.set(tList.iterator());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		for(int expectedValue : expectedValues) {
-			Assert.assertTrue(valIt.hasNext());
-			Assert.assertTrue(valIt.hasNext());
-			Assert.assertTrue(valIt.next().get() == expectedValue);
-			Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		}
-		Assert.assertFalse(valIt.hasNext());
-		Assert.assertFalse(valIt.hasNext());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		
-		// more values
-		
-		tList.clear();
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(10)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(4)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(7)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(9)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(21)));
-		
-		expectedKey = 3;
-		expectedValues = new int[]{10,4,7,9,21};
-		
-		valIt.set(tList.iterator());
-		Assert.assertTrue(valIt.hasNext());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		for(int expectedValue : expectedValues) {
-			Assert.assertTrue(valIt.hasNext());
-			Assert.assertTrue(valIt.hasNext());
-			Assert.assertTrue(valIt.next().get() == expectedValue);
-			Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		}
-		Assert.assertFalse(valIt.hasNext());
-		Assert.assertFalse(valIt.hasNext());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		
-		// no has next calls
-		
-		tList.clear();
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(5)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(8)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(42)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(-1)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(0)));
-		
-		expectedKey = 4;
-		expectedValues = new int[]{5,8,42,-1,0};
-		
-		valIt.set(tList.iterator());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		for(int expectedValue : expectedValues) {
-			Assert.assertTrue(valIt.next().get() == expectedValue);
-		}
-		try {
-			valIt.next();
-			Assert.fail();
-		} catch (NoSuchElementException nsee) {
-			// expected
-		}
-		Assert.assertFalse(valIt.hasNext());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
deleted file mode 100644
index d79afaa..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.hadoopcompatibility.mapreduce;
-
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.fail;
-
-
-
-public class HadoopInputFormatTest {
-
-
-	public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
-
-		public DummyVoidKeyInputFormat() {
-		}
-
-		@Override
-		public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-			return null;
-		}
-	}
-	
-	
-	@Test
-	public void checkTypeInformation() {
-		try {
-			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-			// Set up the Hadoop Input Format
-			Job job = Job.getInstance();
-			HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, Long.class, job);
-
-			TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
-			TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
-			
-			if(tupleType.isTupleType()) {
-				if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
-					fail("Tuple type information was not set correctly!");
-				}
-			} else {
-				fail("Type information was not set to tuple type information!");
-			}
-
-		}
-		catch (Exception ex) {
-			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
-		}
-
-	}
-	
-}
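
The test above covers the new-API (mapreduce) variant of the wrapper. For contrast with the mapred sketch given earlier, here is a minimal, hypothetical usage of the mapreduce HadoopInputFormat, which is configured through a Job rather than a JobConf (as the four-argument constructor in the test shows); the class name and paths are placeholders.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class MapreduceHadoopInputFormatSketch {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// The mapreduce wrapper is configured through a Job instead of a JobConf.
		Job job = Job.getInstance();
		FileInputFormat.addInputPath(job, new Path("hdfs:///tmp/input"));   // placeholder path

		HadoopInputFormat<LongWritable, Text> input = new HadoopInputFormat<LongWritable, Text>(
				new TextInputFormat(), LongWritable.class, Text.class, job);

		DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(input);
		lines.writeAsText("file:///tmp/lines");   // placeholder output path
		env.execute();
	}
}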

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
deleted file mode 100644
index 7eee629..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapreduce;
-
-import org.apache.flink.hadoopcompatibility.mapreduce.example.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class HadoopInputOutputITCase extends JavaProgramTestBase {
-	
-	protected String textPath;
-	protected String resultPath;
-	
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		textPath = createTempFile("text.txt", WordCountData.TEXT);
-		resultPath = getTempDirPath("result");
-		this.setDegreeOfParallelism(4);
-	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"});
-	}
-	
-	@Override
-	protected void testProgram() throws Exception {
-		WordCount.main(new String[] { textPath, resultPath });
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties b/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
deleted file mode 100644
index 0b686e5..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF and its only appender to A1.
-log4j.rootLogger=OFF, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hadoop-compatibility/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/resources/logback-test.xml b/flink-addons/flink-hadoop-compatibility/src/test/resources/logback-test.xml
deleted file mode 100644
index 8b3bb27..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/pom.xml b/flink-addons/flink-hbase/pom.xml
deleted file mode 100644
index 6807b0c..0000000
--- a/flink-addons/flink-hbase/pom.xml
+++ /dev/null
@@ -1,165 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-addons</artifactId>
-		<version>0.9-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-hbase</artifactId>
-	<name>flink-hbase</name>
-	<packaging>jar</packaging>
-
-	<properties>
-		<hbase.hadoop1.version>0.98.6.1-hadoop1</hbase.hadoop1.version>
-		<hbase.hadoop2.version>0.98.6.1-hadoop2</hbase.hadoop2.version>
-	</properties>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<exclusions>
-				<exclusion>
-					<groupId>org.apache.hadoop</groupId>
-					<artifactId>hadoop-core</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-hadoop-compatibility</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.hbase</groupId>
-			<artifactId>hbase-client</artifactId>
-			<version>${hbase.version}</version>
-			<exclusions>
-				<!-- Remove unneeded dependency, which is conflicting with our jetty-util version. -->
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty-util</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-	</dependencies>
-
-	<profiles>
-		<profile>
-			<id>hadoop-1</id>
-			<activation>
-				<property>
-					<!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
-					<!--hadoop1--><name>hadoop.profile</name><value>1</value>
-				</property>
-			</activation>
-			<properties>
-				<hbase.version>${hbase.hadoop1.version}</hbase.version>
-			</properties>
-			<dependencies>
-				<!-- Force hadoop-core dependency -->
-				<dependency>
-					<groupId>org.apache.hadoop</groupId>
-					<artifactId>hadoop-core</artifactId>
-				</dependency>
-			</dependencies>
-		</profile>
-		
-		<profile>
-			<id>hadoop-2</id>
-			<activation>
-				<property>
-					<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
-					<!--hadoop2--><name>!hadoop.profile</name>
-				</property>
-			</activation>
-			<properties>
-				<hbase.version>${hbase.hadoop2.version}</hbase.version>
-			</properties>
-			<dependencies>
-				<!-- Force hadoop-common dependency -->
-				<dependency>
-					<groupId>org.apache.hadoop</groupId>
-					<artifactId>hadoop-common</artifactId>
-				</dependency>
-			</dependencies>
-		</profile>
-		
-		<profile>
-			<id>cdh5.1.3</id>
-			<properties>
-				<hadoop.profile>2</hadoop.profile>
-				<hbase.version>0.98.1-cdh5.1.3</hbase.version>
-				<hadoop.version>2.3.0-cdh5.1.3</hadoop.version>
-				<!-- Cloudera uses different versions for hadoop-core and hadoop-common. -->
-				<!-- This profile could be removed if Cloudera fixes this mismatch. -->
-				<hadoop.core.version>2.3.0-mr1-cdh5.1.3</hadoop.core.version>
-			</properties>
-			<dependencyManagement>
-				<dependencies>
-					<dependency>
-						<groupId>org.apache.hadoop</groupId>
-						<artifactId>hadoop-core</artifactId>
-						<version>${hadoop.core.version}</version>
-					</dependency>
-				</dependencies>
-			</dependencyManagement>
-			<dependencies>
-				<!-- Force hadoop-common dependency -->
-				<dependency>
-					<groupId>org.apache.hadoop</groupId>
-					<artifactId>hadoop-common</artifactId>
-					<version>${hadoop.version}</version>
-				</dependency>
-				<dependency>
-					<groupId>org.apache.hadoop</groupId>
-					<artifactId>hadoop-mapreduce-client-core</artifactId>
-					<version>${hadoop.version}</version>
-				</dependency>
-			</dependencies>
-		</profile>
-
-	</profiles>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
deleted file mode 100755
index 9c861ed..0000000
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.addons.hbase;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link InputFormat} subclass that wraps access to HBase {@link HTable}s.
- *
- */
-public abstract class TableInputFormat<T extends Tuple> implements InputFormat<T, TableInputSplit>{
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class);
-
-	/** helper variable to decide whether the input is exhausted or not */
-	private boolean endReached = false;
-
-	// TODO table and scan could be serialized when kryo serializer will be the default
-	protected transient HTable table;
-	protected transient Scan scan;
-
-	/** HBase iterator wrapper */
-	private ResultScanner rs;
-
-	private byte[] lastRow;
-	private int scannedRows;
-
-	// abstract methods allow for multiple table and scanners in the same job
-	protected abstract Scan getScanner();
-	protected abstract String getTableName();
-	protected abstract T mapResultToTuple(Result r);
-
-	/**
-	 * Creates a {@link Scan} object and opens an {@link HTable} connection.
-	 *
-	 * @param parameters the Flink {@link Configuration} (unused; the HBase configuration is read from the classpath)
-	 * @see Configuration
-	 */
-	@Override
-	public void configure(Configuration parameters) {
-		this.table = createTable();
-		this.scan = getScanner();
-	}
-
-	/** Create an {@link HTable} instance and set it into this format */
-	private HTable createTable() {
-		LOG.info("Initializing HBaseConfiguration");
-		//use files found in the classpath
-		org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
-
-		try {
-			return new HTable(hConf, getTableName());
-		} catch (Exception e) {
-			LOG.error("Error instantiating a new HTable instance", e);
-		}
-		return null;
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return this.endReached;
-	}
-
-	@Override
-	public T nextRecord(T reuse) throws IOException {
-		if (this.rs == null){
-			throw new IOException("No table result scanner provided!");
-		}
-		try{
-			Result res = this.rs.next();
-			if (res != null){
-				scannedRows++;
-				lastRow = res.getRow();
-				return mapResultToTuple(res);
-			}
-		}catch (Exception e) {
-			this.rs.close();
-			//workaround for timeout on scan
-			StringBuffer logMsg = new StringBuffer("Error after scan of ")
-					.append(scannedRows)
-					.append(" rows. Retry with a new scanner...");
-			LOG.warn(logMsg.toString(), e);
-			this.scan.setStartRow(lastRow);
-			this.rs = table.getScanner(scan);
-			Result res = this.rs.next();
-			if (res != null) {
-				scannedRows++;
-				lastRow = res.getRow();
-				return mapResultToTuple(res);
-			}
-		}
-
-		this.endReached = true;
-		return null;
-	}
-
-	@Override
-	public void open(TableInputSplit split) throws IOException {
-		if (split == null){
-			throw new IOException("Input split is null!");
-		}
-		if (table == null){
-			throw new IOException("No HTable provided!");
-		}
-		if (scan == null){
-			throw new IOException("No Scan instance provided");
-		}
-
-		logSplitInfo("opening", split);
-		scan.setStartRow(split.getStartRow());
-		lastRow = split.getEndRow();
-		scan.setStopRow(lastRow);
-
-		this.rs = table.getScanner(scan);
-		this.endReached = false;
-		this.scannedRows = 0;
-	}
-
-	@Override
-	public void close() throws IOException {
-		if(rs!=null){
-			this.rs.close();
-		}
-		if(table!=null){
-			this.table.close();
-		}
-		LOG.info("Closing split (scanned {} rows)", scannedRows);
-		this.lastRow = null;
-	}
-
-	@Override
-	public TableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
-		//Gets the starting and ending row keys for every region in the currently open table
-		final Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
-		if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
-			throw new IOException("Expecting at least one region.");
-		}
-		final byte[] startRow = scan.getStartRow();
-		final byte[] stopRow = scan.getStopRow();
-		final boolean scanWithNoLowerBound = startRow.length == 0;
-		final boolean scanWithNoUpperBound = stopRow.length == 0;
-
-		final List<TableInputSplit> splits = new ArrayList<TableInputSplit>(minNumSplits);
-		for (int i = 0; i < keys.getFirst().length; i++) {
-			final byte[] startKey = keys.getFirst()[i];
-			final byte[] endKey = keys.getSecond()[i];
-			final String regionLocation = table.getRegionLocation(startKey, false).getHostnamePort();
-			//Test if the given region is to be included in the InputSplit while splitting the regions of a table
-			if (!includeRegionInSplit(startKey, endKey)) {
-				continue;
-			}
-			//Finds the region on which the given row is being served
-			final String[] hosts = new String[] { regionLocation };
-
-			// determine if regions contains keys used by the scan
-			boolean isLastRegion = endKey.length == 0;
-			if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
-					(scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
-
-				final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
-				final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
-						&& !isLastRegion ? endKey : stopRow;
-				int id = splits.size();
-				final TableInputSplit split = new TableInputSplit(id, hosts, table.getTableName(), splitStart, splitStop);
-				splits.add(split);
-			}
-		}
-		LOG.info("Created " + splits.size() + " splits");
-		for (TableInputSplit split : splits) {
-			logSplitInfo("created", split);
-		}
-		return splits.toArray(new TableInputSplit[0]);
-	}
-
-	private void logSplitInfo(String action, TableInputSplit split) {
-		int splitId = split.getSplitNumber();
-		String splitStart = Bytes.toString(split.getStartRow());
-		String splitEnd = Bytes.toString(split.getEndRow());
-		String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
-		String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
-		String[] hostnames = split.getHostnames();
-		LOG.info("{} split [{}|{}|{}|{}]", action, splitId, hostnames, splitStartKey, splitStopKey);
-	}
-
-	/**
-	 * Test if the given region is to be included in the InputSplit while splitting the regions of a table.
-	 * <p>
-	 * This optimization is useful when there is a specific reason to exclude an entire region from the job, given the
-	 * region's start and end keys (and hence to omit it from the InputSplits). <br>
-	 * It applies, for example, when the job needs to remember the last-processed top record and continuously revisit
-	 * the [last, current) interval. Besides reducing the number of InputSplits, it also reduces the load on the region
-	 * server, due to the ordering of the keys. <br>
-	 * <br>
-	 * Note: It is possible that <code>endKey.length() == 0</code> for the last (most recent) region. <br>
-	 * Override this method if you want to exclude regions from the job altogether. By default, no region is excluded
-	 * (i.e. all regions are included).
-	 *
-	 * @param startKey
-	 *        Start key of the region
-	 * @param endKey
-	 *        End key of the region
-	 * @return true, if this region needs to be included as part of the input (default).
-	 */
-	protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
-		return true;
-	}
-
-	@Override
-	public InputSplitAssigner getInputSplitAssigner(TableInputSplit[] inputSplits) {
-		return new LocatableInputSplitAssigner(inputSplits);
-	}
-
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-		return null;
-	}
-
-}
\ No newline at end of file
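
A concrete input format only has to supply the table name, the Scan, and the Result-to-Tuple mapping defined by the
three abstract methods above. Below is a minimal sketch under assumed names: the "books" table, the column family "cf"
and the qualifier "title" are illustrative and not part of this commit.

import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Minimal concrete TableInputFormat; table and column names are illustrative.
public class BookTitleInputFormat extends TableInputFormat<Tuple2<String, String>> {

	private static final long serialVersionUID = 1L;

	private static final byte[] CF = Bytes.toBytes("cf");
	private static final byte[] QUALIFIER = Bytes.toBytes("title");

	@Override
	protected String getTableName() {
		return "books";
	}

	@Override
	protected Scan getScanner() {
		// Restrict the scan to the single column mapped below.
		Scan scan = new Scan();
		scan.addColumn(CF, QUALIFIER);
		return scan;
	}

	@Override
	protected Tuple2<String, String> mapResultToTuple(Result r) {
		// Emit (row key, value of cf:title).
		return new Tuple2<String, String>(
				Bytes.toString(r.getRow()),
				Bytes.toString(r.getValue(CF, QUALIFIER)));
	}
}

Such a format is then passed to ExecutionEnvironment#createInput(...), exactly as the anonymous subclass in the
HBaseReadExample further down is.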

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java b/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
deleted file mode 100644
index 6d8bf42..0000000
--- a/flink-addons/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.addons.hbase;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * This class implements an input split for HBase. Each table input split corresponds to a key range (low, high). All
- * references to "row" below refer to the key of the row.
- */
-public class TableInputSplit extends LocatableInputSplit {
-
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * The name of the table to retrieve data from
-	 */
-	private byte[] tableName;
-
-	/**
-	 * The start row of the split.
-	 */
-	private byte[] startRow;
-
-	/**
-	 * The end row of the split.
-	 */
-	private byte[] endRow;
-
-	/**
-	 * Creates a new table input split
-	 * 
-	 * @param splitNumber
-	 *        the number of the input split
-	 * @param hostnames
-	 *        the names of the hosts storing the data the input split refers to
-	 * @param tableName
-	 *        the name of the table to retrieve data from
-	 * @param startRow
-	 *        the start row of the split
-	 * @param endRow
-	 *        the end row of the split
-	 */
-	TableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName, final byte[] startRow,
-			final byte[] endRow) {
-		super(splitNumber, hostnames);
-
-		this.tableName = tableName;
-		this.startRow = startRow;
-		this.endRow = endRow;
-	}
-
-	/**
-	 * Default constructor for serialization/deserialization.
-	 */
-	public TableInputSplit() {
-		super();
-
-		this.tableName = null;
-		this.startRow = null;
-		this.endRow = null;
-	}
-
-	/**
-	 * Returns the table name.
-	 * 
-	 * @return The table name.
-	 */
-	public byte[] getTableName() {
-		return this.tableName;
-	}
-
-	/**
-	 * Returns the start row.
-	 * 
-	 * @return The start row.
-	 */
-	public byte[] getStartRow() {
-		return this.startRow;
-	}
-
-	/**
-	 * Returns the end row.
-	 * 
-	 * @return The end row.
-	 */
-	public byte[] getEndRow() {
-		return this.endRow;
-	}
-
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-
-		super.write(out);
-
-		// Write the table name
-		if (this.tableName == null) {
-			out.writeInt(-1);
-		} else {
-			out.writeInt(this.tableName.length);
-			out.write(this.tableName);
-		}
-
-		// Write the start row
-		if (this.startRow == null) {
-			out.writeInt(-1);
-		} else {
-			out.writeInt(this.startRow.length);
-			out.write(this.startRow);
-		}
-
-		// Write the end row
-		if (this.endRow == null) {
-			out.writeInt(-1);
-		} else {
-			out.writeInt(this.endRow.length);
-			out.write(this.endRow);
-		}
-	}
-
-
-	@Override
-	public void read(final DataInputView in) throws IOException {
-
-		super.read(in);
-
-		// Read the table name
-		int len = in.readInt();
-		if (len >= 0) {
-			this.tableName = new byte[len];
-			in.readFully(this.tableName);
-		}
-
-		// Read the start row
-		len = in.readInt();
-		if (len >= 0) {
-			this.startRow = new byte[len];
-			in.readFully(this.startRow);
-		}
-
-		// Read the end row
-		len = in.readInt();
-		if (len >= 0) {
-			this.endRow = new byte[len];
-			in.readFully(this.endRow);
-		}
-	}
-}
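
The write()/read() pair above uses a simple length-prefixed encoding: -1 marks a null array, otherwise the length is
written followed by that many bytes. The following self-contained sketch shows the same convention with plain java.io
streams; it deliberately avoids the Flink DataOutputView/DataInputView types, and the class name and values are made up.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class LengthPrefixedBytes {

	// Write a possibly-null byte[] with a length prefix; -1 encodes null.
	static void writeBytes(DataOutputStream out, byte[] value) throws IOException {
		if (value == null) {
			out.writeInt(-1);
		} else {
			out.writeInt(value.length);
			out.write(value);
		}
	}

	// Read a byte[] written by writeBytes(); a -1 prefix yields null.
	static byte[] readBytes(DataInputStream in) throws IOException {
		int len = in.readInt();
		if (len < 0) {
			return null;
		}
		byte[] value = new byte[len];
		in.readFully(value);
		return value;
	}

	public static void main(String[] args) throws IOException {
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		DataOutputStream out = new DataOutputStream(bos);
		writeBytes(out, "row-0001".getBytes("UTF-8"));
		writeBytes(out, null);

		DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
		System.out.println(new String(readBytes(in), "UTF-8")); // prints row-0001
		System.out.println(readBytes(in));                      // prints null
	}
}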

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java b/flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
deleted file mode 100755
index b6f345a..0000000
--- a/flink-addons/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseReadExample.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.addons.hbase.example;
-
-import org.apache.flink.addons.hbase.TableInputFormat;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Simple example that reads an HBase table into a DataSet.
- * 
- * To run the example, first create the test table with the HBase shell.
- * 
- * Use the following commands:
- * <ul>
- *     <li>create 'test-table', 'someCf'</li>
- *     <li>put 'test-table', '1', 'someCf:someQual', 'someString'</li>
- *     <li>put 'test-table', '2', 'someCf:someQual', 'anotherString'</li>
- * </ul>
- * 
- * The example should print only the first entry.
- * 
- */
-public class HBaseReadExample {
-	public static void main(String[] args) throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		@SuppressWarnings("serial")
-		DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new TableInputFormat<Tuple2<String, String>>() {
-			private final byte[] CF_SOME = Bytes.toBytes("someCf");
-			private final byte[] Q_SOME = Bytes.toBytes("someQual");
-				@Override
-				public String getTableName() {
-					return "test-table";
-				}
-
-				@Override
-				protected Scan getScanner() {
-					Scan scan = new Scan();
-					scan.addColumn(CF_SOME, Q_SOME);
-					return scan;
-				}
-
-				private Tuple2<String, String> reuse = new Tuple2<String, String>();
-				
-				@Override
-				protected Tuple2<String, String> mapResultToTuple(Result r) {
-					String key = Bytes.toString(r.getRow());
-					String val = Bytes.toString(r.getValue(CF_SOME, Q_SOME));
-					reuse.setField(key, 0);
-					reuse.setField(val, 1);
-					return reuse;
-				}
-		})
-		.filter(new FilterFunction<Tuple2<String,String>>() {
-
-			@Override
-			public boolean filter(Tuple2<String, String> t) throws Exception {
-				String val = t.getField(1);
-				return val.startsWith("someStr");
-			}
-		});
-		
-		hbaseDs.print();
-		
-		// kick off execution.
-		env.execute();
-				
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hbase/src/test/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/test/resources/hbase-site.xml b/flink-addons/flink-hbase/src/test/resources/hbase-site.xml
deleted file mode 100644
index 2984063..0000000
--- a/flink-addons/flink-hbase/src/test/resources/hbase-site.xml
+++ /dev/null
@@ -1,43 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-<!--
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
--->
-<configuration>
-
-  <property>
-    <name>hbase.tmp.dir</name>
-    <!-- 
-    <value>/media/Dati/hbase-0.98-data</value>
-    --> 
-    <value>/opt/hbase-0.98.6.1-hadoop2/data</value>
-
-  </property>
-  <property>
-    <name>hbase.zookeeper.quorum</name>
-    <value>localhost</value>
-  </property>
-    <!-- 
-  <property>
-    <name>hadoop.security.group.mapping</name>
-    <value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
-  </property>
-  -->
-</configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-hbase/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hbase/src/test/resources/log4j.properties b/flink-addons/flink-hbase/src/test/resources/log4j.properties
deleted file mode 100755
index d6eb2b2..0000000
--- a/flink-addons/flink-hbase/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-# http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-log4j.rootLogger=${hadoop.root.logger}
-hadoop.root.logger=INFO,console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/pom.xml b/flink-addons/flink-jdbc/pom.xml
deleted file mode 100644
index ec172a9..0000000
--- a/flink-addons/flink-jdbc/pom.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-	
-	<modelVersion>4.0.0</modelVersion>
-	
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-addons</artifactId>
-		<version>0.9-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-jdbc</artifactId>
-	<name>flink-jdbc</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-				
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.derby</groupId>
-			<artifactId>derby</artifactId>
-			<version>10.10.1.1</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-</project>
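
For orientation, the flink-jdbc module is typically used through its JDBCInputFormat. The sketch below shows how it
was commonly wired into a job around this release; the builder method names follow the documentation of that era and
may differ in other versions, and the embedded Derby URL, table, and query are illustrative (chosen to match the Derby
test dependency above), not part of this commit.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class JdbcReadSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Assumed builder API; an in-memory Derby database with a 'books' table is illustrative.
		DataSet<Tuple2<String, Integer>> rows = env.createInput(
				JDBCInputFormat.buildJDBCInputFormat()
						.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
						.setDBUrl("jdbc:derby:memory:bookshop")
						.setQuery("SELECT title, price FROM books")
						.finish(),
				new TupleTypeInfo<Tuple2<String, Integer>>(
						BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));

		rows.print();
		env.execute();
	}
}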

