Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7655F17BE8 for ; Mon, 23 Mar 2015 08:09:34 +0000 (UTC) Received: (qmail 70523 invoked by uid 500); 23 Mar 2015 08:09:21 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 70447 invoked by uid 500); 23 Mar 2015 08:09:21 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 70262 invoked by uid 99); 23 Mar 2015 08:09:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Mar 2015 08:09:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 562A0E05E3; Mon, 23 Mar 2015 08:09:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mxm@apache.org To: commits@flink.apache.org Date: Mon, 23 Mar 2015 08:09:28 -0000 Message-Id: <7c83830407294a22a7838f8d36d0a503@git.apache.org> In-Reply-To: <2a1297d2e42c4823a3641bc9e0807086@git.apache.org> References: <2a1297d2e42c4823a3641bc9e0807086@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [8/9] flink git commit: [FLINK-1679] deprecate old parallelism config entry [FLINK-1679] deprecate old parallelism config entry old config parameter can still be used OLD parallelization.degree.default NEW parallelism.default Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/013ed82f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/013ed82f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/013ed82f Branch: refs/heads/master Commit: 013ed82ff3eccc0946d20a955d98524a7ca0f7e4 Parents: cf84bca Author: Maximilian Michels Authored: Wed Mar 18 10:44:44 2015 +0100 Committer: Maximilian Michels Committed: Mon Mar 23 09:03:56 2015 +0100 ---------------------------------------------------------------------- .../org/apache/flink/configuration/ConfigConstants.java | 9 ++++++++- .../src/main/java/org/apache/flink/optimizer/Optimizer.java | 9 +++++++-- .../streaming/api/environment/StreamContextEnvironment.java | 7 ++++++- .../streaming/api/environment/StreamPlanEnvironment.java | 7 ++++++- 4 files changed, 27 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/013ed82f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index b472d8a..09f55fd 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -34,7 +34,14 @@ public final class ConfigConstants { * The config parameter defining the default parallelism for jobs. */ public static final String DEFAULT_PARALLELISM_KEY = "parallelism.default"; - + + /** + * The deprecated config parameter defining the default parallelism for jobs. + */ + @Deprecated + public static final String DEFAULT_PARALLELISM_KEY_OLD = "parallelization.degree.default"; + + /** * Config parameter for the number of re-tries for failed tasks. Setting this * value to 0 effectively disables fault tolerance. http://git-wip-us.apache.org/repos/asf/flink/blob/013ed82f/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java index 90421b7..c80cfc2 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java @@ -348,10 +348,15 @@ public class Optimizer { this.costEstimator = estimator; // determine the default parallelism + // check for old key string first, then for new one this.defaultParallelism = GlobalConfiguration.getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY, + ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD, ConfigConstants.DEFAULT_PARALLELISM); - + // now check for new one which overwrites old values + this.defaultParallelism = GlobalConfiguration.getInteger( + ConfigConstants.DEFAULT_PARALLELISM_KEY, + this.defaultParallelism); + if (defaultParallelism < 1) { LOG.warn("Config value " + defaultParallelism + " for option " + ConfigConstants.DEFAULT_PARALLELISM + " is invalid. Ignoring and using a value of 1."); http://git-wip-us.apache.org/repos/asf/flink/blob/013ed82f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java index 7ae78f1..f7dd0bf 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java @@ -38,9 +38,14 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { if (parallelism > 0) { setParallelism(parallelism); } else { + // first check for old parallelism config key setParallelism(GlobalConfiguration.getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY, + ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD, ConfigConstants.DEFAULT_PARALLELISM)); + // then for new + setParallelism(GlobalConfiguration.getInteger( + ConfigConstants.DEFAULT_PARALLELISM_KEY, + getParallelism())); } } http://git-wip-us.apache.org/repos/asf/flink/blob/013ed82f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java index 2cf5cc2..592fa1a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java @@ -36,9 +36,14 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment { if (parallelism > 0) { setParallelism(parallelism); } else { + // first check for old parallelism config key setParallelism(GlobalConfiguration.getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY, + ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD, ConfigConstants.DEFAULT_PARALLELISM)); + // then for new + setParallelism(GlobalConfiguration.getInteger( + ConfigConstants.DEFAULT_PARALLELISM_KEY, + getParallelism())); } }