spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iras...@apache.org
Subject spark git commit: [SPARK-20327][YARN] Follow up: fix resource request tests on Hadoop 3.
Date Wed, 17 Oct 2018 15:40:54 GMT
Repository: spark
Updated Branches:
  refs/heads/master 24f5bbd77 -> 7d425b190


[SPARK-20327][YARN] Follow up: fix resource request tests on Hadoop 3.

The test fix is to allocate a `Resource` object only after the resource
types have been initialized. Otherwise the YARN classes get in a weird
state and throw a different exception than expected, because the resource
has a different view of the registered resources.

I also removed a test for a null resource, since it seemed unnecessary
and made the fix more complicated.

All the other changes are just cleanup; the tests are simplified by
defining what is being tested and deriving the resource type registration
and the SparkConf from that data, instead of keeping redundant definitions
in each test.

Ran tests with Hadoop 3 (and also without it).

Closes #22751 from vanzin/SPARK-20327.fix.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d425b19
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d425b19
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d425b19

Branch: refs/heads/master
Commit: 7d425b190a91f49193eeb58c398e497ff92c6169
Parents: 24f5bbd
Author: Marcelo Vanzin <vanzin@cloudera.com>
Authored: Wed Oct 17 10:40:47 2018 -0500
Committer: Imran Rashid <irashid@cloudera.com>
Committed: Wed Oct 17 10:40:47 2018 -0500

----------------------------------------------------------------------
 .../deploy/yarn/ResourceRequestHelper.scala     |   4 +-
 .../apache/spark/deploy/yarn/ClientSuite.scala  |  70 +++----
 .../yarn/ResourceRequestHelperSuite.scala       | 181 ++++++-------------
 .../deploy/yarn/ResourceRequestTestHelper.scala |  13 +-
 4 files changed, 90 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7d425b19/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
