spark-commits mailing list archives

From van...@apache.org
Subject spark git commit: [SPARK-11073][CORE][YARN] Remove akka dependency in secret key generation.
Date Sun, 01 Nov 2015 23:57:49 GMT
Repository: spark
Updated Branches:
  refs/heads/master cf04fdfe7 -> f8d93edec


[SPARK-11073][CORE][YARN] Remove akka dependency in secret key generation.

Use standard JDK APIs for that (with a little help from Guava). Most of the
changes here are in test code, since there were no tests specific to that
part of the code.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #9257 from vanzin/SPARK-11073.
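
For readers skimming the diff below: the patch swaps akka.util.Crypt.generateSecureCookie for java.security.SecureRandom plus Guava's HashCodes. A minimal, self-contained sketch of that approach (the object and method names here are illustrative, not part of the patch):

    import java.lang.{Byte => JByte}
    import java.security.SecureRandom

    import com.google.common.hash.HashCodes

    object SecretSketch {
      // Draw `bitLength` random bits from the JDK's SecureRandom and encode them
      // as a hex string via Guava, mirroring what SecurityManager.generateSecretKey now does.
      def generateSecret(bitLength: Int = 256): String = {
        val rnd = new SecureRandom()
        val secret = new Array[Byte](bitLength / JByte.SIZE)
        rnd.nextBytes(secret)
        HashCodes.fromBytes(secret).toString()
      }
    }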


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8d93ede
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8d93ede
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8d93ede

Branch: refs/heads/master
Commit: f8d93edec82eedab59d50aec06ca2de7e4cf14f6
Parents: cf04fdf
Author: Marcelo Vanzin <vanzin@cloudera.com>
Authored: Sun Nov 1 15:57:42 2015 -0800
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Sun Nov 1 15:57:42 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/SecurityManager.scala      | 72 +++++++++++---------
 .../org/apache/spark/SecurityManagerSuite.scala | 23 ++++++-
 .../spark/deploy/LogUrlsStandaloneSuite.scala   | 13 +---
 .../deploy/worker/WorkerArgumentsTest.scala     | 28 +-------
 .../apache/spark/storage/LocalDirsSuite.scala   | 16 +----
 .../apache/spark/util/SparkConfWithEnv.scala    | 34 +++++++++
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  3 +-
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  | 32 ++++++++-
 8 files changed, 138 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f8d93ede/core/src/main/scala/org/apache/spark/SecurityManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 746d208..64e483e 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark
 
+import java.lang.{Byte => JByte}
 import java.net.{Authenticator, PasswordAuthentication}
-import java.security.KeyStore
+import java.security.{KeyStore, SecureRandom}
 import java.security.cert.X509Certificate
 import javax.net.ssl._
 
+import com.google.common.hash.HashCodes
 import com.google.common.io.Files
 import org.apache.hadoop.io.Text
 
@@ -130,15 +132,16 @@ import org.apache.spark.util.Utils
  *
  *  The exact mechanisms used to generate/distribute the shared secret are deployment-specific.
  *
- *  For Yarn deployments, the secret is automatically generated using the Akka remote
- *  Crypt.generateSecureCookie() API. The secret is placed in the Hadoop UGI which gets passed
- *  around via the Hadoop RPC mechanism. Hadoop RPC can be configured to support different levels
- *  of protection. See the Hadoop documentation for more details. Each Spark application on Yarn
- *  gets a different shared secret. On Yarn, the Spark UI gets configured to use the Hadoop Yarn
- *  AmIpFilter which requires the user to go through the ResourceManager Proxy. That Proxy is there
- *  to reduce the possibility of web based attacks through YARN. Hadoop can be configured to use
- *  filters to do authentication. That authentication then happens via the ResourceManager Proxy
- *  and Spark will use that to do authorization against the view acls.
+ *  For YARN deployments, the secret is automatically generated. The secret is placed in the Hadoop
+ *  UGI which gets passed around via the Hadoop RPC mechanism. Hadoop RPC can be configured to
+ *  support different levels of protection. See the Hadoop documentation for more details. Each
+ *  Spark application on YARN gets a different shared secret.
+ *
+ *  On YARN, the Spark UI gets configured to use the Hadoop YARN AmIpFilter which requires the user
+ *  to go through the ResourceManager Proxy. That proxy is there to reduce the possibility of web
+ *  based attacks through YARN. Hadoop can be configured to use filters to do authentication. That
+ *  authentication then happens via the ResourceManager Proxy and Spark will use that to do
+ *  authorization against the view acls.
  *
  *  For other Spark deployments, the shared secret must be specified via the
  *  spark.authenticate.secret config.
@@ -189,8 +192,7 @@ import org.apache.spark.util.Utils
 private[spark] class SecurityManager(sparkConf: SparkConf)
   extends Logging with SecretKeyHolder {
 
-  // key used to store the spark secret in the Hadoop UGI
-  private val sparkSecretLookupKey = "sparkCookie"
+  import SecurityManager._
 
   private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false)
   // keep spark.ui.acls.enable for backwards compatibility with 1.0
@@ -365,33 +367,38 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
    * we throw an exception.
    */
   private def generateSecretKey(): String = {
-    if (!isAuthenticationEnabled) return null
-    // first check to see if the secret is already set, else generate a new one if on yarn
-    val sCookie = if (SparkHadoopUtil.get.isYarnMode) {
-      val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(sparkSecretLookupKey)
-      if (secretKey != null) {
-        logDebug("in yarn mode, getting secret from credentials")
-        return new Text(secretKey).toString
+    if (!isAuthenticationEnabled) {
+      null
+    } else if (SparkHadoopUtil.get.isYarnMode) {
+      // In YARN mode, the secure cookie will be created by the driver and stashed in the
+      // user's credentials, where executors can get it. The check for an array of size 0
+      // is because of the test code in YarnSparkHadoopUtilSuite.
+      val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(SECRET_LOOKUP_KEY)
+      if (secretKey == null || secretKey.length == 0) {
+        logDebug("generateSecretKey: yarn mode, secret key from credentials is null")
+        val rnd = new SecureRandom()
+        val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
+        val secret = new Array[Byte](length)
+        rnd.nextBytes(secret)
+
+        val cookie = HashCodes.fromBytes(secret).toString()
+        SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, cookie)
+        cookie
       } else {
-        logDebug("getSecretKey: yarn mode, secret key from credentials is null")
+        new Text(secretKey).toString
       }
-      val cookie = akka.util.Crypt.generateSecureCookie
-      // if we generated the secret then we must be the first so lets set it so t
-      // gets used by everyone else
-      SparkHadoopUtil.get.addSecretKeyToUserCredentials(sparkSecretLookupKey, cookie)
-      logInfo("adding secret to credentials in yarn mode")
-      cookie
     } else {
       // user must have set spark.authenticate.secret config
       // For Master/Worker, auth secret is in conf; for Executors, it is in env variable
-      sys.env.get(SecurityManager.ENV_AUTH_SECRET)
+      Option(sparkConf.getenv(SecurityManager.ENV_AUTH_SECRET))
         .orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
         case Some(value) => value
-        case None => throw new Exception("Error: a secret key must be specified via the " +
-          SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
+        case None =>
+          throw new IllegalArgumentException(
+            "Error: a secret key must be specified via the " +
+              SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
       }
     }
-    sCookie
   }
 
   /**
@@ -475,6 +482,9 @@ private[spark] object SecurityManager {
   val SPARK_AUTH_CONF: String = "spark.authenticate"
   val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret"
   // This is used to set auth secret to an executor's env variable. It should have the same
-  // value as SPARK_AUTH_SECERET_CONF set in SparkConf
+  // value as SPARK_AUTH_SECRET_CONF set in SparkConf
   val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
+
+  // key used to store the spark secret in the Hadoop UGI
+  val SECRET_LOOKUP_KEY = "sparkCookie"
 }
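
A note on the one new tuning knob visible in this hunk: the secret length is now read from spark.authenticate.secretBitLength (default 256 bits). A hedged, driver-side example of setting it (the values are illustrative only):

    import org.apache.spark.SparkConf

    // Illustrative only: enable RPC authentication and request a 512-bit secret.
    val conf = new SparkConf()
      .set("spark.authenticate", "true")
      .set("spark.authenticate.secretBitLength", "512")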

http://git-wip-us.apache.org/repos/asf/spark/blob/f8d93ede/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index f29160d..26b95c0 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import java.io.File
 
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SparkConfWithEnv, Utils}
 
 class SecurityManagerSuite extends SparkFunSuite {
 
@@ -223,5 +223,26 @@ class SecurityManagerSuite extends SparkFunSuite {
     assert(securityManager.hostnameVerifier.isDefined === false)
   }
 
+  test("missing secret authentication key") {
+    val conf = new SparkConf().set("spark.authenticate", "true")
+    intercept[IllegalArgumentException] {
+      new SecurityManager(conf)
+    }
+  }
+
+  test("secret authentication key") {
+    val key = "very secret key"
+    val conf = new SparkConf()
+      .set(SecurityManager.SPARK_AUTH_CONF, "true")
+      .set(SecurityManager.SPARK_AUTH_SECRET_CONF, key)
+    assert(key === new SecurityManager(conf).getSecretKey())
+
+    val keyFromEnv = "very secret key from env"
+    val conf2 = new SparkConfWithEnv(Map(SecurityManager.ENV_AUTH_SECRET -> keyFromEnv))
+      .set(SecurityManager.SPARK_AUTH_CONF, "true")
+      .set(SecurityManager.SPARK_AUTH_SECRET_CONF, key)
+    assert(keyFromEnv === new SecurityManager(conf2).getSecretKey())
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f8d93ede/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index 86eb41d..8dd31b4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -25,6 +25,7 @@ import scala.io.Source
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.util.SparkConfWithEnv
 
 class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -53,17 +54,7 @@ class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
 
   test("verify that log urls reflect SPARK_PUBLIC_DNS (SPARK-6175)") {
     val SPARK_PUBLIC_DNS = "public_dns"
-    class MySparkConf extends SparkConf(false) {
-      override def getenv(name: String): String = {
-        if (name == "SPARK_PUBLIC_DNS") SPARK_PUBLIC_DNS
-        else super.getenv(name)
-      }
-
-      override def clone: SparkConf = {
-        new MySparkConf().setAll(getAll)
-      }
-    }
-    val conf = new MySparkConf().set(
+    val conf = new SparkConfWithEnv(Map("SPARK_PUBLIC_DNS" -> SPARK_PUBLIC_DNS)).set(
       "spark.extraListeners", classOf[SaveExecutorInfo].getName)
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f8d93ede/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
index 15f7ca4..637e78f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
@@ -19,7 +19,7 @@
 package org.apache.spark.deploy.worker
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-
+import org.apache.spark.util.SparkConfWithEnv
 
 class WorkerArgumentsTest extends SparkFunSuite {
 
@@ -34,18 +34,7 @@ class WorkerArgumentsTest extends SparkFunSuite {
 
   test("Memory can't be set to 0 when SPARK_WORKER_MEMORY env property leaves off M or G")
{
     val args = Array("spark://localhost:0000  ")
-
-    class MySparkConf extends SparkConf(false) {
-      override def getenv(name: String): String = {
-        if (name == "SPARK_WORKER_MEMORY") "50000"
-        else super.getenv(name)
-      }
-
-      override def clone: SparkConf = {
-        new MySparkConf().setAll(getAll)
-      }
-    }
-    val conf = new MySparkConf()
+    val conf = new SparkConfWithEnv(Map("SPARK_WORKER_MEMORY" -> "50000"))
     intercept[IllegalStateException] {
       new WorkerArguments(args, conf)
     }
@@ -53,18 +42,7 @@ class WorkerArgumentsTest extends SparkFunSuite {
 
   test("Memory correctly set when SPARK_WORKER_MEMORY env property appends G") {
     val args = Array("spark://localhost:0000  ")
-
-    class MySparkConf extends SparkConf(false) {
-      override def getenv(name: String): String = {
-        if (name == "SPARK_WORKER_MEMORY") "5G"
-        else super.getenv(name)
-      }
-
-      override def clone: SparkConf = {
-        new MySparkConf().setAll(getAll)
-      }
-    }
-    val conf = new MySparkConf()
+    val conf = new SparkConfWithEnv(Map("SPARK_WORKER_MEMORY" -> "5G"))
     val workerArgs = new WorkerArguments(args, conf)
     assert(workerArgs.memory === 5120)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f8d93ede/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
index ac6fec5..cc50289 100644
--- a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.util.Utils
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-
+import org.apache.spark.util.SparkConfWithEnv
 
 /**
  * Tests for the spark.local.dir and SPARK_LOCAL_DIRS configuration options.
@@ -45,20 +45,10 @@ class LocalDirsSuite extends SparkFunSuite with BeforeAndAfter {
   test("SPARK_LOCAL_DIRS override also affects driver") {
     // Regression test for SPARK-2975
     assert(!new File("/NONEXISTENT_DIR").exists())
-    // SPARK_LOCAL_DIRS is a valid directory:
-    class MySparkConf extends SparkConf(false) {
-      override def getenv(name: String): String = {
-        if (name == "SPARK_LOCAL_DIRS") System.getProperty("java.io.tmpdir")
-        else super.getenv(name)
-      }
-
-      override def clone: SparkConf = {
-        new MySparkConf().setAll(getAll)
-      }
-    }
     // spark.local.dir only contains invalid directories, but that's not a problem since
     // SPARK_LOCAL_DIRS will override it on both the driver and workers:
-    val conf = new MySparkConf().set("spark.local.dir", "/NONEXISTENT_PATH")
+    val conf = new SparkConfWithEnv(Map("SPARK_LOCAL_DIRS" -> System.getProperty("java.io.tmpdir")))
+      .set("spark.local.dir", "/NONEXISTENT_PATH")
     assert(new File(Utils.getLocalDir(conf)).exists())
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f8d93ede/core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala b/core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala
new file mode 100644
index 0000000..ddd5edf
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/SparkConfWithEnv.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.SparkConf
+
+/**
+ * Customized SparkConf that allows env variables to be overridden.
+ */
+class SparkConfWithEnv(env: Map[String, String]) extends SparkConf(false) {
+  override def getenv(name: String): String = {
+    env.get(name).getOrElse(super.getenv(name))
+  }
+
+  override def clone: SparkConf = {
+    new SparkConfWithEnv(env).setAll(getAll)
+  }
+
+}
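
The suites updated above use this helper in place of the per-test MySparkConf subclasses they previously defined; a minimal usage sketch (the assertion is added here only for illustration):

    import org.apache.spark.util.SparkConfWithEnv

    // Override an env variable as seen through SparkConf.getenv, without touching
    // the real process environment.
    val conf = new SparkConfWithEnv(Map("SPARK_WORKER_MEMORY" -> "5G"))
    assert(conf.getenv("SPARK_WORKER_MEMORY") == "5G")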

http://git-wip-us.apache.org/repos/asf/spark/blob/f8d93ede/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 5924daf..561ad79 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.regex.Matcher
 import java.util.regex.Pattern
 
@@ -81,7 +82,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
   override def addSecretKeyToUserCredentials(key: String, secret: String) {
     val creds = new Credentials()
-    creds.addSecretKey(new Text(key), secret.getBytes("utf-8"))
+    creds.addSecretKey(new Text(key), secret.getBytes(UTF_8))
     addCurrentUserCredentials(creds)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f8d93ede/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 9132c56..a70e66d 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -24,8 +24,10 @@ import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.metadata.HiveException
+import org.apache.hadoop.io.Text
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.scalatest.Matchers
 
@@ -263,7 +265,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
     })
   }
 
-  def assertNestedHiveException(e: InvocationTargetException): Throwable = {
+  private def assertNestedHiveException(e: InvocationTargetException): Throwable = {
     val inner = e.getCause
     if (inner == null) {
       fail("No inner cause", e)
@@ -274,4 +276,32 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
     inner
   }
 
+  // This test needs to live here because it depends on isYarnMode returning true, which can only
+  // happen in the YARN module.
+  test("security manager token generation") {
+    try {
+      System.setProperty("SPARK_YARN_MODE", "true")
+      val initial = SparkHadoopUtil.get
+        .getSecretKeyFromUserCredentials(SecurityManager.SECRET_LOOKUP_KEY)
+      assert(initial === null || initial.length === 0)
+
+      val conf = new SparkConf()
+        .set(SecurityManager.SPARK_AUTH_CONF, "true")
+        .set(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+      val sm = new SecurityManager(conf)
+
+      val generated = SparkHadoopUtil.get
+        .getSecretKeyFromUserCredentials(SecurityManager.SECRET_LOOKUP_KEY)
+      assert(generated != null)
+      val genString = new Text(generated).toString()
+      assert(genString != "unused")
+      assert(sm.getSecretKey() === genString)
+    } finally {
+      // removeSecretKey() was only added in Hadoop 2.6, so instead we just set the secret
+      // to an empty string.
+      SparkHadoopUtil.get.addSecretKeyToUserCredentials(SecurityManager.SECRET_LOOKUP_KEY, "")
+      System.clearProperty("SPARK_YARN_MODE")
+    }
+  }
+
 }

