flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [07/16] flink git commit: Remove Record API dependencies from WordCount compiler test
Date Tue, 24 Nov 2015 17:17:50 GMT
Remove Record API dependencies from WordCount compiler test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d353ef49
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d353ef49
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d353ef49

Branch: refs/heads/master
Commit: d353ef49a99a5524ff87784f8a983eaf99e028d8
Parents: b640c01
Author: Fabian Hueske <fhueske@apache.org>
Authored: Thu Oct 22 21:19:34 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Nov 24 18:16:51 2015 +0100

----------------------------------------------------------------------
 .../examples/WordCountCompilerTest.java         | 193 ++++++-------------
 1 file changed, 64 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d353ef49/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java
index ce71383..576717f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/WordCountCompilerTest.java
@@ -22,16 +22,15 @@ import java.util.Arrays;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.distributions.SimpleDistribution;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.base.FileDataSourceBase;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.Channel;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -40,17 +39,13 @@ import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
-import org.apache.flink.test.recordJobs.wordcount.WordCount.CountWords;
-import org.apache.flink.test.recordJobs.wordcount.WordCount.TokenizeLine;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.StringValue;
 import org.junit.Assert;
 import org.junit.Test;
 
-@SuppressWarnings("deprecation")
 public class WordCountCompilerTest extends CompilerTestBase {
-	
+
+	private static final long serialVersionUID = 8988304231385358228L;
+
 	/**
 	 * This method tests the simple word count.
 	 */
@@ -61,124 +56,64 @@ public class WordCountCompilerTest extends CompilerTestBase {
 	}
 	
 	private void checkWordCount(boolean estimates) {
-		try {
-			WordCount wc = new WordCount();
-			ExecutionConfig ec = new ExecutionConfig();
-			Plan p = wc.getPlan(DEFAULT_PARALLELISM_STRING, IN_FILE, OUT_FILE);
-			p.setExecutionConfig(ec);
 
-			OptimizedPlan plan;
-			if (estimates) {
-				FileDataSource source = getContractResolver(p).getNode("Input Lines");
-				setSourceStatistics(source, 1024*1024*1024*1024L, 24f);
-				plan = compileWithStats(p);
-			} else {
-				plan = compileNoStats(p);
-			}
-			
-			// get the optimizer plan nodes
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(plan);
-			SinkPlanNode sink = resolver.getNode("Word Counts");
-			SingleInputPlanNode reducer = resolver.getNode("Count Words");
-			SingleInputPlanNode mapper = resolver.getNode("Tokenize Lines");
-			
-			// verify the strategies
-			Assert.assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
-			Assert.assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
-			Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			
-			Channel c = reducer.getInput();
-			Assert.assertEquals(LocalStrategy.COMBININGSORT, c.getLocalStrategy());
-			FieldList l = new FieldList(0);
-			Assert.assertEquals(l, c.getShipStrategyKeys());
-			Assert.assertEquals(l, c.getLocalStrategyKeys());
-			Assert.assertTrue(Arrays.equals(c.getLocalStrategySortOrder(), reducer.getSortOrders(0)));
-			
-			// check the combiner
-			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
-			Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
-			Assert.assertEquals(l, combiner.getKeys(0));
-			Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
-			
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+
+		// get input data
+		DataSet<String> lines = env.readTextFile(IN_FILE).name("Input Lines");
+
+		lines
+			// dummy map
+			.map(new MapFunction<String, Tuple2<String, Integer>>() {
+				private static final long serialVersionUID = -3952739820618875030L;
+				@Override
+				public Tuple2<String, Integer> map(String v) throws Exception {
+					return new Tuple2<>(v, 1);
+				}
+			}).name("Tokenize Lines")
+			// count
+				.groupBy(0).sum(1).name("Count Words")
+			// discard
+				.output(new DiscardingOutputFormat<Tuple2<String, Integer>>()).name("Word
Counts");
+
+		// get the plan and compile it
+		Plan p = env.createProgramPlan();
+		p.setExecutionConfig(new ExecutionConfig());
+
+		OptimizedPlan plan;
+		if (estimates) {
+			GenericDataSourceBase<?,?> source = getContractResolver(p).getNode("Input Lines");
+			setSourceStatistics(source, 1024*1024*1024*1024L, 24f);
+			plan = compileWithStats(p);
+		} else {
+			plan = compileNoStats(p);
 		}
-	}
-	
-	/**
-	 * This method tests that with word count and a range partitioned sink, the range partitioner
is pushed down.
-	 */
-	@Test
-	public void testWordCountWithSortedSink() {
-		checkWordCountWithSortedSink(true);
-		checkWordCountWithSortedSink(false);
-	}
-	
-	private void checkWordCountWithSortedSink(boolean estimates) {
-		try {
-			FileDataSource sourceNode = new FileDataSource(new TextInputFormat(), IN_FILE, "Input
Lines");
-			MapOperator mapNode = MapOperator.builder(new TokenizeLine())
-				.input(sourceNode)
-				.name("Tokenize Lines")
-				.build();
-			ReduceOperator reduceNode = ReduceOperator.builder(new CountWords(), StringValue.class,
0)
-				.input(mapNode)
-				.name("Count Words")
-				.build();
-			FileDataSink out = new FileDataSink(new CsvOutputFormat(), OUT_FILE, reduceNode, "Word
Counts");
-			CsvOutputFormat.configureRecordFormat(out)
-				.recordDelimiter('\n')
-				.fieldDelimiter(' ')
-				.lenient(true)
-				.field(StringValue.class, 0)
-				.field(IntValue.class, 1);
-			
-			Ordering ordering = new Ordering(0, StringValue.class, Order.DESCENDING);
-			out.setGlobalOrder(ordering, new SimpleDistribution(new StringValue[] {new StringValue("N")}));
 
-			ExecutionConfig ec = new ExecutionConfig();
-			Plan p = new Plan(out, "WordCount Example");
-			p.setDefaultParallelism(DEFAULT_PARALLELISM);
-			p.setExecutionConfig(ec);
-	
-			OptimizedPlan plan;
-			if (estimates) {
-				setSourceStatistics(sourceNode, 1024*1024*1024*1024L, 24f);
-				plan = compileWithStats(p);
-			} else {
-				plan = compileNoStats(p);
-			}
-			
-			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(plan);
-			SinkPlanNode sink = resolver.getNode("Word Counts");
-			SingleInputPlanNode reducer = resolver.getNode("Count Words");
-			SingleInputPlanNode mapper = resolver.getNode("Tokenize Lines");
-			
-			Assert.assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
-			Assert.assertEquals(ShipStrategyType.PARTITION_RANGE, reducer.getInput().getShipStrategy());
-			Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			
-			Channel c = reducer.getInput();
-			Assert.assertEquals(LocalStrategy.COMBININGSORT, c.getLocalStrategy());
-			FieldList l = new FieldList(0);
-			Assert.assertEquals(l, c.getShipStrategyKeys());
-			Assert.assertEquals(l, c.getLocalStrategyKeys());
-			
-			// check that the sort orders are descending
-			Assert.assertFalse(c.getShipStrategySortOrder()[0]);
-			Assert.assertFalse(c.getLocalStrategySortOrder()[0]);
+		// get the optimizer plan nodes
+		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(plan);
+		SinkPlanNode sink = resolver.getNode("Word Counts");
+		SingleInputPlanNode reducer = resolver.getNode("Count Words");
+		SingleInputPlanNode mapper = resolver.getNode("Tokenize Lines");
+
+		// verify the strategies
+		Assert.assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
+		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
+		Assert.assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+
+		Channel c = reducer.getInput();
+		Assert.assertEquals(LocalStrategy.COMBININGSORT, c.getLocalStrategy());
+		FieldList l = new FieldList(0);
+		Assert.assertEquals(l, c.getShipStrategyKeys());
+		Assert.assertEquals(l, c.getLocalStrategyKeys());
+		Assert.assertTrue(Arrays.equals(c.getLocalStrategySortOrder(), reducer.getSortOrders(0)));
+
+		// check the combiner
+		SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
+		Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
+		Assert.assertEquals(l, combiner.getKeys(0));
+		Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
 			
-			// check the combiner
-			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
-			Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
-			Assert.assertEquals(l, combiner.getKeys(0));
-			Assert.assertEquals(l, combiner.getKeys(1));
-			Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
 	}
 	
 }


Mime
View raw message