index 9534f3a..012268e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -29,7 +29,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CausedBy, Utils}
 
 /**
  * This helper class uses some of Hadoop 3 methods from the YARN API,
@@ -121,6 +121,8 @@ private object ResourceRequestHelper extends Logging {
         case _: MatchError =>
           throw new IllegalArgumentException(s"Resource request for '$name' ('$rawAmount')
" +
               s"does not match pattern $AMOUNT_AND_UNIT_REGEX.")
+        case CausedBy(e: IllegalArgumentException) =>
+          throw new IllegalArgumentException(s"Invalid request for $name: ${e.getMessage}")
         case e: InvocationTargetException if e.getCause != null => throw e.getCause
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d425b19/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 533cb2b..b3286e8 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -200,20 +200,6 @@ class ClientSuite extends SparkFunSuite with Matchers {
     appContext.getMaxAppAttempts should be (42)
   }
 
-  test("resource request (client mode)") {
-    val sparkConf = new SparkConf().set("spark.submit.deployMode", "client")
-      .set(YARN_AM_RESOURCE_TYPES_PREFIX + "fpga", "2")
-      .set(YARN_AM_RESOURCE_TYPES_PREFIX + "gpu", "3")
-    testResourceRequest(sparkConf, List("gpu", "fpga"), Seq(("fpga", 2), ("gpu", 3)))
-  }
-
-  test("resource request (cluster mode)") {
-    val sparkConf = new SparkConf().set("spark.submit.deployMode", "cluster")
-      .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "fpga", "4")
-      .set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + "gpu", "5")
-    testResourceRequest(sparkConf, List("gpu", "fpga"), Seq(("fpga", 4), ("gpu", 5)))
-  }
-
   test("spark.yarn.jars with multiple paths and globs") {
     val libs = Utils.createTempDir()
     val single = Utils.createTempDir()
@@ -372,6 +358,35 @@ class ClientSuite extends SparkFunSuite with Matchers {
     sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName)))
   }
 
+  Seq(
+    "client" -> YARN_AM_RESOURCE_TYPES_PREFIX,
+    "cluster" -> YARN_DRIVER_RESOURCE_TYPES_PREFIX
+  ).foreach { case (deployMode, prefix) =>
+    test(s"custom resource request ($deployMode mode)") {
+      assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+      val resources = Map("fpga" -> 2, "gpu" -> 3)
+      ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
+
+      val conf = new SparkConf().set("spark.submit.deployMode", deployMode)
+      resources.foreach { case (name, v) =>
+        conf.set(prefix + name, v.toString)
+      }
+
+      val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+      val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+      val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+      val client = new Client(new ClientArguments(Array()), conf)
+      client.createApplicationSubmissionContext(
+        new YarnClientApplication(getNewApplicationResponse, appContext),
+        containerLaunchContext)
+
+      resources.foreach { case (name, value) =>
+        ResourceRequestTestHelper.getRequestedValue(appContext.getResource, name) should
be (value)
+      }
+    }
+  }
+
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),
@@ -447,31 +462,4 @@ class ClientSuite extends SparkFunSuite with Matchers {
     populateClasspath(null, new Configuration(), client.sparkConf, env)
     classpath(env)
   }
-
-  private def testResourceRequest(
-      sparkConf: SparkConf,
-      resources: List[String],
-      expectedResources: Seq[(String, Long)]): Unit = {
-    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
-    ResourceRequestTestHelper.initializeResourceTypes(resources)
-
-    val args = new ClientArguments(Array())
-
-    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
-    val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
-    val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
-
-    val client = new Client(args, sparkConf)
-    client.createApplicationSubmissionContext(
-      new YarnClientApplication(getNewApplicationResponse, appContext),
-      containerLaunchContext)
-
-    appContext.getAMContainerSpec should be (containerLaunchContext)
-    appContext.getApplicationType should be ("SPARK")
-
-    expectedResources.foreach { case (name, value) =>
-      ResourceRequestTestHelper.getResourceTypeValue(appContext.getResource, name) should
be (value)
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d425b19/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
index 6005998..8032213 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
@@ -39,42 +39,6 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
   private val NEW_CONFIG_DRIVER_MEMORY = YARN_DRIVER_RESOURCE_TYPES_PREFIX + MEMORY
   private val NEW_CONFIG_DRIVER_CORES = YARN_DRIVER_RESOURCE_TYPES_PREFIX + CORES
 
-  test("resource request value does not match pattern") {
-    verifySetResourceRequestsException(List(CUSTOM_RES_1),
-      Map(CUSTOM_RES_1 -> "**@#"), CUSTOM_RES_1)
-  }
-
-  test("resource request just unit defined") {
-    verifySetResourceRequestsException(List(), Map(CUSTOM_RES_1 -> "m"), CUSTOM_RES_1)
-  }
-
-  test("resource request with null value should not be allowed") {
-    verifySetResourceRequestsException(List(), null, Map(CUSTOM_RES_1 -> "123"),
-      "requirement failed: Resource parameter should not be null!")
-  }
-
-  test("resource request with valid value and invalid unit") {
-    verifySetResourceRequestsException(List(CUSTOM_RES_1), createResource,
-      Map(CUSTOM_RES_1 -> "123ppp"), "")
-  }
-
-  test("resource request with valid value and without unit") {
-    verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1), Map(CUSTOM_RES_1 -> "123"),
-      Map(CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 123, "")))
-  }
-
-  test("resource request with valid value and unit") {
-    verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1), Map(CUSTOM_RES_1 -> "2g"),
-      Map(CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 2, "G")))
-  }
-
-  test("two resource requests with valid values and units") {
-    verifySetResourceRequestsSuccessful(List(CUSTOM_RES_1, CUSTOM_RES_2),
-      Map(CUSTOM_RES_1 -> "123m", CUSTOM_RES_2 -> "10G"),
-      Map(CUSTOM_RES_1 -> ResourceInformation(CUSTOM_RES_1, 123, "m"),
-        CUSTOM_RES_2 -> ResourceInformation(CUSTOM_RES_2, 10, "G")))
-  }
-
   test("empty SparkConf should be valid") {
     val sparkConf = new SparkConf()
     ResourceRequestHelper.validateResources(sparkConf)
@@ -89,61 +53,68 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
     ResourceRequestHelper.validateResources(sparkConf)
   }
 
-  test("memory defined with new config for executor") {
-    val sparkConf = new SparkConf()
-    sparkConf.set(NEW_CONFIG_EXECUTOR_MEMORY, "30G")
-    verifyValidateResourcesException(sparkConf, NEW_CONFIG_EXECUTOR_MEMORY)
-  }
+  Seq(
+    "value with unit" -> Seq(ResourceInformation(CUSTOM_RES_1, 2, "G")),
+    "value without unit" -> Seq(ResourceInformation(CUSTOM_RES_1, 123, "")),
+    "multiple resources" -> Seq(ResourceInformation(CUSTOM_RES_1, 123, "m"),
+      ResourceInformation(CUSTOM_RES_2, 10, "G"))
+  ).foreach { case (name, resources) =>
+    test(s"valid request: $name") {
+      assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+      val resourceDefs = resources.map { r => r.name }
+      val requests = resources.map { r => (r.name, r.value.toString + r.unit) }.toMap
 
-  test("memory defined with new config for executor 2") {
-    val sparkConf = new SparkConf()
-    sparkConf.set(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory-mb", "30G")
-    verifyValidateResourcesException(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory-mb")
-  }
+      ResourceRequestTestHelper.initializeResourceTypes(resourceDefs)
 
-  test("memory defined with new config for executor 3") {
-    val sparkConf = new SparkConf()
-    sparkConf.set(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "mb", "30G")
-    verifyValidateResourcesException(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "mb")
-  }
-
-  test("cores defined with new config for executor") {
-    val sparkConf = new SparkConf()
-    sparkConf.set(NEW_CONFIG_EXECUTOR_CORES, "5")
-    verifyValidateResourcesException(sparkConf, NEW_CONFIG_EXECUTOR_CORES)
-  }
-
-  test("cores defined with new config for executor 2") {
-    val sparkConf = new SparkConf()
-    sparkConf.set(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "vcores", "5")
-    verifyValidateResourcesException(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "vcores")
-  }
+      val resource = createResource()
+      ResourceRequestHelper.setResourceRequests(requests, resource)
 
-  test("memory defined with new config, client mode") {
-    val sparkConf = new SparkConf()
-    sparkConf.set(NEW_CONFIG_AM_MEMORY, "1G")
-    verifyValidateResourcesException(sparkConf, NEW_CONFIG_AM_MEMORY)
-  }
-
-  test("memory defined with new config for driver, cluster mode") {
-    val sparkConf = new SparkConf()
-    sparkConf.set(NEW_CONFIG_DRIVER_MEMORY, "1G")
-    verifyValidateResourcesException(sparkConf, NEW_CONFIG_DRIVER_MEMORY)
+      resources.foreach { r =>
+        val requested = ResourceRequestTestHelper.getResourceInformationByName(resource,
r.name)
+        assert(requested === r)
+      }
+    }
   }
 
-  test("cores defined with new config, client mode") {
-    val sparkConf = new SparkConf()
-    sparkConf.set(NEW_CONFIG_AM_CORES, "3")
-    verifyValidateResourcesException(sparkConf, NEW_CONFIG_AM_CORES)
+  Seq(
+    ("value does not match pattern", CUSTOM_RES_1, "**@#"),
+    ("only unit defined", CUSTOM_RES_1, "m"),
+    ("invalid unit", CUSTOM_RES_1, "123ppp")
+  ).foreach { case (name, key, value) =>
+    test(s"invalid request: $name") {
+      assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+      ResourceRequestTestHelper.initializeResourceTypes(Seq(key))
+
+      val resource = createResource()
+      val thrown = intercept[IllegalArgumentException] {
+        ResourceRequestHelper.setResourceRequests(Map(key -> value), resource)
+      }
+      thrown.getMessage should include (key)
+    }
   }
 
-  test("cores defined with new config for driver, cluster mode") {
-    val sparkConf = new SparkConf()
-    sparkConf.set(NEW_CONFIG_DRIVER_CORES, "1G")
-    verifyValidateResourcesException(sparkConf, NEW_CONFIG_DRIVER_CORES)
+  Seq(
+    NEW_CONFIG_EXECUTOR_MEMORY -> "30G",
+    YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory-mb" -> "30G",
+    YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "mb" -> "30G",
+    NEW_CONFIG_EXECUTOR_CORES -> "5",
+    YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "vcores" -> "5",
+    NEW_CONFIG_AM_MEMORY -> "1G",
+    NEW_CONFIG_DRIVER_MEMORY -> "1G",
+    NEW_CONFIG_AM_CORES -> "3",
+    NEW_CONFIG_DRIVER_CORES -> "1G"
+  ).foreach { case (key, value) =>
+    test(s"disallowed resource request: $key") {
+      assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+      val conf = new SparkConf(false).set(key, value)
+      val thrown = intercept[SparkException] {
+        ResourceRequestHelper.validateResources(conf)
+      }
+      thrown.getMessage should include (key)
+    }
   }
 
-  test("various duplicated definitions") {
+  test("multiple disallowed resources in config") {
     val sparkConf = new SparkConf()
     sparkConf.set(DRIVER_MEMORY.key, "2G")
     sparkConf.set(DRIVER_CORES.key, "2")
@@ -163,52 +134,6 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers
{
       include(NEW_CONFIG_DRIVER_MEMORY))
   }
 
-  private def verifySetResourceRequestsSuccessful(
-      definedResourceTypes: List[String],
-      resourceRequests: Map[String, String],
-      expectedResources: Map[String, ResourceInformation]): Unit = {
-    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
-    ResourceRequestTestHelper.initializeResourceTypes(definedResourceTypes)
-
-    val resource = createResource()
-    ResourceRequestHelper.setResourceRequests(resourceRequests, resource)
-
-    expectedResources.foreach { case (name, ri) =>
-      val resourceInfo = ResourceRequestTestHelper.getResourceInformationByName(resource,
name)
-      assert(resourceInfo === ri)
-    }
-  }
-
-  private def verifySetResourceRequestsException(
-      definedResourceTypes: List[String],
-      resourceRequests: Map[String, String],
-      message: String): Unit = {
-    val resource = createResource()
-    verifySetResourceRequestsException(definedResourceTypes, resource, resourceRequests,
message)
-  }
-
-  private def verifySetResourceRequestsException(
-      definedResourceTypes: List[String],
-      resource: Resource,
-      resourceRequests: Map[String, String],
-      message: String) = {
-    assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
-    ResourceRequestTestHelper.initializeResourceTypes(definedResourceTypes)
-    val thrown = intercept[IllegalArgumentException] {
-      ResourceRequestHelper.setResourceRequests(resourceRequests, resource)
-    }
-    if (!message.isEmpty) {
-      thrown.getMessage should include (message)
-    }
-  }
-
-  private def verifyValidateResourcesException(sparkConf: SparkConf, message: String) = {
-    val thrown = intercept[SparkException] {
-      ResourceRequestHelper.validateResources(sparkConf)
-    }
-    thrown.getMessage should include (message)
-  }
-
   private def createResource(): Resource = {
     val resource = Records.newRecord(classOf[Resource])
     resource.setMemory(512)

http://git-wip-us.apache.org/repos/asf/spark/blob/7d425b19/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
index c46f3c5..953d447 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
@@ -18,20 +18,18 @@
 package org.apache.spark.deploy.yarn
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
 
 import org.apache.hadoop.yarn.api.records.Resource
 
 import org.apache.spark.util.Utils
 
 object ResourceRequestTestHelper {
-  def initializeResourceTypes(resourceTypes: List[String]): Unit = {
+  def initializeResourceTypes(resourceTypes: Seq[String]): Unit = {
     if (!ResourceRequestHelper.isYarnResourceTypesAvailable()) {
       throw new IllegalStateException("This method should not be invoked " +
         "since YARN resource types is not available because of old Hadoop version!" )
     }
 
-    val allResourceTypes = new ListBuffer[AnyRef]
     // ResourceUtils.reinitializeResources() is the YARN-way
     // to specify resources for the execution of the tests.
     // This method should receive standard resources with names of memory-mb and vcores.
@@ -42,8 +40,7 @@ object ResourceRequestTestHelper {
       createResourceTypeInfo("memory-mb"),
       createResourceTypeInfo("vcores"))
     val customResourceTypes = resourceTypes.map(createResourceTypeInfo)
-    allResourceTypes ++= defaultResourceTypes
-    allResourceTypes ++= customResourceTypes
+    val allResourceTypes = defaultResourceTypes ++ customResourceTypes
 
     val resourceUtilsClass =
       Utils.classForName("org.apache.hadoop.yarn.util.resource.ResourceUtils")
@@ -58,8 +55,8 @@ object ResourceRequestTestHelper {
     resTypeInfoNewInstanceMethod.invoke(null, resourceName)
   }
 
-  def getResourceTypeValue(res: Resource, name: String): AnyRef = {
-    val resourceInformation = getResourceInformation(res, name)
+  def getRequestedValue(res: Resource, rtype: String): AnyRef = {
+    val resourceInformation = getResourceInformation(res, rtype)
     invokeMethod(resourceInformation, "getValue")
   }
 
@@ -88,5 +85,5 @@ object ResourceRequestTestHelper {
     getValueMethod.invoke(resourceInformation)
   }
 
-  case class ResourceInformation(name: String, value: Long, units: String)
+  case class ResourceInformation(name: String, value: Long, unit: String)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message