flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] flink git commit: [FLINK-1648] Add auto-parallelism to select all available task slots
Date Wed, 04 Mar 2015 19:44:58 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9c0d6347e -> d8d642fd6


[FLINK-1648] Add auto-parallelism to select all available task slots


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8d642fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8d642fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8d642fd

Branch: refs/heads/master
Commit: d8d642fd6d7d9b8526325d4efff1015f636c5ddb
Parents: 55f1508
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 16 21:40:06 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Mar 4 20:44:13 2015 +0100

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       |   6 +
 .../base/CollectorMapOperatorBase.java          |   1 +
 .../flink/runtime/jobmanager/JobManager.scala   |   9 ++
 .../flink/test/misc/AutoParallelismITCase.java  | 143 +++++++++++++++++++
 .../test/recovery/SimpleRecoveryITCase.java     |   2 +-
 5 files changed, 160 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 81ce471..d315440 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -35,6 +35,12 @@ public class ExecutionConfig implements Serializable {
 	// Key for storing it in the Job Configuration
 	public static final String CONFIG_KEY = "runtime.config";
 
+	/**
+	 * The constant to use for the degree of parallelism, if the system should use the number
+	 *  of currently available slots.
+	 */
+	public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;
+
 	private boolean useClosureCleaner = true;
 	private int degreeOfParallelism = -1;
 	private int numberOfExecutionRetries = -1;

http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
index 62bdfbe..b7ff2ce 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.common.operators.util.UserCodeWrapper;
  * @see GenericCollectorMap
  */
 @Deprecated
+@SuppressWarnings("deprecation")
 public class CollectorMapOperatorBase<IN, OUT, FT extends GenericCollectorMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
 	
 	public CollectorMapOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 54d3cf2..e3e96e5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -22,6 +22,7 @@ import java.io.{IOException, File}
 import java.net.InetSocketAddress
 
 import akka.actor.Status.{Success, Failure}
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
@@ -477,12 +478,20 @@ class JobManager(val configuration: Configuration,
           log.debug(s"Running initialization on master for job ${jobId} (${jobName}).")
         }
 
+        val numSlots = scheduler.getTotalNumberOfSlots()
+
         for (vertex <- jobGraph.getVertices.asScala) {
+
           val executableClass = vertex.getInvokableClassName
           if (executableClass == null || executableClass.length == 0) {
             throw new JobSubmissionException(jobId,
               s"The vertex ${vertex.getID} (${vertex.getName}) has no invokable class.")
           }
+
+          if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+            vertex.setParallelism(numSlots)
+          }
+
           try {
             vertex.initializeOnMaster(userCodeLoader)
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
new file mode 100644
index 0000000..ea79a3a
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.Collector;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This test verifies that the auto parallelism is properly forwarded to the runtime.
+ */
+@SuppressWarnings("serial")
+public class AutoParallelismITCase {
+
+	private static final int NUM_TM = 2;
+	private static final int SLOTS_PER_TM = 7;
+	private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
+
+	private static ForkableFlinkMiniCluster cluster;
+
+	@BeforeClass
+	public static void setupCluster() {
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUM_TM);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TM);
+		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+
+		cluster = new ForkableFlinkMiniCluster(config, false);
+	}
+
+	@AfterClass
+	public static void teardownCluster() {
+		try {
+			cluster.stop();
+		}
+		catch (Throwable t) {
+			System.err.println("Error stopping cluster on shutdown");
+			t.printStackTrace();
+			fail("Cluster shutdown caused an exception: " + t.getMessage());
+		}
+	}
+
+
+	@Test
+	public void testProgramWithAutoParallelism() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getJobManagerRPCPort());
+
+			env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+
+			DataSet<Integer> result = env
+					.createInput(new ParallelismDependentInputFormat())
+					.rebalance()
+					.mapPartition(new ParallelismDependentMapPartition());
+
+			List<Integer> resultCollection = new ArrayList<Integer>();
+			result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
+
+			env.execute();
+
+			assertEquals(PARALLELISM, resultCollection.size());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			try {
+				cluster.stop();
+			}
+			catch (Throwable t) {
+				// ignore exceptions on shutdown
+			}
+		}
+	}
+
+	private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
+
+		private transient boolean emitted;
+
+		@Override
+		public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+			assertEquals(PARALLELISM, numSplits);
+			return super.createInputSplits(numSplits);
+		}
+
+		@Override
+		public boolean reachedEnd() {
+			return emitted;
+		}
+
+		@Override
+		public Integer nextRecord(Integer reuse) {
+			if (emitted) {
+				return null;
+			}
+			emitted = true;
+			return 1;
+		}
+	}
+
+	private static class ParallelismDependentMapPartition extends RichMapPartitionFunction<Integer, Integer> {
+
+		@Override
+		public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
+			out.collect(getRuntimeContext().getIndexOfThisSubtask());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8d642fd/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index 1591d67..8330109 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -37,9 +37,9 @@ import java.util.List;
 
 import static org.junit.Assert.*;
 
+@SuppressWarnings("serial")
 public class SimpleRecoveryITCase {
 
-
 	private static ForkableFlinkMiniCluster cluster;
 
 	@BeforeClass


Mime
View raw message