flink-commits mailing list archives

From fhue...@apache.org
Subject [1/6] Revert "Added wrappers for Hadoop functions"
Date Wed, 08 Oct 2014 10:23:41 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 74dded1c2 -> e2fd615ce
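
For context: the reverted wrapper let a Hadoop mapred Reducer run as a Flink GroupReduceFunction. A minimal sketch of that usage, assembled from the deleted HadoopReduceFunctionITCase below (HadoopTestData and CommentCntReducer are defined later in this diff; resultPath stands in for a test output path):

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	// Test data set of (IntWritable, Text) pairs, provided by the deleted HadoopTestData class.
	DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);

	// HadoopReduceFunction adapts a Hadoop mapred Reducer to Flink's reduceGroup();
	// type parameters are <input key, input value, output key, output value>.
	DataSet<Tuple2<IntWritable, IntWritable>> cnts = ds
			.groupBy(0)
			.reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
					new CommentCntReducer()));

	cnts.writeAsText(resultPath);
	env.execute();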


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/53d8f1f0/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
deleted file mode 100644
index 605bf21..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-public class HadoopReduceFunctionITCase extends JavaProgramTestBase {
-
-	private static int NUM_PROGRAMS = 3;
-	
-	private int curProgId = config.getInteger("ProgramId", -1);
-	private String resultPath;
-	private String expectedResult;
-	
-	public HadoopReduceFunctionITCase(Configuration config) {
-		super(config);	
-	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		expectedResult = ReducerProgs.runProgram(curProgId, resultPath);
-	}
-	
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-	
-	@Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
-		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
-		for(int i=1; i <= NUM_PROGRAMS; i++) {
-			Configuration config = new Configuration();
-			config.setInteger("ProgramId", i);
-			tConfigs.add(config);
-		}
-		
-		return toParameterList(tConfigs);
-	}
-	
-	public static class ReducerProgs {
-		
-		public static String runProgram(int progId, String resultPath) throws Exception {
-			
-			switch(progId) {
-			case 1: {
-				/*
-				 * Test standard grouping
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
-						map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>>() {
-							private static final long serialVersionUID = 1L;
-							@Override
-							public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
-									throws Exception {
-								v.f0 = new IntWritable(v.f0.get() / 5);
-								return v;
-							}
-						});
-						
-				DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
-						groupBy(0).
-						reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new CommentCntReducer()));
-				
-				commentCnts.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"(0,0)\n"+
-						"(1,3)\n" +
-						"(2,5)\n" +
-						"(3,5)\n" +
-						"(4,2)\n";
-			}
-			case 2: {
-				/*
-				 * Test ungrouped Hadoop reducer
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env);
-						
-				DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds.
-						reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new AllCommentCntReducer()));
-				
-				commentCnts.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"(42,15)\n";
-			}
-			case 3: {
-				/*
-				 * Test configuration via JobConf
-				 */
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-				
-				JobConf conf = new JobConf();
-				conf.set("my.cntPrefix", "Hello");
-				
-				DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env).
-						map(new MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>>() {
-							private static final long serialVersionUID = 1L;
-							@Override
-							public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v)
-									throws Exception {
-								v.f0 = new IntWritable(v.f0.get() % 5);
-								return v;
-							}
-						});
-						
-				DataSet<Tuple2<IntWritable, IntWritable>> helloCnts = ds.
-						groupBy(0).
-						reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
-								new ConfigurableCntReducer(), conf));
-				
-				helloCnts.writeAsText(resultPath);
-				env.execute();
-				
-				// return expected result
-				return 	"(0,0)\n"+
-						"(1,0)\n" +
-						"(2,1)\n" +
-						"(3,1)\n" +
-						"(4,1)\n";
-			}
-			default: 
-				throw new IllegalArgumentException("Invalid program id");
-			}
-			
-		}
-	
-	}
-	
-	public static class CommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
-		
-		@Override
-		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
-				throws IOException {
-			int commentCnt = 0;
-			while(vs.hasNext()) {
-				String v = vs.next().toString();
-				if(v.startsWith("Comment")) {
-					commentCnt++;
-				}
-			}
-			out.collect(k, new IntWritable(commentCnt));
-		}
-		
-		@Override
-		public void configure(final JobConf arg0) { }
-
-		@Override
-		public void close() throws IOException { }
-	}
-	
-	public static class AllCommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
-		
-		@Override
-		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
-				throws IOException {
-			int commentCnt = 0;
-			while(vs.hasNext()) {
-				String v = vs.next().toString();
-				if(v.startsWith("Comment")) {
-					commentCnt++;
-				}
-			}
-			out.collect(new IntWritable(42), new IntWritable(commentCnt));
-		}
-		
-		@Override
-		public void configure(final JobConf arg0) { }
-
-		@Override
-		public void close() throws IOException { }
-	}
-	
-	public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
-		private String countPrefix;
-		
-		@Override
-		public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r)
-				throws IOException {
-			int commentCnt = 0;
-			while(vs.hasNext()) {
-				String v = vs.next().toString();
-				if(v.startsWith(this.countPrefix)) {
-					commentCnt++;
-				}
-			}
-			out.collect(k, new IntWritable(commentCnt));
-		}
-		
-		@Override
-		public void configure(final JobConf c) { 
-			this.countPrefix = c.get("my.cntPrefix");
-		}
-
-		@Override
-		public void close() throws IOException { }
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/53d8f1f0/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
deleted file mode 100644
index eed6f8f..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-
-public class HadoopTestData {
-
-	public static DataSet<Tuple2<IntWritable, Text>> getKVPairDataSet(ExecutionEnvironment env) {
-		
-		List<Tuple2<IntWritable, Text>> data = new ArrayList<Tuple2<IntWritable, Text>>();
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(1),new Text("Hi")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(2),new Text("Hello")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(3),new Text("Hello world")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(4),new Text("Hello world, how are you?")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(5),new Text("I am fine.")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(6),new Text("Luke Skywalker")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(7),new Text("Comment#1")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(8),new Text("Comment#2")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(9),new Text("Comment#3")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(10),new Text("Comment#4")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(11),new Text("Comment#5")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(12),new Text("Comment#6")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(13),new Text("Comment#7")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(14),new Text("Comment#8")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(15),new Text("Comment#9")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(16),new Text("Comment#10")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(17),new Text("Comment#11")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(18),new Text("Comment#12")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(19),new Text("Comment#13")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(20),new Text("Comment#14")));
-		data.add(new Tuple2<IntWritable, Text>(new IntWritable(21),new Text("Comment#15")));
-		
-		Collections.shuffle(data);
-		
-		return env.fromCollection(data);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/53d8f1f0/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
deleted file mode 100644
index 2592b88..0000000
--- a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.hadoopcompatibility.mapred.wrapper;
-
-import java.util.ArrayList;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
-import org.apache.hadoop.io.IntWritable;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class HadoopTupleUnwrappingIteratorTest {
-
-	@Test
-	public void testValueIterator() {
-		
-		HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt = 
-				new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(IntWritable.class);
-		
-		// many values
-		
-		ArrayList<Tuple2<IntWritable, IntWritable>> tList = new ArrayList<Tuple2<IntWritable, IntWritable>>();
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(1)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(2)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(3)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(4)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(5)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(6)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(7)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(8)));
-		
-		int expectedKey = 1;
-		int[] expectedValues = new int[] {1,2,3,4,5,6,7,8};
-		
-		valIt.set(tList.iterator());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		for(int expectedValue : expectedValues) {
-			Assert.assertTrue(valIt.hasNext());
-			Assert.assertTrue(valIt.hasNext());
-			Assert.assertTrue(valIt.next().get() == expectedValue);
-			Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		}
-		Assert.assertFalse(valIt.hasNext());
-		Assert.assertFalse(valIt.hasNext());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		
-		// one value
-		
-		tList.clear();
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(2),new IntWritable(10)));
-		
-		expectedKey = 2;
-		expectedValues = new int[]{10};
-		
-		valIt.set(tList.iterator());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		for(int expectedValue : expectedValues) {
-			Assert.assertTrue(valIt.hasNext());
-			Assert.assertTrue(valIt.hasNext());
-			Assert.assertTrue(valIt.next().get() == expectedValue);
-			Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		}
-		Assert.assertFalse(valIt.hasNext());
-		Assert.assertFalse(valIt.hasNext());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		
-		// more values
-		
-		tList.clear();
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(10)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(4)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(7)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(9)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(21)));
-		
-		expectedKey = 3;
-		expectedValues = new int[]{10,4,7,9,21};
-		
-		valIt.set(tList.iterator());
-		Assert.assertTrue(valIt.hasNext());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		for(int expectedValue : expectedValues) {
-			Assert.assertTrue(valIt.hasNext());
-			Assert.assertTrue(valIt.hasNext());
-			Assert.assertTrue(valIt.next().get() == expectedValue);
-			Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		}
-		Assert.assertFalse(valIt.hasNext());
-		Assert.assertFalse(valIt.hasNext());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		
-		// no has next calls
-		
-		tList.clear();
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(5)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(8)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(42)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(-1)));
-		tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(0)));
-		
-		expectedKey = 4;
-		expectedValues = new int[]{5,8,42,-1,0};
-		
-		valIt.set(tList.iterator());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-		for(int expectedValue : expectedValues) {
-			Assert.assertTrue(valIt.next().get() == expectedValue);
-		}
-		try {
-			valIt.next();
-			Assert.fail();
-		} catch (NoSuchElementException nsee) {
-			// expected
-		}
-		Assert.assertFalse(valIt.hasNext());
-		Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey);
-	}
-	
-}
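
The iterator exercised above strips keys off Tuple2 records so a wrapped Reducer sees a plain value iterator. A sketch of the contract the test checks (types and calls taken from the test itself; tuples is a hypothetical local holding Tuple2 records that share a key):

	HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt =
			new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(IntWritable.class);

	valIt.set(tuples.iterator());             // tuples: List<Tuple2<IntWritable, IntWritable>>
	IntWritable key = valIt.getCurrentKey();  // group key, available before, during, and after iteration
	while (valIt.hasNext()) {                 // hasNext() is idempotent per the double assertions above
		IntWritable value = valIt.next();     // yields the value field only
	}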

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/53d8f1f0/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index f75697b..61e10d4 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -87,9 +87,6 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 	
 	protected void postSubmit() throws Exception {}
 	
-	protected boolean skipCollectionExecution() {
-		return false;
-	};
 
 	// --------------------------------------------------------------------------------------------
 	//  Test entry point
@@ -144,12 +141,6 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 	
 	@Test
 	public void testJobCollectionExecution() throws Exception {
-		
-		// check if collection execution should be skipped.
-		if(this.skipCollectionExecution()) {
-			return;
-		}
-		
 		isCollectionExecution = true;
 		
 		// pre-submit

