flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-6058] Don't read DEFAULT_PARALLELISM from GlobalConfiguration
Date Tue, 11 Jul 2017 09:27:17 GMT
Repository: flink
Updated Branches:
  refs/heads/master d0e52954b -> 4aa2ffcef


[FLINK-6058] Don't read DEFAULT_PARALLELISM from GlobalConfiguration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4aa2ffce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4aa2ffce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4aa2ffce

Branch: refs/heads/master
Commit: 4aa2ffcef8edae574ec270631841ef4a0c793dec
Parents: d0e5295
Author: zjureel <zjureel@gmail.com>
Authored: Thu Jun 22 17:47:33 2017 +0800
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Jul 11 11:22:43 2017 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/client/CliFrontend.java   | 11 +++++++++++
 .../java/org/apache/flink/client/CliFrontendRunTest.java |  2 +-
 .../api/environment/StreamContextEnvironment.java        |  6 ------
 3 files changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4aa2ffce/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index cbb7aaa..f89b016 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
@@ -143,6 +144,8 @@ public class CliFrontend {
 
 	private final FiniteDuration clientTimeout;
 
+	private final int defaultParallelism;
+
 	/**
 	 *
 	 * @throws Exception Thrown if the configuration directory was not found, the configuration
could not be loaded
@@ -169,6 +172,9 @@ public class CliFrontend {
 		}
 
 		this.clientTimeout = AkkaUtils.getClientTimeout(config);
+		this.defaultParallelism = GlobalConfiguration.loadConfiguration().getInteger(
+														ConfigConstants.DEFAULT_PARALLELISM_KEY,
+														ConfigConstants.DEFAULT_PARALLELISM);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -250,6 +256,8 @@ public class CliFrontend {
 					+ client.getMaxSlots() + "). "
 					+ "To use another parallelism, set it at the ./bin/flink client.");
 				userParallelism = client.getMaxSlots();
+			} else if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
+				userParallelism = defaultParallelism;
 			}
 
 			return executeProgram(program, client, userParallelism);
@@ -314,6 +322,9 @@ public class CliFrontend {
 
 		try {
 			int parallelism = options.getParallelism();
+			if (ExecutionConfig.PARALLELISM_DEFAULT == parallelism) {
+				parallelism = defaultParallelism;
+			}
 
 			LOG.info("Creating program plan dump");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4aa2ffce/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index 43116e4..e453d37 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -58,7 +58,7 @@ public class CliFrontendRunTest {
 			// test without parallelism
 			{
 				String[] parameters = {"-v", getTestJarPath()};
-				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(-1, true, false);
+				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(1, true, false);
 				assertEquals(0, testFrontend.run(parameters));
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4aa2ffce/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 64c3a1f..010628f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -21,8 +21,6 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.DetachedEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.util.Preconditions;
 
@@ -45,10 +43,6 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment
{
 		this.ctx = ctx;
 		if (ctx.getParallelism() > 0) {
 			setParallelism(ctx.getParallelism());
-		} else {
-			setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
-					ConfigConstants.DEFAULT_PARALLELISM_KEY,
-					ConfigConstants.DEFAULT_PARALLELISM));
 		}
 	}
 


Mime
View raw message