flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/3] flink git commit: Add autoparallelism to jobs
Date Wed, 04 Mar 2015 17:37:27 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.8 941712941 -> a6f9f9939


Add autoparallelism to jobs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ff3f4c4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ff3f4c4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ff3f4c4

Branch: refs/heads/release-0.8
Commit: 4ff3f4c43594610090c2c17220bd522df6ec1c7c
Parents: 9417129
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 16 21:40:06 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Mar 4 14:21:54 2015 +0100

----------------------------------------------------------------------
 .../client/program/AutoParallelismITCase.java   | 118 +++++++++++++++++++
 .../flink/api/common/ExecutionConfig.java       |   6 +
 .../flink/runtime/jobmanager/JobManager.java    |   7 ++
 3 files changed, 131 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4ff3f4c4/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
new file mode 100644
index 0000000..c1fa888
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.minicluster.NepheleMiniCluster;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This test verifies that the auto parallelism is properly forwarded to the runtime.
+ */
+public class AutoParallelismITCase {
+
+	private static final int NUM_TM = 2;
+	private static final int SLOTS_PER_TM = 7;
+	private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
+
+	@Test
+	public void testProgramWithAutoParallelism() {
+
+		NepheleMiniCluster cluster = new NepheleMiniCluster();
+		cluster.setNumTaskManager(NUM_TM);
+		cluster.setTaskManagerNumSlots(SLOTS_PER_TM);
+
+		try {
+			cluster.start();
+
+			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRpcPort());
+			env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+
+			DataSet<Integer> result = env
+					.createInput(new ParallelismDependentInputFormat())
+					.mapPartition(new ParallelismDependentMapPartition());
+
+			List<Integer> resultCollection = new ArrayList<Integer>();
+			result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
+
+			env.execute();
+
+			assertEquals(PARALLELISM, resultCollection.size());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			try {
+				cluster.stop();
+			}
+			catch (Throwable t) {
+				// ignore exceptions on shutdown
+			}
+		}
+	}
+
+	private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer>
{
+
+		private transient boolean emitted;
+
+		@Override
+		public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+			assertEquals(PARALLELISM, numSplits);
+			return super.createInputSplits(numSplits);
+		}
+
+		@Override
+		public boolean reachedEnd() {
+			return emitted;
+		}
+
+		@Override
+		public Integer nextRecord(Integer reuse) {
+			if (emitted) {
+				return null;
+			}
+			emitted = true;
+			return 1;
+		}
+	}
+
+	private static class ParallelismDependentMapPartition extends RichMapPartitionFunction<Integer,
Integer> {
+
+		@Override
+		public void mapPartition(Iterable<Integer> values, Collector<Integer> out)
{
+			out.collect(getRuntimeContext().getIndexOfThisSubtask());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff3f4c4/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 03d5e3a..8216b25 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -31,6 +31,12 @@ public class ExecutionConfig implements Serializable {
 	// Key for storing it in the Job Configuration
 	public static final String CONFIG_KEY = "runtime.config";
 
+	/**
+	 * Degree of parallelism that tells the system to use all task slots: during the master-side
+	 * initialization of a job, the JobManager replaces it with the scheduler's total number of slots.
+	 */
+	public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;
+
 	private boolean useClosureCleaner = true;
 	private int degreeOfParallelism = -1;
 	private int numberOfExecutionRetries = -1;

http://git-wip-us.apache.org/repos/asf/flink/blob/4ff3f4c4/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 4f61d94..223c6c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -40,6 +40,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -374,6 +375,8 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 				LOG.debug(String.format("Running master initialization of job %s (%s)", job.getJobID(), job.getName()));
 			}
 
+			final int numSlots = scheduler.getTotalNumberOfSlots();
+
 			for (AbstractJobVertex vertex : job.getVertices()) {
 				// check that the vertex has an executable class
 				String executableClass = vertex.getInvokableClassName();
@@ -383,6 +386,10 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 
 				// master side initialization
 				vertex.initializeOnMaster(userCodeLoader);
+
+				if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+					vertex.setParallelism(numSlots);
+				}
 			}
 
 			// first topologically sort the job vertices to form the basis of creating the execution graph


Mime
View raw message