Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 30117200D44 for ; Mon, 20 Nov 2017 12:45:27 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 2E4E1160BF9; Mon, 20 Nov 2017 11:45:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 781FE160BEC for ; Mon, 20 Nov 2017 12:45:26 +0100 (CET) Received: (qmail 37866 invoked by uid 500); 20 Nov 2017 11:45:25 -0000 Mailing-List: contact commits-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list commits@spark.apache.org Received: (qmail 37856 invoked by uid 99); 20 Nov 2017 11:45:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Nov 2017 11:45:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8DF70E0823; Mon, 20 Nov 2017 11:45:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wenchen@apache.org To: commits@spark.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: spark git commit: [SPARK-22533][CORE] Handle deprecated names in ConfigEntry. Date: Mon, 20 Nov 2017 11:45:25 +0000 (UTC) archived-at: Mon, 20 Nov 2017 11:45:27 -0000 Repository: spark Updated Branches: refs/heads/master 3c3eebc87 -> c13b60e01 [SPARK-22533][CORE] Handle deprecated names in ConfigEntry. This change hooks up the config reader to `SparkConf.getDeprecatedConfig`, so that config constants with deprecated names generate the proper warnings. It also changes two deprecated configs from the new "alternatives" system to the old deprecation system, since they're not yet hooked up to each other. Added a few unit tests to verify the desired behavior. Author: Marcelo Vanzin Closes #19760 from vanzin/SPARK-22533. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c13b60e0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c13b60e0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c13b60e0 Branch: refs/heads/master Commit: c13b60e0194c90156e74d10b19f94c70675d21ae Parents: 3c3eebc Author: Marcelo Vanzin Authored: Mon Nov 20 12:45:21 2017 +0100 Committer: Wenchen Fan Committed: Mon Nov 20 12:45:21 2017 +0100 ---------------------------------------------------------------------- .../src/main/scala/org/apache/spark/SparkConf.scala | 16 ++++++++++------ .../spark/internal/config/ConfigProvider.scala | 4 +++- .../org/apache/spark/internal/config/package.scala | 2 -- .../scala/org/apache/spark/SparkConfSuite.scala | 7 +++++++ 4 files changed, 20 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c13b60e0/core/src/main/scala/org/apache/spark/SparkConf.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index ee726df..0e08ff6 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -17,6 +17,7 @@ package org.apache.spark +import java.util.{Map => JMap} import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ @@ -24,6 +25,7 @@ import scala.collection.mutable.LinkedHashSet import org.apache.avro.{Schema, SchemaNormalization} +import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.serializer.KryoSerializer @@ -370,7 +372,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria /** Get a parameter as an Option */ def getOption(key: String): Option[String] = { - Option(settings.get(key)).orElse(getDeprecatedConfig(key, this)) + Option(settings.get(key)).orElse(getDeprecatedConfig(key, settings)) } /** Get an optional value, applying variable substitution. */ @@ -622,7 +624,7 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.history.updateInterval", "1.3")), "spark.history.fs.cleaner.interval" -> Seq( AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")), - "spark.history.fs.cleaner.maxAge" -> Seq( + MAX_LOG_AGE_S.key -> Seq( AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")), "spark.yarn.am.waitTime" -> Seq( AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3", @@ -663,8 +665,10 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.yarn.jar", "2.0")), "spark.yarn.access.hadoopFileSystems" -> Seq( AlternateConfig("spark.yarn.access.namenodes", "2.2")), - "spark.maxRemoteBlockSizeFetchToMem" -> Seq( - AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")) + MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq( + AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")), + LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq( + AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3")) ) /** @@ -704,9 +708,9 @@ private[spark] object SparkConf extends Logging { * Looks for available deprecated keys for the given config option, and return the first * value available. */ - def getDeprecatedConfig(key: String, conf: SparkConf): Option[String] = { + def getDeprecatedConfig(key: String, conf: JMap[String, String]): Option[String] = { configsWithAlternatives.get(key).flatMap { alts => - alts.collectFirst { case alt if conf.contains(alt.key) => + alts.collectFirst { case alt if conf.containsKey(alt.key) => val value = conf.get(alt.key) if (alt.translation != null) alt.translation(value) else value } http://git-wip-us.apache.org/repos/asf/spark/blob/c13b60e0/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala index 5d98a11..392f9d5 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala @@ -19,6 +19,8 @@ package org.apache.spark.internal.config import java.util.{Map => JMap} +import org.apache.spark.SparkConf + /** * A source of configuration values. */ @@ -53,7 +55,7 @@ private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends Con override def get(key: String): Option[String] = { if (key.startsWith("spark.")) { - Option(conf.get(key)) + Option(conf.get(key)).orElse(SparkConf.getDeprecatedConfig(key, conf)) } else { None } http://git-wip-us.apache.org/repos/asf/spark/blob/c13b60e0/core/src/main/scala/org/apache/spark/internal/config/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7be4d6b..7a90727 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -209,7 +209,6 @@ package object config { private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY = ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity") - .withAlternative("spark.scheduler.listenerbus.eventqueue.size") .intConf .checkValue(_ > 0, "The capacity of listener bus event queue must not be negative") .createWithDefault(10000) @@ -404,7 +403,6 @@ package object config { "affect both shuffle fetch and block manager remote block fetch. For users who " + "enabled external shuffle service, this feature can only be worked when external shuffle" + " service is newer than Spark 2.2.") - .withAlternative("spark.reducer.maxReqSizeShuffleToMem") .bytesConf(ByteUnit.BYTE) .createWithDefault(Long.MaxValue) http://git-wip-us.apache.org/repos/asf/spark/blob/c13b60e0/core/src/test/scala/org/apache/spark/SparkConfSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 0897891..c771eb4 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -26,6 +26,7 @@ import scala.util.{Random, Try} import com.esotericsoftware.kryo.Kryo +import org.apache.spark.deploy.history.config._ import org.apache.spark.internal.config._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer} @@ -248,6 +249,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst conf.set("spark.kryoserializer.buffer.mb", "1.1") assert(conf.getSizeAsKb("spark.kryoserializer.buffer") === 1100) + + conf.set("spark.history.fs.cleaner.maxAge.seconds", "42") + assert(conf.get(MAX_LOG_AGE_S) === 42L) + + conf.set("spark.scheduler.listenerbus.eventqueue.size", "84") + assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) === 84) } test("akka deprecated configs") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org