spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yh...@apache.org
Subject [1/2] spark git commit: [SPARK-6908] [SQL] Use isolated Hive client
Date Fri, 08 May 2015 02:36:44 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 2e8a141b5 -> 05454fd8a


http://git-wip-us.apache.org/repos/asf/spark/blob/05454fd8/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index ea52fea..6bca9d0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.client
 
 import java.io.{BufferedReader, InputStreamReader, File, PrintStream}
 import java.net.URI
-import java.util.{ArrayList => JArrayList}
+import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
 
 import scala.collection.JavaConversions._
 import scala.language.reflectiveCalls
@@ -27,6 +27,7 @@ import scala.language.reflectiveCalls
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.metastore.api.Database
 import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.TableType
 import org.apache.hadoop.hive.metastore.api
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.ql.metadata
@@ -54,19 +55,13 @@ import org.apache.spark.sql.execution.QueryExecutionException
  * @param config  a collection of configuration options that will be added to the hive conf before
  *                opening the hive client.
  */
-class ClientWrapper(
+private[hive] class ClientWrapper(
     version: HiveVersion,
     config: Map[String, String])
   extends ClientInterface
   with Logging
   with ReflectionMagic {
 
-  private val conf = new HiveConf(classOf[SessionState])
-  config.foreach { case (k, v) =>
-    logDebug(s"Hive Config: $k=$v")
-    conf.set(k, v)
-  }
-
   // Circular buffer to hold what hive prints to STDOUT and ERR.  Only printed when failures occur.
   private val outputBuffer = new java.io.OutputStream {
     var pos: Int = 0
@@ -99,17 +94,31 @@ class ClientWrapper(
     val original = Thread.currentThread().getContextClassLoader
     Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
     val ret = try {
-      val newState = new SessionState(conf)
-      SessionState.start(newState)
-      newState.out = new PrintStream(outputBuffer, true, "UTF-8")
-      newState.err = new PrintStream(outputBuffer, true, "UTF-8")
-      newState
+      val oldState = SessionState.get()
+      if (oldState == null) {
+        val initialConf = new HiveConf(classOf[SessionState])
+        config.foreach { case (k, v) =>
+          logDebug(s"Hive Config: $k=$v")
+          initialConf.set(k, v)
+        }
+        val newState = new SessionState(initialConf)
+        SessionState.start(newState)
+        newState.out = new PrintStream(outputBuffer, true, "UTF-8")
+        newState.err = new PrintStream(outputBuffer, true, "UTF-8")
+        newState
+      } else {
+        oldState
+      }
     } finally {
       Thread.currentThread().setContextClassLoader(original)
     }
     ret
   }
 
+  /** Returns the configuration for the current session. */
+  def conf: HiveConf = SessionState.get().getConf
+
+  // TODO: should be a def?s
   private val client = Hive.get(conf)
 
   /**
@@ -133,6 +142,18 @@ class ClientWrapper(
     ret
   }
 
+  def setOut(stream: PrintStream): Unit = withHiveState {
+    state.out = stream
+  }
+
+  def setInfo(stream: PrintStream): Unit = withHiveState {
+    state.info = stream
+  }
+
+  def setError(stream: PrintStream): Unit = withHiveState {
+    state.err = stream
+  }
+
   override def currentDatabase: String = withHiveState {
     state.getCurrentDatabase
   }
@@ -171,14 +192,20 @@ class ClientWrapper(
         partitionColumns = h.getPartCols.map(f => HiveColumn(f.getName, f.getType, f.getComment)),
         properties = h.getParameters.toMap,
         serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.toMap,
-        tableType = ManagedTable, // TODO
+        tableType = h.getTableType match {
+          case TableType.MANAGED_TABLE => ManagedTable
+          case TableType.EXTERNAL_TABLE => ExternalTable
+          case TableType.VIRTUAL_VIEW => VirtualView
+          case TableType.INDEX_TABLE => IndexTable
+        },
         location = version match {
           case hive.v12 => Option(h.call[URI]("getDataLocation")).map(_.toString)
           case hive.v13 => Option(h.call[Path]("getDataLocation")).map(_.toString)
         },
         inputFormat = Option(h.getInputFormatClass).map(_.getName),
         outputFormat = Option(h.getOutputFormatClass).map(_.getName),
-        serde = Option(h.getSerializationLib)).withClient(this)
+        serde = Option(h.getSerializationLib),
+        viewText = Option(h.getViewExpandedText)).withClient(this)
     }
     converted
   }
@@ -223,27 +250,40 @@ class ClientWrapper(
     client.alterTable(table.qualifiedName, qlTable)
   }
 
+  private def toHivePartition(partition: metadata.Partition): HivePartition = {
+    val apiPartition = partition.getTPartition
+    HivePartition(
+      values = Option(apiPartition.getValues).map(_.toSeq).getOrElse(Seq.empty),
+      storage = HiveStorageDescriptor(
+        location = apiPartition.getSd.getLocation,
+        inputFormat = apiPartition.getSd.getInputFormat,
+        outputFormat = apiPartition.getSd.getOutputFormat,
+        serde = apiPartition.getSd.getSerdeInfo.getSerializationLib,
+        serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.toMap))
+  }
+
+  override def getPartitionOption(
+      table: HiveTable,
+      partitionSpec: JMap[String, String]): Option[HivePartition] = withHiveState {
+
+    val qlTable = toQlTable(table)
+    val qlPartition = client.getPartition(qlTable, partitionSpec, false)
+    Option(qlPartition).map(toHivePartition)
+  }
+
   override def getAllPartitions(hTable: HiveTable): Seq[HivePartition] = withHiveState {
     val qlTable = toQlTable(hTable)
     val qlPartitions = version match {
       case hive.v12 =>
-        client.call[metadata.Table, Set[metadata.Partition]]("getAllPartitionsForPruner", qlTable)
+        client.call[metadata.Table, JSet[metadata.Partition]]("getAllPartitionsForPruner", qlTable)
       case hive.v13 =>
-        client.call[metadata.Table, Set[metadata.Partition]]("getAllPartitionsOf", qlTable)
+        client.call[metadata.Table, JSet[metadata.Partition]]("getAllPartitionsOf", qlTable)
     }
-    qlPartitions.map(_.getTPartition).map { p =>
-      HivePartition(
-        values = Option(p.getValues).map(_.toSeq).getOrElse(Seq.empty),
-        storage = HiveStorageDescriptor(
-          location = p.getSd.getLocation,
-          inputFormat = p.getSd.getInputFormat,
-          outputFormat = p.getSd.getOutputFormat,
-          serde = p.getSd.getSerdeInfo.getSerializationLib))
-    }.toSeq
+    qlPartitions.toSeq.map(toHivePartition)
   }
 
   override def listTables(dbName: String): Seq[String] = withHiveState {
-    client.getAllTables
+    client.getAllTables(dbName)
   }
 
   /**
@@ -267,11 +307,12 @@ class ClientWrapper(
     try {
       val cmd_trimmed: String = cmd.trim()
       val tokens: Array[String] = cmd_trimmed.split("\\s+")
+      // The remainder of the command.
       val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
       val proc: CommandProcessor = version match {
         case hive.v12 =>
           classOf[CommandProcessorFactory]
-            .callStatic[String, HiveConf, CommandProcessor]("get", cmd_1, conf)
+            .callStatic[String, HiveConf, CommandProcessor]("get", tokens(0), conf)
         case hive.v13 =>
           classOf[CommandProcessorFactory]
             .callStatic[Array[String], HiveConf, CommandProcessor]("get", Array(tokens(0)), conf)
@@ -294,7 +335,7 @@ class ClientWrapper(
               res.toSeq
             case hive.v13 =>
               val res = new JArrayList[Object]
-              driver.call[JArrayList[Object], Boolean]("getResults", res)
+              driver.call[JList[Object], Boolean]("getResults", res)
               res.map { r =>
                 r match {
                   case s: String => s

http://git-wip-us.apache.org/repos/asf/spark/blob/05454fd8/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 710dbca..7f94c93 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive.client
 
 import java.io.File
-import java.net.URLClassLoader
+import java.net.{URL, URLClassLoader}
 import java.util
 
 import scala.language.reflectiveCalls
@@ -30,9 +30,10 @@ import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkSubmitUtils
 
 import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.hive.HiveContext
 
 /** Factory for `IsolatedClientLoader` with specific versions of hive. */
-object IsolatedClientLoader {
+private[hive] object IsolatedClientLoader {
   /**
    * Creates isolated Hive client loaders by downloading the requested version from maven.
    */
@@ -49,7 +50,7 @@ object IsolatedClientLoader {
     case "13" | "0.13" | "0.13.0" | "0.13.1" => hive.v13
   }
 
-  private def downloadVersion(version: HiveVersion): Seq[File] = {
+  private def downloadVersion(version: HiveVersion): Seq[URL] = {
     val hiveArtifacts =
       (Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde") ++
         (if (version.hasBuiltinsJar) "hive-builtins" :: Nil else Nil))
@@ -72,10 +73,10 @@ object IsolatedClientLoader {
     tempDir.mkdir()
 
     allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir))
-    tempDir.listFiles()
+    tempDir.listFiles().map(_.toURL)
   }
 
-  private def resolvedVersions = new scala.collection.mutable.HashMap[HiveVersion, Seq[File]]
+  private def resolvedVersions = new scala.collection.mutable.HashMap[HiveVersion, Seq[URL]]
 }
 
 /**
@@ -99,9 +100,9 @@ object IsolatedClientLoader {
  * @param baseClassLoader The spark classloader that is used to load shared classes.
  *
  */
-class IsolatedClientLoader(
+private[hive] class IsolatedClientLoader(
     val version: HiveVersion,
-    val execJars: Seq[File] = Seq.empty,
+    val execJars: Seq[URL] = Seq.empty,
     val config: Map[String, String] = Map.empty,
     val isolationOn: Boolean = true,
     val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
@@ -112,7 +113,7 @@ class IsolatedClientLoader(
   assert(Try(baseClassLoader.loadClass("org.apache.hive.HiveConf")).isFailure)
 
   /** All jars used by the hive specific classloader. */
-  protected def allJars = execJars.map(_.toURI.toURL).toArray
+  protected def allJars = execJars.toArray
 
   protected def isSharedClass(name: String): Boolean =
     name.contains("slf4j") ||
@@ -166,6 +167,12 @@ class IsolatedClientLoader(
       .getConstructors.head
       .newInstance(version, config)
       .asInstanceOf[ClientInterface]
+  } catch {
+    case ReflectionException(cnf: NoClassDefFoundError) =>
+      throw new ClassNotFoundException(
+        s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" +
+         "Please make sure that jars for your version of hive and hadoop are included in the " +
+        s"paths passed to ${HiveContext.HIVE_METASTORE_JARS}.")
   } finally {
     Thread.currentThread.setContextClassLoader(baseClassLoader)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/05454fd8/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ReflectionMagic.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ReflectionMagic.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ReflectionMagic.scala
index 90d0304..c600b15 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ReflectionMagic.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ReflectionMagic.scala
@@ -19,6 +19,14 @@ package org.apache.spark.sql.hive.client
 
 import scala.reflect._
 
+/** Unwraps reflection exceptions. */
+private[client] object ReflectionException {
+  def unapply(a: Throwable): Option[Throwable] = a match {
+    case ite: java.lang.reflect.InvocationTargetException => Option(ite.getCause)
+    case _ => None
+  }
+}
+
 /**
  * Provides implicit functions on any object for calling methods reflectively.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/05454fd8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 76a1965..91e6ac4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.hive.MetastoreRelation
+import org.apache.spark.sql.hive.client.{HiveTable, HiveColumn}
+import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, HiveMetastoreTypes}
 
 /**
  * Create table and insert the query result into it.
@@ -39,17 +39,34 @@ import org.apache.spark.sql.hive.MetastoreRelation
  */
 private[hive]
 case class CreateTableAsSelect(
-    database: String,
-    tableName: String,
+    tableDesc: HiveTable,
     query: LogicalPlan,
-    allowExisting: Boolean,
-    desc: Option[CreateTableDesc]) extends RunnableCommand {
+    allowExisting: Boolean)
+  extends RunnableCommand {
+
+  def database: String = tableDesc.database
+  def tableName: String = tableDesc.name
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
     lazy val metastoreRelation: MetastoreRelation = {
-      // Create Hive Table
-      hiveContext.catalog.createTable(database, tableName, query.output, allowExisting, desc)
+      import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+      import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+      import org.apache.hadoop.io.Text
+      import org.apache.hadoop.mapred.TextInputFormat
+
+      val withSchema =
+        tableDesc.copy(
+          schema =
+            query.output.map(c =>
+              HiveColumn(c.name, HiveMetastoreTypes.toMetastoreType(c.dataType), null)),
+          inputFormat =
+            tableDesc.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
+          outputFormat =
+            tableDesc.outputFormat
+              .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
+          serde = tableDesc.serde.orElse(Some(classOf[LazySimpleSerDe].getName())))
+      hiveContext.catalog.client.createTable(withSchema)
 
       // Get the Metastore Relation
       hiveContext.catalog.lookupRelation(Seq(database, tableName), None) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/05454fd8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 89995a9..de8954d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -200,9 +200,7 @@ case class InsertIntoHiveTable(
           orderedPartitionSpec.put(entry.getName,partitionSpec.get(entry.getName).getOrElse(""))
       }
       val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
-      catalog.synchronized {
-        catalog.client.validatePartitionNameCharacters(partVals)
-      }
+
       // inheritTableSpecs is set to true. It should be set to false for a IMPORT query
       // which is currently considered as a Hive native command.
       val inheritTableSpecs = true
@@ -211,7 +209,7 @@ case class InsertIntoHiveTable(
       if (numDynamicPartitions > 0) {
         catalog.synchronized {
           catalog.client.loadDynamicPartitions(
-            outputPath,
+            outputPath.toString,
             qualifiedTableName,
             orderedPartitionSpec,
             overwrite,
@@ -224,31 +222,28 @@ case class InsertIntoHiveTable(
         // ifNotExists is only valid with static partition, refer to
         // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
         // scalastyle:on
-        val oldPart = catalog.synchronized {
-          catalog.client.getPartition(
-            catalog.client.getTable(qualifiedTableName), partitionSpec, false)
-        }
-        if (oldPart == null || !ifNotExists) {
-          catalog.synchronized {
+        val oldPart =
+          catalog.client.getPartitionOption(
+            catalog.client.getTable(table.databaseName, table.tableName),
+            partitionSpec)
+
+        if (oldPart.isEmpty || !ifNotExists) {
             catalog.client.loadPartition(
-              outputPath,
+              outputPath.toString,
               qualifiedTableName,
               orderedPartitionSpec,
               overwrite,
               holdDDLTime,
               inheritTableSpecs,
               isSkewedStoreAsSubdir)
-          }
         }
       }
     } else {
-      catalog.synchronized {
-        catalog.client.loadTable(
-          outputPath,
-          qualifiedTableName,
-          overwrite,
-          holdDDLTime)
-      }
+      catalog.client.loadTable(
+        outputPath.toString, // TODO: URI
+        qualifiedTableName,
+        overwrite,
+        holdDDLTime)
     }
 
     // Invalidate the cache.

http://git-wip-us.apache.org/repos/asf/spark/blob/05454fd8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index a40a1e5..abab1a2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * Analyzes the given table in the current database to generate statistics, which will be
@@ -84,8 +85,20 @@ case class AddJar(path: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    val currentClassLoader = Utils.getContextOrSparkClassLoader
+
+    // Add jar to current context
+    val jarURL = new java.io.File(path).toURL
+    val newClassLoader = new java.net.URLClassLoader(Array(jarURL), currentClassLoader)
+    Thread.currentThread.setContextClassLoader(newClassLoader)
+    org.apache.hadoop.hive.ql.metadata.Hive.get().getConf().setClassLoader(newClassLoader)
+
+    // Add jar to isolated hive classloader
     hiveContext.runSqlHive(s"ADD JAR $path")
+
+    // Add jar to executors
     hiveContext.sparkContext.addJar(path)
+
     Seq(Row(0))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/05454fd8/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index ca84b43..1f40a53 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.test
 import java.io.File
 import java.util.{Set => JavaSet}
 
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry
 import org.apache.hadoop.hive.ql.io.avro.{AvroContainerInputFormat, AvroContainerOutputFormat}
 import org.apache.hadoop.hive.ql.metadata.Table
@@ -62,6 +63,8 @@ object TestHive
 class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   self =>
 
+  import HiveContext._
+
   // By clearing the port we force Spark to pick a new one.  This allows us to rerun tests
   // without restarting the JVM.
   System.clearProperty("spark.hostPort")
@@ -70,24 +73,16 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   hiveconf.set("hive.plan.serialization.format", "javaXML")
 
   lazy val warehousePath = Utils.createTempDir()
-  lazy val metastorePath = Utils.createTempDir()
 
   /** Sets up the system initially or after a RESET command */
-  protected def configure(): Unit = {
-    warehousePath.delete()
-    metastorePath.delete()
-    setConf("javax.jdo.option.ConnectionURL",
-      s"jdbc:derby:;databaseName=$metastorePath;create=true")
-    setConf("hive.metastore.warehouse.dir", warehousePath.toString)
-  }
+  protected override def configure(): Map[String, String] =
+   newTemporaryConfiguration() ++ Map("hive.metastore.warehouse.dir" -> warehousePath.toString)
 
   val testTempDir = Utils.createTempDir()
 
   // For some hive test case which contain ${system:test.tmp.dir}
   System.setProperty("test.tmp.dir", testTempDir.getCanonicalPath)
 
-  configure() // Must be called before initializing the catalog below.
-
   /** The location of the compiled hive distribution */
   lazy val hiveHome = envVarToFile("HIVE_HOME")
   /** The location of the hive source code. */
@@ -195,6 +190,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
    * A list of test tables and the DDL required to initialize them.  A test table is loaded on
    * demand when a query are run against it.
    */
+  @transient
   lazy val testTables = new mutable.HashMap[String, TestTable]()
 
   def registerTestTable(testTable: TestTable): Unit = {
@@ -204,6 +200,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   // The test tables that are defined in the Hive QTestUtil.
   // /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
   // https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql
+  @transient
   val hiveQTestUtilTables = Seq(
     TestTable("src",
       "CREATE TABLE src (key INT, value STRING)".cmd,
@@ -236,16 +233,18 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
       import org.apache.hadoop.mapred.{SequenceFileInputFormat, SequenceFileOutputFormat}
       import org.apache.thrift.protocol.TBinaryProtocol
 
-      val srcThrift = new Table("default", "src_thrift")
-      srcThrift.setFields(Nil)
-      srcThrift.setInputFormatClass(classOf[SequenceFileInputFormat[_,_]].getName)
-      // In Hive, SequenceFileOutputFormat will be substituted by HiveSequenceFileOutputFormat.
-      srcThrift.setOutputFormatClass(classOf[SequenceFileOutputFormat[_,_]].getName)
-      srcThrift.setSerializationLib(classOf[ThriftDeserializer].getName)
-      srcThrift.setSerdeParam("serialization.class", classOf[Complex].getName)
-      srcThrift.setSerdeParam("serialization.format", classOf[TBinaryProtocol].getName)
-      catalog.client.createTable(srcThrift)
-
+      runSqlHive(
+        s"""
+         |CREATE TABLE src_thrift(fake INT)
+         |ROW FORMAT SERDE '${classOf[ThriftDeserializer].getName}'
+         |WITH SERDEPROPERTIES(
+         |  'serialization.class'='${classOf[Complex].getName}',
+         |  'serialization.format'='${classOf[TBinaryProtocol].getName}'
+         |)
+         |STORED AS
+         |INPUTFORMAT '${classOf[SequenceFileInputFormat[_,_]].getName}'
+         |OUTPUTFORMAT '${classOf[SequenceFileOutputFormat[_,_]].getName}'
+        """.stripMargin)
 
       runSqlHive(
         s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/complex.seq")}' INTO TABLE src_thrift")
@@ -367,7 +366,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
     if (!(loadedTables contains name)) {
       // Marks the table as loaded first to prevent infinite mutually recursive table loading.
       loadedTables += name
-      logInfo(s"Loading test table $name")
+      logDebug(s"Loading test table $name")
       val createCmds =
         testTables.get(name).map(_.commands).getOrElse(sys.error(s"Unknown test table $name"))
       createCmds.foreach(_())
@@ -384,9 +383,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
    */
   protected val originalUdfs: JavaSet[String] = FunctionRegistry.getFunctionNames
 
-  // Database default may not exist in 0.13.1, create it if not exist
-  HiveShim.createDefaultDBIfNeeded(this)
-
   /**
    * Resets the test instance by deleting any tables that have been created.
    * TODO: also clear out UDFs, views, etc.
@@ -401,24 +397,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
       cacheManager.clearCache()
       loadedTables.clear()
       catalog.cachedDataSourceTables.invalidateAll()
-      catalog.client.getAllTables("default").foreach { t =>
-        logDebug(s"Deleting table $t")
-        val table = catalog.client.getTable("default", t)
-
-        catalog.client.getIndexes("default", t, 255).foreach { index =>
-          catalog.client.dropIndex("default", t, index.getIndexName, true)
-        }
-
-        if (!table.isIndexTable) {
-          catalog.client.dropTable("default", t)
-        }
-      }
-
-      catalog.client.getAllDatabases.filterNot(_ == "default").foreach { db =>
-        logDebug(s"Dropping Database: $db")
-        catalog.client.dropDatabase(db, true, false, true)
-      }
-
+      catalog.client.reset()
       catalog.unregisterAllTables()
 
       FunctionRegistry.getFunctionNames.filterNot(originalUdfs.contains(_)).foreach { udfName =>
@@ -429,7 +408,8 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
       hiveconf.set("fs.default.name", new File(".").toURI.toString)
       // It is important that we RESET first as broken hooks that might have been set could break
       // other sql exec here.
-      runSqlHive("RESET")
+      executionHive.runSqlHive("RESET")
+      metadataHive.runSqlHive("RESET")
       // For some reason, RESET does not reset the following variables...
       // https://issues.apache.org/jira/browse/HIVE-9004
       runSqlHive("set hive.table.parameters.default=")
@@ -437,7 +417,11 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
       runSqlHive("set datanucleus.cache.collections.lazy=true")
       // Lots of tests fail if we do not change the partition whitelist from the default.
       runSqlHive("set hive.metastore.partition.name.whitelist.pattern=.*")
-      configure()
+
+      configure().foreach {
+        case (k, v) =>
+          metadataHive.runSqlHive(s"SET $k=$v")
+      }
 
       runSqlHive("USE default")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/05454fd8/sql/hive/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/log4j.properties b/sql/hive/src/test/resources/log4j.properties
index 5bc0806..92eaf1f 100644
--- a/sql/hive/src/test/resources/log4j.properties
+++ b/sql/hive/src/test/resources/log4j.properties
@@ -33,7 +33,7 @@ log4j.appender.FA.layout=org.apache.log4j.PatternLayout
 log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n
 
 # Set the logger level of File Appender to WARN
-log4j.appender.FA.Threshold = INFO
+log4j.appender.FA.Threshold = DEBUG
 
 # Some packages are noisy for no good reason.
 log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false

http://git-wip-us.apache.org/repos/asf/spark/blob/05454fd8/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
index d960a30..30f5313 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.sql.hive
 
-import java.io.{OutputStream, PrintStream}
-
 import scala.util.Try
 
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.{AnalysisException, QueryTest}
@@ -109,25 +108,6 @@ class ErrorPositionSuite extends QueryTest with BeforeAndAfter {
       "SELECT 1 + array(1)", "1 + array")
   }
 
-  /** Hive can be very noisy, messing up the output of our tests. */
-  private def quietly[A](f: => A): A = {
-    val origErr = System.err
-    val origOut = System.out
-    try {
-      System.setErr(new PrintStream(new OutputStream {
-        def write(b: Int) = {}
-      }))
-      System.setOut(new PrintStream(new OutputStream {
-        def write(b: Int) = {}
-      }))
-
-      f
-    } finally {
-      System.setErr(origErr)
-      System.setOut(origOut)
-    }
-  }
-
   /**
    * Creates a test that checks to see if the error thrown when analyzing a given query includes
    * the location of the given token in the query string.

http://git-wip-us.apache.org/repos/asf/spark/blob/05454fd8/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 0538aa2..47c60f6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.InvalidInputException
 import org.apache.spark.sql._
 import org.apache.spark.util.Utils
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.parquet.ParquetRelation2
@@ -686,16 +687,21 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
   test("SPARK-6655 still support a schema stored in spark.sql.sources.schema") {
     val tableName = "spark6655"
     val schema = StructType(StructField("int", IntegerType, true) :: Nil)
-    // Manually create the metadata in metastore.
-    val tbl = new Table("default", tableName)
-    tbl.setProperty("spark.sql.sources.provider", "json")
-    tbl.setProperty("spark.sql.sources.schema", schema.json)
-    tbl.setProperty("EXTERNAL", "FALSE")
-    tbl.setTableType(TableType.MANAGED_TABLE)
-    tbl.setSerdeParam("path", catalog.hiveDefaultTableFilePath(tableName))
-    catalog.synchronized {
-      catalog.client.createTable(tbl)
-    }
+
+    val hiveTable = HiveTable(
+      specifiedDatabase = Some("default"),
+      name = tableName,
+      schema = Seq.empty,
+      partitionColumns = Seq.empty,
+      properties = Map(
+        "spark.sql.sources.provider" -> "json",
+        "spark.sql.sources.schema" -> schema.json,
+        "EXTERNAL" -> "FALSE"),
+      tableType = ManagedTable,
+      serdeProperties = Map(
+        "path" -> catalog.hiveDefaultTableFilePath(tableName)))
+
+    catalog.client.createTable(hiveTable)
 
     invalidateTable(tableName)
     val actualSchema = table(tableName).schema

http://git-wip-us.apache.org/repos/asf/spark/blob/05454fd8/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
index d6ddd53..8afe545 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
@@ -26,8 +26,10 @@ import org.apache.spark.sql.hive.test.TestHive
 class SerializationSuite extends FunSuite {
 
   test("[SPARK-5840] HiveContext should be serializable") {
-    val hiveContext = new HiveContext(TestHive.sparkContext)
+    val hiveContext = TestHive
     hiveContext.hiveconf
-    new JavaSerializer(new SparkConf()).newInstance().serialize(hiveContext)
+    val serializer = new JavaSerializer(new SparkConf()).newInstance()
+    val bytes = serializer.serialize(hiveContext)
+    val deSer = serializer.deserialize[AnyRef](bytes)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/05454fd8/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 81e77ba..321dc8d73 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -22,9 +22,13 @@ import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.util.Utils
 import org.scalatest.FunSuite
 
+/**
+ * A simple set of tests that call the methods of a hive ClientInterface, loading different version
+ * of hive from maven central.  These tests are simple in that they are mostly just testing to make
+ * sure that reflective calls are not throwing NoSuchMethod error, but the actually functionallity
+ * is not fully tested.
+ */
 class VersionsSuite extends FunSuite with Logging {
-  val testType = "derby"
-
   private def buildConf() = {
     lazy val warehousePath = Utils.createTempDir()
     lazy val metastorePath = Utils.createTempDir()
@@ -50,6 +54,14 @@ class VersionsSuite extends FunSuite with Logging {
     causes
   }
 
+  private val emptyDir = Utils.createTempDir().getCanonicalPath
+
+  private def partSpec = {
+    val hashMap = new java.util.LinkedHashMap[String, String]
+    hashMap.put("key", "1")
+    hashMap
+  }
+
   // Its actually pretty easy to mess things up and have all of your tests "pass" by accidentally
   // connecting to an auto-populated, in-process metastore.  Let's make sure we are getting the
   // versions right by forcing a known compatibility failure.
@@ -66,10 +78,9 @@ class VersionsSuite extends FunSuite with Logging {
   private var client: ClientInterface = null
 
   versions.foreach { version =>
-    test(s"$version: listTables") {
+    test(s"$version: create client") {
       client = null
       client = IsolatedClientLoader.forVersion(version, buildConf()).client
-      client.listTables("default")
     }
 
     test(s"$version: createDatabase") {
@@ -101,5 +112,64 @@ class VersionsSuite extends FunSuite with Logging {
     test(s"$version: getTable") {
       client.getTable("default", "src")
     }
+
+    test(s"$version: listTables") {
+      assert(client.listTables("default") === Seq("src"))
+    }
+
+    test(s"$version: currentDatabase") {
+      assert(client.currentDatabase === "default")
+    }
+
+    test(s"$version: getDatabase") {
+      client.getDatabase("default")
+    }
+
+    test(s"$version: alterTable") {
+      client.alterTable(client.getTable("default", "src"))
+    }
+
+    test(s"$version: set command") {
+      client.runSqlHive("SET spark.sql.test.key=1")
+    }
+
+    test(s"$version: create partitioned table DDL") {
+      client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key INT)")
+      client.runSqlHive("ALTER TABLE src_part ADD PARTITION (key = '1')")
+    }
+
+    test(s"$version: getPartitions") {
+      client.getAllPartitions(client.getTable("default", "src_part"))
+    }
+
+    test(s"$version: loadPartition") {
+      client.loadPartition(
+        emptyDir,
+        "default.src_part",
+        partSpec,
+        false,
+        false,
+        false,
+        false)
+    }
+
+    test(s"$version: loadTable") {
+      client.loadTable(
+        emptyDir,
+        "src",
+        false,
+        false)
+    }
+
+    test(s"$version: loadDynamicPartitions") {
+      client.loadDynamicPartitions(
+        emptyDir,
+        "default.src_part",
+        partSpec,
+        false,
+        1,
+        false,
+        false)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/05454fd8/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index a3eacbd..9c056e4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -300,6 +300,8 @@ abstract class HiveComparisonTest
 
             val hiveQueries = queryList.map(new TestHive.QueryExecution(_))
             // Make sure we can at least parse everything before attempting hive execution.
+            // Note this must only look at the logical plan as we might not be able to analyze if
+            // other DDL has not been executed yet.
             hiveQueries.foreach(_.logical)
             val computedResults = (queryList.zipWithIndex, hiveQueries, hiveCacheFiles).zipped.map {
               case ((queryString, i), hiveQuery, cachedAnswerFile)=>

http://git-wip-us.apache.org/repos/asf/spark/blob/05454fd8/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index ac10b17..7d728fe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -900,7 +900,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
       |DROP TABLE IF EXISTS dynamic_part_table;
     """.stripMargin)
 
-  test("Dynamic partition folder layout") {
+  ignore("Dynamic partition folder layout") {
     sql("DROP TABLE IF EXISTS dynamic_part_table")
     sql("CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT)")
     sql("SET hive.exec.dynamic.partition.mode=nonstrict")

http://git-wip-us.apache.org/repos/asf/spark/blob/05454fd8/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 45f10e2..de6a41c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -150,20 +150,21 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
       val (actualScannedColumns, actualPartValues) = plan.collect {
         case p @ HiveTableScan(columns, relation, _) =>
           val columnNames = columns.map(_.name)
-          val partValues = p.prunePartitions(relation.hiveQlPartitions).map(_.getValues)
+          val partValues = if (relation.table.isPartitioned) {
+            p.prunePartitions(relation.hiveQlPartitions).map(_.getValues)
+          } else {
+            Seq.empty
+          }
           (columnNames, partValues)
       }.head
 
       assert(actualOutputColumns === expectedOutputColumns, "Output columns mismatch")
       assert(actualScannedColumns === expectedScannedColumns, "Scanned columns mismatch")
 
-      assert(
-        actualPartValues.length === expectedPartValues.length,
-        "Partition value count mismatches")
+      val actualPartitions = actualPartValues.map(_.toSeq.mkString(",")).sorted
+      val expectedPartitions = expectedPartValues.map(_.mkString(",")).sorted
 
-      for ((actual, expected) <- actualPartValues.zip(expectedPartValues)) {
-        assert(actual sameElements expected, "Partition values mismatch")
-      }
+      assert(actualPartitions === expectedPartitions, "Partitions selected do not match")
     }
 
     // Creates a query test to compare query results generated by Hive and Catalyst.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message