flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/3] flink git commit: [FLINK-7124] [flip6] Add test to verify rescaling JobGraphs works correctly
Date Sat, 10 Feb 2018 22:49:42 GMT
Repository: flink
Updated Branches:
  refs/heads/master 91eea376e -> 556ea8a71


[FLINK-7124] [flip6] Add test to verify rescaling JobGraphs works correctly

This commit adds two tests to verify behaviours of rescaling JobGraphs:
1. JobGraphs can be consecutively rescaled to arbitrary valid DOPs
2. Rescaling beyond max parallelism would fail

The second test, however, is temporarily disabled, since building an
ExecutionGraph with a parallelism above the max parallelism currently
does not fail as expected.

This closes #4510.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5fe31cbf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5fe31cbf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5fe31cbf

Branch: refs/heads/master
Commit: 5fe31cbf26f35ae6921d543d362d2d9f006f45db
Parents: 91eea37
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Authored: Thu Aug 10 13:41:40 2017 +0800
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Sat Feb 10 23:43:48 2018 +0100

----------------------------------------------------------------------
 .../ExecutionGraphRescalingTest.java            | 235 +++++++++++++++++++
 .../executiongraph/ExecutionGraphTestUtils.java |  77 ++++++
 2 files changed, 312 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5fe31cbf/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
