flink-commits mailing list archives

From twal...@apache.org
Subject [1/2] flink git commit: [FLINK-8972] [e2eTests] Add DataSetAllroundTestProgram and e2e test script.
Date Mon, 26 Mar 2018 16:06:50 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.5 d721e20bf -> 69d98a325


[FLINK-8972] [e2eTests] Add DataSetAllroundTestProgram and e2e test script.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba73f707
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba73f707
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba73f707

Branch: refs/heads/release-1.5
Commit: ba73f707c3f07209fcdb0318d6e1424e2eb48da8
Parents: d721e20
Author: Fabian Hueske <fhueske@apache.org>
Authored: Wed Mar 21 16:21:46 2018 +0100
Committer: Timo Walther <twalthr@apache.org>
Committed: Mon Mar 26 18:01:13 2018 +0200

----------------------------------------------------------------------
 .../flink-dataset-allround-test/pom.xml         | 104 +++++++
 .../batch/tests/DataSetAllroundTestProgram.java | 285 +++++++++++++++++++
 flink-end-to-end-tests/pom.xml                  |   1 +
 flink-end-to-end-tests/run-nightly-tests.sh     |   8 +
 .../test-scripts/test_batch_allround.sh         |  36 +++
 5 files changed, 434 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba73f707/flink-end-to-end-tests/flink-dataset-allround-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-dataset-allround-test/pom.xml b/flink-end-to-end-tests/flink-dataset-allround-test/pom.xml
new file mode 100644
index 0000000..b701dfd
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-allround-test/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<version>1.6-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-dataset-allround-test</artifactId>
+	<name>flink-dataset-allround-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.4</version>
+				<executions>
+					<!-- DataSetAllroundTestProgram -->
+					<execution>
+						<id>DataSetAllroundTestProgram</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>DataSetAllroundTestProgram</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.batch.tests.DataSetAllroundTestProgram</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/batch/tests/DataSetAllroundTestProgram.class</include>
+								<include>org/apache/flink/batch/tests/DataSetAllroundTestProgram$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!--simplify the name of the testing JARs for referring to them in the end-to-end test scripts-->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-antrun-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<execution>
+						<id>rename</id>
+						<phase>package</phase>
+						<goals>
+							<goal>run</goal>
+						</goals>
+						<configuration>
+							<target>
+								<copy file="${project.basedir}/target/flink-dataset-allround-test-${project.version}-DataSetAllroundTestProgram.jar" tofile="${project.basedir}/target/DataSetAllroundTestProgram.jar" />
+							</target>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
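
Side note: the maven-jar-plugin/antrun combination above gives the test jar a stable name so the shell scripts need not know the project version. A minimal sketch of building the module and checking the renamed artifact, assuming a full Flink checkout and Maven on the path (the -pl/-am invocation is illustrative, not part of the commit):

    # from the repository root; builds this module plus the parents it needs
    mvn clean package -pl flink-end-to-end-tests/flink-dataset-allround-test -am -DskipTests
    # the antrun "rename" execution copies the versioned jar to a fixed name
    ls flink-end-to-end-tests/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar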

http://git-wip-us.apache.org/repos/asf/flink/blob/ba73f707/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
b/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
new file mode 100644
index 0000000..0397535
--- /dev/null
+++ b/flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Program to test a large chunk of DataSet API operators and primitives:
+ * <ul>
+ *     <li>Map, FlatMap, Filter</li>
+ *     <li>GroupReduce, Reduce</li>
+ *     <li>Join</li>
+ *     <li>CoGroup</li>
+ *     <li>BulkIteration</li>
+ *     <li>Different key definitions (position, name, KeySelector)</li>
+ * </ul>
+ *
+ * <p>Program parameters:
+ * <ul>
+ *     <li>loadFactor (int): controls generated data volume. Does not affect result.</li>
+ *     <li>outputPath (String): path to write the result</li>
+ * </ul>
+ */
+public class DataSetAllroundTestProgram {
+
+	public static void main(String[] args) throws Exception {
+
+		// get parameters
+		ParameterTool params = ParameterTool.fromArgs(args);
+		int loadFactor = Integer.parseInt(params.getRequired("loadFactor"));
+		String outputPath = params.getRequired("outputPath");
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		int numKeys = loadFactor * 128 * 1024;
+		DataSet<Tuple2<String, Integer>> x1Keys = env.createInput(new Generator(numKeys, 1)).setParallelism(4);
+		DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(new Generator(numKeys * 32, 2)).setParallelism(4);
+		DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(new Generator(numKeys, 8)).setParallelism(4);
+
+		DataSet<Tuple2<String, Integer>> joined = x2Keys
+			// shift keys (check for correct handling of key positions)
+			.map(x -> Tuple4.of("0-0", 0L, 1, x.f0))
+				.returns(Types.TUPLE(Types.STRING, Types.LONG, Types.INT, Types.STRING))
+			// join datasets on non-unique fields (m-n join)
+			// Result: (key, 1) 16 * #keys records, all keys are preserved
+			.join(x8Keys).where(3).equalTo(0).with((l, r) -> Tuple2.of(l.f3, 1))
+				.returns(Types.TUPLE(Types.STRING, Types.INT))
+			// key definition with key selector function
+			.groupBy(
+				new KeySelector<Tuple2<String, Integer>, String>() {
+					@Override
+					public String getKey(Tuple2<String, Integer> value) throws Exception {
+						return value.f0;
+					}
+				}
+			)
+			// reduce
+			// Result: (key, cnt), #keys records with unique keys, cnt = 16
+			.reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1));
+
+		// co-group two datasets on their primary keys.
+		// we filter both inputs such that only 6.25% of the keys overlap.
+		// result: (key, cnt), #keys records with unique keys, cnt = (6.25%: 2, 93.75%: 1)
+		DataSet<Tuple2<String, Integer>> coGrouped = x1Keys
+			.filter(x -> x.f1 > 59)
+			.coGroup(x1Keys.filter(x -> x.f1 < 68)).where("f0").equalTo("f0").with(
+				(CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>)
+				(l, r, out) -> {
+					int cnt = 0;
+					String key = "";
+					for (Tuple2<String, Integer> t : l) {
+						cnt++;
+						key = t.f0;
+					}
+					for (Tuple2<String, Integer> t : r) {
+						cnt++;
+						key = t.f0;
+					}
+					out.collect(Tuple2.of(key, cnt));
+				}
+			)
+				.returns(Types.TUPLE(Types.STRING, Types.INT));
+
+		// join datasets on keys (1-1 join) and replicate by 16 (previously computed count)
+		// result: (key, cnt), 16 * #keys records, all keys preserved, cnt = (6.25%: 2, 93.75%: 1)
+		DataSet<Tuple2<String, Integer>> joined2 = joined.join(coGrouped, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE)
+				.where(0).equalTo("f0")
+			.flatMap(
+				(FlatMapFunction<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>, Tuple2<String, Integer>>)
+				(p, out) -> {
+					for (int i = 0; i < p.f0.f1; i++) {
+						out.collect(Tuple2.of(p.f0.f0, p.f1.f1));
+					}
+				}
+			)
+				.returns(Types.TUPLE(Types.STRING, Types.INT));
+
+		// iteration. double the count field until all counts are at 32 or more
+		// result: (key, cnt), 16 * #keys records, all keys preserved, cnt = (6.25%: 64, 93.75%: 32)
+		IterativeDataSet<Tuple2<String, Integer>> initial = joined2.iterate(16);
+		DataSet<Tuple2<String, Integer>> iteration = initial
+			.map(x -> Tuple2.of(x.f0, x.f1 * 2))
+				.returns(Types.TUPLE(Types.STRING, Types.INT));
+		DataSet<Boolean> termination = iteration
+			// stop iteration if all values are larger/equal 32
+			.flatMap(
+				(FlatMapFunction<Tuple2<String, Integer>, Boolean>)
+				(x, out) -> {
+					if (x.f1 < 32) {
+						out.collect(false);
+					}
+				}
+			)
+				.returns(Types.BOOLEAN);
+		DataSet<Tuple2<Integer, Integer>> result = initial.closeWith(iteration, termination)
+			// group on the count field and count records
+			// result: two records: (32, cnt1) and (64, cnt2) where cnt1 = x * 15/16, cnt2 = x * 1/16
+			.groupBy(1)
+			.reduceGroup(
+				(GroupReduceFunction<Tuple2<String, Integer>, Tuple2<Integer, Integer>>)
+				(g, out) -> {
+					int key = 0;
+					int cnt = 0;
+					for (Tuple2<String, Integer> r : g)  {
+						key = r.f1;
+						cnt++;
+					}
+					out.collect(Tuple2.of(key, cnt));
+				}
+			)
+				.returns(Types.TUPLE(Types.INT, Types.INT))
+			// normalize result by load factor
+			// result: two records: (32: 15360) and (64, 1024). (x = 16384)
+			.map(x -> Tuple2.of(x.f0, x.f1 / (loadFactor * 128)))
+				.returns(Types.TUPLE(Types.INT, Types.INT));
+
+		// sort and emit result
+		result
+			.sortPartition(0, Order.ASCENDING).setParallelism(1)
+			.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
+
+		env.execute();
+
+	}
+
+	/**
+	 * InputFormat that generates a deterministic DataSet of Tuple2(String, Integer)
+	 * <ul>
+	 *     <li>String: key, can be repeated.</li>
+	 *     <li>Integer: uniformly distributed int between 0 and 127</li>
+	 * </ul>
+	 */
+	public static class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> {
+
+		// total number of records
+		private final long numRecords;
+		// total number of keys
+		private final long numKeys;
+
+		// records emitted per partition
+		private long recordsPerPartition;
+		// number of keys per partition
+		private long keysPerPartition;
+
+		// number of currently emitted records
+		private long recordCnt;
+
+		// id of current partition
+		private int partitionId;
+		// total number of partitions
+		private int numPartitions;
+
+		public Generator(long numKeys, int recordsPerKey) {
+			this.numKeys = numKeys;
+			this.numRecords = numKeys * recordsPerKey;
+		}
+
+		@Override
+		public void configure(Configuration parameters) { }
+
+		@Override
+		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+			return null;
+		}
+
+		@Override
+		public GenericInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+
+			GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
+			for (int i = 0; i < minNumSplits; i++) {
+				splits[i] = new GenericInputSplit(i, minNumSplits);
+			}
+			return splits;
+		}
+
+		@Override
+		public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) {
+			return new DefaultInputSplitAssigner(inputSplits);
+		}
+
+		@Override
+		public void open(GenericInputSplit split) throws IOException {
+			this.partitionId = split.getSplitNumber();
+			this.numPartitions = split.getTotalNumberOfSplits();
+
+			// ensure even distribution of records and keys
+			Preconditions.checkArgument(
+				numRecords % numPartitions == 0,
+				"Records cannot be evenly distributed among partitions");
+			Preconditions.checkArgument(
+				numKeys % numPartitions == 0,
+				"Keys cannot be evenly distributed among partitions");
+
+			this.recordsPerPartition = numRecords / numPartitions;
+			this.keysPerPartition = numKeys / numPartitions;
+
+			this.recordCnt = 0;
+		}
+
+		@Override
+		public boolean reachedEnd() throws IOException {
+			return this.recordCnt >= this.recordsPerPartition;
+		}
+
+		@Override
+		public Tuple2<String, Integer> nextRecord(Tuple2<String, Integer> reuse) throws IOException {
+
+			// build key from partition id and count per partition
+			String key = String.format(
+				"%d-%d",
+				this.partitionId,
+				this.recordCnt % this.keysPerPartition);
+			// 128 values to filter on
+			int filterVal = (int) this.recordCnt % 128;
+
+			this.recordCnt++;
+
+			reuse.f0 = key;
+			reuse.f1 = filterVal;
+			return reuse;
+		}
+
+		@Override
+		public void close() throws IOException { }
+	}
+
+}
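
The expected final output follows from the comments in the program. A quick sanity check of the arithmetic (a shell sketch, not part of the commit; loadFactor=2 matches the value the e2e script below passes):

    loadFactor=2
    numKeys=$((loadFactor * 128 * 1024))        # number of distinct keys
    records=$((16 * numKeys))                   # joined2 carries 16 records per key
    # after the iteration, 15/16 of the records have cnt=32 and 1/16 have cnt=64;
    # the final map normalizes the counts by loadFactor * 128
    echo "(32,$((records * 15 / 16 / (loadFactor * 128))))"   # -> (32,15360)
    echo "(64,$((records / 16 / (loadFactor * 128))))"        # -> (64,1024)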

http://git-wip-us.apache.org/repos/asf/flink/blob/ba73f707/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 695a4f1..db86cc1 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -36,6 +36,7 @@ under the License.
 
 	<modules>
 		<module>flink-parent-child-classloading-test</module>
+		<module>flink-dataset-allround-test</module>
 	</modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ba73f707/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 8ee526b..71224e0 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -47,5 +47,13 @@ EXIT_CODE=0
 #     EXIT_CODE=$?
 # fi
 
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==============================================================================\n"
+  printf "Running DataSet allround nightly end-to-end test\n"
+  printf "==============================================================================\n"
+  $END_TO_END_DIR/test-scripts/test_batch_allround.sh
+  EXIT_CODE=$?
+fi
+
 # Exit code for Travis build success/failure
 exit $EXIT_CODE
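
Each test only runs while EXIT_CODE is still 0, so the first failure short-circuits the remaining tests and its exit code is what Travis sees. A sketch of a local invocation, assuming (as the test scripts' common.sh appears to require) that FLINK_DIR points at a built Flink distribution; the path is illustrative:

    cd flink-end-to-end-tests
    FLINK_DIR=/path/to/flink/build-target ./run-nightly-tests.sh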

http://git-wip-us.apache.org/repos/asf/flink/blob/ba73f707/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
new file mode 100755
index 0000000..1e05c7f
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
@@ -0,0 +1,36 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+echo "Run DataSet-Allround-Test Program"
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+$FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR --loadFactor 2 --outputPath $TEST_DATA_DIR/out/dataset_allround
+
+stop_cluster
+$FLINK_DIR/bin/taskmanager.sh stop-all
+
+check_result_hash "DataSet-Allround-Test" $TEST_DATA_DIR/out/dataset_allround "d3cf2aeaa9320c772304cba42649eb47"
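
The script starts a session cluster, adds three more TaskManagers (presumably one slot each, so the initial TaskManager plus these three cover parallelism 4), runs the job, and compares an MD5 checksum of the written output against the expected hash. A sketch of running it standalone, assuming the test jar was built first and FLINK_DIR points at a built distribution (paths illustrative):

    cd flink-end-to-end-tests
    FLINK_DIR=/path/to/flink/build-target ./test-scripts/test_batch_allround.sh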

