kudu-commits mailing list archives

From danburk...@apache.org
Subject kudu git commit: KUDU-1999: Spark connector should login with Kerberos credentials on driver
Date Wed, 10 May 2017 00:54:02 GMT
Repository: kudu
Updated Branches:
  refs/heads/branch-1.3.x 1aa34a1d2 -> ea87fdd8e


KUDU-1999: Spark connector should login with Kerberos credentials on driver

Tested on a secure cluster using the Spark ITBLL job:

spark2-submit \
    --deploy-mode=cluster \
    --master=yarn \
    --principal=dan \
    --keytab dan.keytab \
    --class org.apache.kudu.spark.tools.IntegrationTestBigLinkedList \
    kudu-spark-tools-1.4.0-SNAPSHOT.jar generate \
    --master-addrs=kudu-spark-secure-1.gce.cloudera.com

Without some very major changes to our test infrastructure, it's not
possible to test this code in unit tests, since it relies on a secure
YARN cluster being available.

Note: long-running jobs will continue to fail, since credentials are
still not refreshed.

Change-Id: If87a470c1cf99ea52668f22b72f1f7331877ec63
Reviewed-on: http://gerrit.cloudera.org:8080/6822
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/6836


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ea87fdd8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ea87fdd8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ea87fdd8

Branch: refs/heads/branch-1.3.x
Commit: ea87fdd8e6015c7b292523178a45640af0aa79d1
Parents: 1aa34a1
Author: Dan Burkert <danburkert@apache.org>
Authored: Mon May 8 17:53:36 2017 -0700
Committer: Dan Burkert <danburkert@apache.org>
Committed: Wed May 10 00:52:27 2017 +0000

----------------------------------------------------------------------
 docs/developing.adoc                            |  2 +-
 .../apache/kudu/spark/kudu/DefaultSource.scala  |  4 +-
 .../apache/kudu/spark/kudu/KuduContext.scala    | 86 +++++++++++++++++---
 .../apache/kudu/spark/kudu/TestContext.scala    |  2 +-
 4 files changed, 79 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ea87fdd8/docs/developing.adoc
----------------------------------------------------------------------
diff --git a/docs/developing.adoc b/docs/developing.adoc
index 539fc4d..f5938a4 100644
--- a/docs/developing.adoc
+++ b/docs/developing.adoc
@@ -122,7 +122,7 @@ df.registerTempTable("kudu_table")
 val filteredDF = sqlContext.sql("select id from kudu_table where id >= 5").show()
 
 // Use KuduContext to create, delete, or write to Kudu tables
-val kuduContext = new KuduContext("kudu.master:7051")
+val kuduContext = new KuduContext("kudu.master:7051", sqlContext.sparkContext)
 
 // Create a new Kudu table from a dataframe schema
 // NB: No rows from the dataframe are inserted into the table

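For reference, here is a minimal end-to-end sketch of the updated API shown
in the doc snippet above. The app name, master address, and table name are
placeholders, not part of this patch:

import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object KuduContextUsage {
  def main(args: Array[String]): Unit = {
    // The SparkContext is now threaded into KuduContext so the driver can
    // pick up spark.yarn.principal / spark.yarn.keytab when they are set.
    val sc = new SparkContext(new SparkConf().setAppName("kudu-context-example"))
    val sqlContext = new SQLContext(sc)

    // Two-argument constructor introduced by this change.
    val kuduContext = new KuduContext("kudu.master:7051", sc)

    // The KuduContext is then used to create, delete, or write to Kudu tables.
    println(kuduContext.tableExists("my_table"))
  }
}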
http://git-wip-us.apache.org/repos/asf/kudu/blob/ea87fdd8/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index f7875b5..48493b2 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -124,14 +124,14 @@ class KuduRelation(private val tableName: String,
                    private val masterAddrs: String,
                    private val operationType: OperationType,
                    private val userSchema: Option[StructType])(
-                    val sqlContext: SQLContext)
+                   val sqlContext: SQLContext)
   extends BaseRelation
     with PrunedFilteredScan
     with InsertableRelation {
 
   import KuduRelation._
 
-  private val context: KuduContext = new KuduContext(masterAddrs)
+  private val context: KuduContext = new KuduContext(masterAddrs, sqlContext.sparkContext)
   private val table: KuduTable = context.syncClient.openTable(tableName)
 
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] =

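Since KuduRelation backs the Kudu data source, the new constructor threading
is also exercised by an ordinary DataFrame read. A sketch, assuming an
existing SQLContext named sqlContext and placeholder master/table names:

// Read a Kudu table through the data source; this constructs a KuduRelation,
// which now builds its KuduContext with sqlContext.sparkContext.
val df = sqlContext.read
  .option("kudu.master", "kudu.master:7051")
  .option("kudu.table", "my_table")
  .format("org.apache.kudu.spark.kudu")
  .load()
df.show()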
http://git-wip-us.apache.org/repos/asf/kudu/blob/ea87fdd8/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 8c80ea0..8ca1e03 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -17,30 +17,41 @@
 
 package org.apache.kudu.spark.kudu
 
+import java.security.{AccessController, PrivilegedAction}
 import java.util
+import javax.security.auth.Subject
+import javax.security.auth.login.{AppConfigurationEntry, Configuration, LoginContext}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
-
 import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{DataType, DataTypes, StructType}
-import org.apache.spark.sql.{DataFrame, Row}
-
 import org.apache.kudu.annotations.InterfaceStability
 import org.apache.kudu.client.SessionConfiguration.FlushMode
 import org.apache.kudu.client._
+import org.apache.kudu.spark.kudu
 import org.apache.kudu.{ColumnSchema, Schema, Type}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{DataType, DataTypes, StructType}
+import org.apache.spark.sql.{DataFrame, Row}
+import org.slf4j.{Logger, LoggerFactory}
 
 /**
   * KuduContext is a serializable container for Kudu client connections.
   *
   * If a Kudu client connection is needed as part of a Spark application, a
-  * [[KuduContext]] should used as a broadcast variable in the job in order to
-  * share connections among the tasks in a JVM.
+  * [[KuduContext]] should be created in the driver, and shared with executors
+  * as a serializable field.
   */
 @InterfaceStability.Unstable
-class KuduContext(kuduMaster: String) extends Serializable {
+class KuduContext(val kuduMaster: String,
+                  sc: SparkContext) extends Serializable {
+  import kudu.KuduContext._
+
+  @Deprecated()
+  def this(kuduMaster: String) {
+    this(kuduMaster, new SparkContext())
+  }
 
   @transient lazy val syncClient = {
     val c = KuduConnection.getSyncClient(kuduMaster)
@@ -59,8 +70,11 @@ class KuduContext(kuduMaster: String) extends Serializable {
   }
 
   // Visible for testing.
-  private[kudu] val authnCredentials : Array[Byte] =
-    syncClient.exportAuthenticationCredentials()
+  private[kudu] val authnCredentials : Array[Byte] = {
+    Subject.doAs(getSubject(sc), new PrivilegedAction[Array[Byte]] {
+      override def run(): Array[Byte] = syncClient.exportAuthenticationCredentials()
+    })
+  }
 
   /**
     * Create an RDD from a Kudu table.
@@ -240,6 +254,56 @@ class KuduContext(kuduMaster: String) extends Serializable {
   }
 }
 
+private object KuduContext {
+  val Log: Logger = LoggerFactory.getLogger(classOf[KuduContext])
+
+  /**
+    * Returns a new Kerberos-authenticated [[Subject]] if the Spark context contains
+    * principal and keytab options, otherwise returns the currently active subject.
+    *
+    * The keytab and principal options should be set when deploying a Spark
+    * application in cluster mode with Yarn against a secure Kudu cluster. Spark
+    * internally will grab HDFS and HBase delegation tokens (see
+    * [[org.apache.spark.deploy.SparkSubmit]]), so we do something similar.
+    *
+    * This method can only be called on the driver, where the SparkContext is
+    * available.
+    *
+    * @return A Kerberos-authenticated subject if the Spark context contains
+    *         principal and keytab options, otherwise returns the currently
+    *         active subject
+    */
+  private def getSubject(sc: SparkContext): Subject = {
+    val subject = Subject.getSubject(AccessController.getContext)
+
+    val principal = sc.getConf.getOption("spark.yarn.principal").getOrElse(return subject)
+    val keytab = sc.getConf.getOption("spark.yarn.keytab").getOrElse(return subject)
+
+    Log.info(s"Logging in as principal $principal with keytab $keytab")
+
+    val conf = new Configuration {
+    override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
+        val options = Map(
+          "principal" -> principal,
+          "keyTab" -> keytab,
+          "useKeyTab" -> "true",
+          "useTicketCache" -> "false",
+          "doNotPrompt" -> "true",
+          "refreshKrb5Config" -> "true"
+        )
+
+        Array(new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
+                                        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                                        options.asJava))
+      }
+    }
+
+    val loginContext = new LoginContext("kudu-spark", new Subject(), null, conf)
+    loginContext.login()
+    loginContext.getSubject
+  }
+}
+
 private object KuduConnection {
   private[kudu] val syncCache = new mutable.HashMap[String, KuduClient]()
   private[kudu] val asyncCache = new mutable.HashMap[String, AsyncKuduClient]()

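For reference, the login path added above reduces to the following standalone
JAAS pattern; the principal and keytab values here are placeholders:

import java.security.PrivilegedAction
import javax.security.auth.Subject
import javax.security.auth.login.{AppConfigurationEntry, Configuration, LoginContext}
import scala.collection.JavaConverters._

object Krb5LoginSketch {
  def main(args: Array[String]): Unit = {
    val principal = "dan@EXAMPLE.COM"   // placeholder
    val keytab = "/path/to/dan.keytab"  // placeholder

    // Programmatic JAAS configuration equivalent to the one built in
    // KuduContext.getSubject: keytab-based, non-interactive login.
    val conf = new Configuration {
      override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
        val options = Map(
          "principal" -> principal,
          "keyTab" -> keytab,
          "useKeyTab" -> "true",
          "useTicketCache" -> "false",
          "doNotPrompt" -> "true",
          "refreshKrb5Config" -> "true")
        Array(new AppConfigurationEntry(
          "com.sun.security.auth.module.Krb5LoginModule",
          AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
          options.asJava))
      }
    }

    val loginContext = new LoginContext("kudu-spark", new Subject(), null, conf)
    loginContext.login()

    // Code run inside doAs sees the Kerberos credentials; this is how the
    // patch invokes exportAuthenticationCredentials() on the driver.
    Subject.doAs(loginContext.getSubject, new PrivilegedAction[Unit] {
      override def run(): Unit = println("authenticated as " + principal)
    })
  }
}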
http://git-wip-us.apache.org/repos/asf/kudu/blob/ea87fdd8/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
index f46d290..2748f9a 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
@@ -77,7 +77,7 @@ trait TestContext extends BeforeAndAfterAll { self: Suite =>
     kuduClient = new KuduClientBuilder(miniCluster.getMasterAddresses).build()
     assert(miniCluster.waitForTabletServers(1))
 
-    kuduContext = new KuduContext(miniCluster.getMasterAddresses)
+    kuduContext = new KuduContext(miniCluster.getMasterAddresses, sc)
 
     val tableOptions = new CreateTableOptions().setRangePartitionColumns(List("key").asJava)
                                                .setNumReplicas(1)