new file mode 100644
index 0000000..e6cc908
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This class contains tests that verify when rescaling a {@link JobGraph},
+ * constructed {@link ExecutionGraph}s are correct.
+ */
+public class ExecutionGraphRescalingTest {
+
+	private static final Logger TEST_LOGGER = LoggerFactory.getLogger(ExecutionGraphRescalingTest.class);
+
+	@Test
+	public void testExecutionGraphArbitraryDopConstructionTest() throws Exception {
+
+		final Configuration config = new Configuration();
+
+		final JobVertex[] jobVertices = createVerticesForSimpleBipartiteJobGraph();
+		final JobGraph jobGraph = new JobGraph(jobVertices);
+
+		// TODO rescaling the JobGraph is currently only supported if the
+		// TODO configured parallelism is ExecutionConfig.PARALLELISM_AUTO_MAX.
+		// TODO this limitation should be removed.
+		for (JobVertex jv : jobVertices) {
+			jv.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+		}
+
+		ExecutionGraph eg = ExecutionGraphBuilder.buildGraph(
+				null,
+				jobGraph,
+				config,
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				new Scheduler(TestingUtils.defaultExecutionContext()),
+				Thread.currentThread().getContextClassLoader(),
+				new StandaloneCheckpointRecoveryFactory(),
+				AkkaUtils.getDefaultTimeout(),
+				new NoRestartStrategy(),
+				new UnregisteredMetricsGroup(),
+				5,
+				TEST_LOGGER);
+
+		for (JobVertex jv : jobVertices) {
+			assertEquals(5, jv.getParallelism());
+		}
+		verifyGeneratedExecutionGraphOfSimpleBitartiteJobGraph(eg, jobVertices);
+
+		// --- verify scaling up works correctly ---
+
+		// TODO rescaling the JobGraph is currently only supported if the
+		// TODO configured parallelism is ExecutionConfig.PARALLELISM_AUTO_MAX.
+		// TODO this limitation should be removed.
+		for (JobVertex jv : jobVertices) {
+			jv.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+		}
+
+		eg = ExecutionGraphBuilder.buildGraph(
+				null,
+				jobGraph,
+				config,
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				new Scheduler(TestingUtils.defaultExecutionContext()),
+				Thread.currentThread().getContextClassLoader(),
+				new StandaloneCheckpointRecoveryFactory(),
+				AkkaUtils.getDefaultTimeout(),
+				new NoRestartStrategy(),
+				new UnregisteredMetricsGroup(),
+				10,
+				TEST_LOGGER);
+
+		for (JobVertex jv : jobVertices) {
+			assertEquals(10, jv.getParallelism());
+		}
+		verifyGeneratedExecutionGraphOfSimpleBitartiteJobGraph(eg, jobVertices);
+
+		// --- verify down scaling works correctly ---
+
+		// TODO rescaling the JobGraph is currently only supported if the
+		// TODO configured parallelism is ExecutionConfig.PARALLELISM_AUTO_MAX.
+		// TODO this limitation should be removed.
+		for (JobVertex jv : jobVertices) {
+			jv.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+		}
+
+		eg = ExecutionGraphBuilder.buildGraph(
+			null,
+			jobGraph,
+			config,
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			new Scheduler(TestingUtils.defaultExecutionContext()),
+			Thread.currentThread().getContextClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy(),
+			new UnregisteredMetricsGroup(),
+			2,
+			TEST_LOGGER);
+
+		for (JobVertex jv : jobVertices) {
+			assertEquals(2, jv.getParallelism());
+		}
+		verifyGeneratedExecutionGraphOfSimpleBitartiteJobGraph(eg, jobVertices);
+	}
+
+	/**
+	 * Verifies that building an {@link ExecutionGraph} from a {@link JobGraph} with
+	 * parallelism higher than the maximum parallelism fails.
+	 *
+	 * TODO this test is ignored, since currently the rescale does not properly fail when rescaling
to DOP above max.
+	 */
+	@Ignore
+	@Test
+	public void testExecutionGraphConstructionFailsRescaleDopExceedMaxParallelism() throws Exception
{
+
+		final Configuration config = new Configuration();
+
+		final JobVertex[] jobVertices = createVerticesForSimpleBipartiteJobGraph();
+		final JobGraph jobGraph = new JobGraph(jobVertices);
+
+		for (JobVertex jv : jobVertices) {
+			jv.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+			jv.setMaxParallelism(5);
+		}
+
+		try {
+			ExecutionGraphBuilder.buildGraph(
+				null,
+				jobGraph,
+				config,
+				TestingUtils.defaultExecutor(),
+				TestingUtils.defaultExecutor(),
+				new Scheduler(TestingUtils.defaultExecutionContext()),
+				Thread.currentThread().getContextClassLoader(),
+				new StandaloneCheckpointRecoveryFactory(),
+				AkkaUtils.getDefaultTimeout(),
+				new NoRestartStrategy(),
+				new UnregisteredMetricsGroup(),
+				10, // this should fail because 10 is larger than the max parallelism 5
+				TEST_LOGGER);
+
+			fail("Building the ExecutionGraph with a parallelism higher than the max parallelism should
fail.");
+		} catch (Exception e) {
+			// expected, ignore
+		}
+	}
+
+	private static JobVertex[] createVerticesForSimpleBipartiteJobGraph() {
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
+		JobVertex v3 = new JobVertex("vertex3");
+		JobVertex v4 = new JobVertex("vertex4");
+		JobVertex v5 = new JobVertex("vertex5");
+
+		JobVertex[] jobVertices = new JobVertex[]{v1, v2, v3, v4, v5};
+
+		for (JobVertex jobVertex : jobVertices) {
+			jobVertex.setInvokableClass(AbstractInvokable.class);
+		}
+
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+		v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+		return jobVertices;
+	}
+
+	private static void verifyGeneratedExecutionGraphOfSimpleBitartiteJobGraph(
+			ExecutionGraph generatedExecutionGraph, JobVertex[] jobVertices) {
+
+		ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
+				generatedExecutionGraph, jobVertices[0],
+				null, Collections.singletonList(jobVertices[1]));
+
+		ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
+				generatedExecutionGraph, jobVertices[1],
+				Collections.singletonList(jobVertices[0]), Collections.singletonList(jobVertices[3]));
+
+		ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
+				generatedExecutionGraph, jobVertices[2],
+				null, Arrays.asList(jobVertices[3], jobVertices[4]));
+
+		ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
+				generatedExecutionGraph, jobVertices[3],
+				Arrays.asList(jobVertices[1], jobVertices[2]), Collections.singletonList(jobVertices[4]));
+
+		ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
+				generatedExecutionGraph, jobVertices[4],
+				Arrays.asList(jobVertices[3], jobVertices[2]), null);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5fe31cbf/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index b1ee3cc..044cb52 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -64,8 +64,10 @@ import akka.actor.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
+import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 
@@ -74,6 +76,8 @@ import scala.concurrent.ExecutionContext$;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
@@ -476,4 +480,77 @@ public class ExecutionGraphTestUtils {
 	public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws Exception {
 		return getExecutionVertex(id, TestingUtils.defaultExecutor());
 	}
+
+	// ------------------------------------------------------------------------
+	//  graph vertex verifications
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Verifies the generated {@link ExecutionJobVertex} for a given {@link JobVertex} in an {@link ExecutionGraph}.
+	 *
+	 * <p>The input checks assume every input is connected ALL_TO_ALL, i.e. each subtask has one
+	 * input edge per parallel subtask of the corresponding upstream vertex.
+	 *
+	 * @param executionGraph the generated execution graph
+	 * @param originJobVertex the vertex to verify for
+	 * @param inputJobVertices upstream vertices of the verified vertex in input order, used to check inputs of
+	 *                         generated vertex; {@code null} if the vertex has no inputs
+	 * @param outputJobVertices downstream vertices of the verified vertex, used to check produced data sets
+	 *                          of generated vertex; {@code null} if the vertex produces no data sets
+	 */
+	public static void verifyGeneratedExecutionJobVertex(
+			ExecutionGraph executionGraph,
+			JobVertex originJobVertex,
+			@Nullable List<JobVertex> inputJobVertices,
+			@Nullable List<JobVertex> outputJobVertices) {
+
+		ExecutionJobVertex ejv = executionGraph.getAllVertices().get(originJobVertex.getID());
+		assertNotNull(ejv);
+
+		// verify basic properties
+		assertEquals(originJobVertex.getParallelism(), ejv.getParallelism());
+		assertEquals(executionGraph.getJobID(), ejv.getJobId());
+		assertEquals(originJobVertex.getID(), ejv.getJobVertexId());
+		assertEquals(originJobVertex, ejv.getJobVertex());
+
+		// verify produced data sets
+		final IntermediateResult[] producedDataSets = ejv.getProducedDataSets();
+		if (outputJobVertices == null) {
+			assertEquals(0, producedDataSets.length);
+		} else {
+			assertEquals(outputJobVertices.size(), producedDataSets.length);
+			for (int i = 0; i < outputJobVertices.size(); i++) {
+				assertEquals(originJobVertex.getProducedDataSets().get(i).getId(), producedDataSets[i].getId());
+				// each produced data set must have one partition per parallel subtask of this vertex
+				assertEquals(originJobVertex.getParallelism(), producedDataSets[i].getPartitions().length);
+			}
+		}
+
+		// verify task vertices for their basic properties and their inputs
+		assertEquals(originJobVertex.getParallelism(), ejv.getTaskVertices().length);
+
+		int subtaskIndex = 0;
+		for (ExecutionVertex ev : ejv.getTaskVertices()) {
+			assertEquals(executionGraph.getJobID(), ev.getJobId());
+			assertEquals(originJobVertex.getID(), ev.getJobvertexId());
+
+			assertEquals(originJobVertex.getParallelism(), ev.getTotalNumberOfParallelSubtasks());
+			assertEquals(subtaskIndex, ev.getParallelSubtaskIndex());
+
+			if (inputJobVertices == null) {
+				assertEquals(0, ev.getNumberOfInputs());
+			} else {
+				assertEquals(inputJobVertices.size(), ev.getNumberOfInputs());
+
+				for (int i = 0; i < inputJobVertices.size(); i++) {
+					final JobVertex inputJobVertex = inputJobVertices.get(i);
+					ExecutionEdge[] inputEdges = ev.getInputEdges(i);
+					assertEquals(inputJobVertex.getParallelism(), inputEdges.length);
+
+					int expectedPartitionNum = 0;
+					for (ExecutionEdge inEdge : inputEdges) {
+						assertEquals(i, inEdge.getInputNum());
+						assertEquals(expectedPartitionNum, inEdge.getSource().getPartitionNumber());
+						// the consumed partition must be produced by the expected upstream vertex
+						assertEquals(inputJobVertex.getID(), inEdge.getSource().getProducer().getJobvertexId());
+
+						expectedPartitionNum++;
+					}
+				}
+			}
+
+			subtaskIndex++;
+		}
+	}
 }


Mime
View raw message