spark-commits mailing list archives

From r...@apache.org
Subject [1/2] git commit: [SQL][SPARK-2094] Follow up of PR #1071 for Java API
Date Mon, 16 Jun 2014 23:48:15 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 d7f94b934 -> 235cfd0f6


[SQL][SPARK-2094] Follow up of PR #1071 for Java API

Updated `JavaSQLContext` and `JavaHiveContext` similarly to what we've done to `SQLContext`
and `HiveContext` in PR #1071. Added a corresponding test case for the Spark SQL Java API.
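
For reference, a minimal sketch of the Java API surface this commit touches. Everything
below is illustrative: the local master, application name, and table name are assumptions,
not part of the commit, and a table would have to be registered before the query could run.

    import org.apache.spark.api.java.JavaSparkContext
    import org.apache.spark.sql.api.java.JavaSQLContext

    val javaCtx = new JavaSparkContext("local", "Spark2094Sketch")  // illustrative setup
    val javaSqlCtx = new JavaSQLContext(javaCtx)
    // "some_table" is hypothetical; it must be registered beforehand.
    val rdd = javaSqlCtx.sql("SELECT key, value FROM some_table")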

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1085 from liancheng/spark-2094-java and squashes the following commits:

29b8a51 [Cheng Lian] Avoided instantiating JavaSparkContext & JavaHiveContext to workaround test failure
92bb4fb [Cheng Lian] Marked test cases in JavaHiveQLSuite with "ignore"
22aec97 [Cheng Lian] Follow up of PR #1071 for Java API

(cherry picked from commit 273afcb254fb5384204c56bdcb3b9b760bcfab3f)
Signed-off-by: Reynold Xin <rxin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c675de9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c675de9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c675de9

Branch: refs/heads/branch-1.0
Commit: 9c675de992d29327436edf7143dba3e4ef5601a8
Parents: d7f94b9
Author: Cheng Lian <lian.cs.zju@gmail.com>
Authored: Mon Jun 16 21:30:29 2014 +0200
Committer: Reynold Xin <rxin@apache.org>
Committed: Mon Jun 16 16:47:50 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/api/java/JavaSQLContext.scala     |  16 +--
 .../sql/hive/api/java/JavaHiveContext.scala     |  10 +-
 .../sql/hive/api/java/JavaHiveQLSuite.scala     | 101 +++++++++++++++++++
 .../spark/sql/hive/api/java/JavaHiveSuite.scala |  41 --------
 .../sql/hive/execution/HiveQuerySuite.scala     |  30 +++---
 5 files changed, 124 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9c675de9/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 6f7d431..352260f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -40,19 +40,13 @@ class JavaSQLContext(val sqlContext: SQLContext) {
   /**
    * Executes a query expressed in SQL, returning the result as a JavaSchemaRDD
    */
-  def sql(sqlQuery: String): JavaSchemaRDD = {
-    val result = new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlQuery))
-    // We force query optimization to happen right away instead of letting it happen lazily like
-    // when using the query DSL.  This is so DDL commands behave as expected.  This is only
-    // generates the RDD lineage for DML queries, but do not perform any execution.
-    result.queryExecution.toRdd
-    result
-  }
+  def sql(sqlQuery: String): JavaSchemaRDD =
+    new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlQuery))
 
   /**
    * :: Experimental ::
   * Creates an empty parquet file with the schema of class `beanClass`, which can be registered as
-   * a table. This registered table can be used as the target of future insertInto` operations.
+   * a table. This registered table can be used as the target of future `insertInto` operations.
    *
    * {{{
    *   JavaSQLContext sqlCtx = new JavaSQLContext(...)
@@ -62,7 +56,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
    * }}}
    *
   * @param beanClass A java bean class object that will be used to determine the schema of the
-   *                  parquet file.                          s
+   *                  parquet file.
    * @param path The path where the directory containing parquet metadata should be created.
    *             Data inserted into this table will also be stored at this location.
   * @param allowExisting When false, an exception will be thrown if this directory already exists.
@@ -100,14 +94,12 @@ class JavaSQLContext(val sqlContext: SQLContext) {
     new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
   }
 
-
   /**
    * Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
    */
   def parquetFile(path: String): JavaSchemaRDD =
     new JavaSchemaRDD(sqlContext, ParquetRelation(path))
 
-
   /**
   * Registers the given RDD as a temporary table in the catalog.  Temporary tables exist only
    * during the lifetime of this instance of SQLContext.
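
The comment deleted in the first hunk documents the old behavior: query planning was forced
eagerly so that DDL statements took effect as soon as `sql()` returned. Since PR #1071 makes
command statements perform their side effects exactly once on their own, `sql()` can now
simply return a lazy `JavaSchemaRDD`. A rough sketch of the resulting contract, assuming a
`javaSqlCtx` and a registered table "people" (both hypothetical):

    val rdd = javaSqlCtx.sql("SELECT name FROM people")  // returns at once; only builds the plan
    val n = rdd.count()                                  // the query executes here, on first action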

http://git-wip-us.apache.org/repos/asf/spark/blob/9c675de9/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
index 6df76fa..c9ee162 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
@@ -31,12 +31,6 @@ class JavaHiveContext(sparkContext: JavaSparkContext) extends JavaSQLContext(spa
   /**
     * Executes a query expressed in HiveQL, returning the result as a JavaSchemaRDD.
     */
-  def hql(hqlQuery: String): JavaSchemaRDD = {
-    val result = new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
-    // We force query optimization to happen right away instead of letting it happen lazily like
-    // when using the query DSL.  This is so DDL commands behave as expected.  This is only
-    // generates the RDD lineage for DML queries, but do not perform any execution.
-    result.queryExecution.toRdd
-    result
-  }
+  def hql(hqlQuery: String): JavaSchemaRDD =
+    new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
 }
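
The `hql` change mirrors `sql` above. DDL and native commands are still expected to execute
exactly once, at the time `hql` is called, which the new `JavaHiveQLSuite` below verifies.
A sketch of the intended semantics (the table name is illustrative):

    val ddl = javaHiveCtx.hql("CREATE TABLE tmp_demo(key INT, value STRING)")
    // The CREATE TABLE side effect has already happened, exactly once;
    // invoking an action on the result must not run the command again.
    ddl.count()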

http://git-wip-us.apache.org/repos/asf/spark/blob/9c675de9/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala
new file mode 100644
index 0000000..3b9cd8f
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.api.java
+
+import scala.util.Try
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.api.java.JavaSchemaRDD
+import org.apache.spark.sql.execution.ExplainCommand
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.test.TestSQLContext
+
+// Implicits
+import scala.collection.JavaConversions._
+
+class JavaHiveQLSuite extends FunSuite {
+  lazy val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext)
+
+  // There is a little trickery here to avoid instantiating two HiveContexts in the same JVM
+  lazy val javaHiveCtx = new JavaHiveContext(javaCtx) {
+    override val sqlContext = TestHive
+  }
+
+  ignore("SELECT * FROM src") {
+    assert(
+      javaHiveCtx.hql("SELECT * FROM src").collect().map(_.getInt(0)) ===
+        TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq)
+  }
+
+  private val explainCommandClassName =
+    classOf[ExplainCommand].getSimpleName.stripSuffix("$")
+
+  def isExplanation(result: JavaSchemaRDD) = {
+    val explanation = result.collect().map(_.getString(0))
+    explanation.size == 1 && explanation.head.startsWith(explainCommandClassName)
+  }
+
+  ignore("Query Hive native command execution result") {
+    val tableName = "test_native_commands"
+
+    assertResult(0) {
+      javaHiveCtx.hql(s"DROP TABLE IF EXISTS $tableName").count()
+    }
+
+    assertResult(0) {
+      javaHiveCtx.hql(s"CREATE TABLE $tableName(key INT, value STRING)").count()
+    }
+
+    javaHiveCtx.hql("SHOW TABLES").registerAsTable("show_tables")
+
+    assert(
+      javaHiveCtx
+        .hql("SELECT result FROM show_tables")
+        .collect()
+        .map(_.getString(0))
+        .contains(tableName))
+
+    assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) {
+      javaHiveCtx.hql(s"DESCRIBE $tableName").registerAsTable("describe_table")
+
+      javaHiveCtx
+        .hql("SELECT result FROM describe_table")
+        .collect()
+        .map(_.getString(0).split("\t").map(_.trim))
+        .toArray
+    }
+
+    assert(isExplanation(javaHiveCtx.hql(
+      s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")))
+
+    TestHive.reset()
+  }
+
+  ignore("Exactly once semantics for DDL and command statements") {
+    val tableName = "test_exactly_once"
+    val q0 = javaHiveCtx.hql(s"CREATE TABLE $tableName(key INT, value STRING)")
+
+    // If the table was not created, the following assertion would fail
+    assert(Try(TestHive.table(tableName)).isSuccess)
+
+    // If the CREATE TABLE command got executed again, the following assertion would fail
+    assert(Try(q0.count()).isSuccess)
+  }
+}
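
One detail of the suite above worth noting: constructing a `JavaHiveContext` normally creates
a fresh `HiveContext`, and a second one in the same JVM breaks the test harness (presumably
because the test metastore admits only one client per JVM; this rationale is an assumption,
the suite itself only says it avoids two HiveContexts). Overriding `sqlContext` reuses the
shared `TestHive` singleton instead. The pattern in distilled form:

    // Reuse the shared TestHive rather than letting JavaHiveContext build a second HiveContext.
    val javaHiveCtx = new JavaHiveContext(new JavaSparkContext(TestSQLContext.sparkContext)) {
      override val sqlContext = TestHive
    }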

http://git-wip-us.apache.org/repos/asf/spark/blob/9c675de9/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala
deleted file mode 100644
index 9c5d7c8..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.api.java
-
-import org.scalatest.FunSuite
-
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.hive.test.TestHive
-
-// Implicits
-import scala.collection.JavaConversions._
-
-class JavaHiveSQLSuite extends FunSuite {
-  ignore("SELECT * FROM src") {
-    val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext)
-    // There is a little trickery here to avoid instantiating two HiveContexts in the same JVM
-    val javaSqlCtx = new JavaHiveContext(javaCtx) {
-      override val sqlContext = TestHive
-    }
-
-    assert(
-      javaSqlCtx.hql("SELECT * FROM src").collect().map(_.getInt(0)) ===
-        TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9c675de9/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 0d656c5..6e8d11b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -184,25 +184,29 @@ class HiveQuerySuite extends HiveComparisonTest {
   test("Query Hive native command execution result") {
     val tableName = "test_native_commands"
 
-    val q0 = hql(s"DROP TABLE IF EXISTS $tableName")
-    assert(q0.count() == 0)
+    assertResult(0) {
+      hql(s"DROP TABLE IF EXISTS $tableName").count()
+    }
 
-    val q1 = hql(s"CREATE TABLE $tableName(key INT, value STRING)")
-    assert(q1.count() == 0)
+    assertResult(0) {
+      hql(s"CREATE TABLE $tableName(key INT, value STRING)").count()
+    }
 
-    val q2 = hql("SHOW TABLES")
-    val tables = q2.select('result).collect().map { case Row(table: String) => table }
-    assert(tables.contains(tableName))
+    assert(
+      hql("SHOW TABLES")
+        .select('result)
+        .collect()
+        .map(_.getString(0))
+        .contains(tableName))
 
-    val q3 = hql(s"DESCRIBE $tableName")
     assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) {
-      q3.select('result).collect().map { case Row(fieldDesc: String) =>
-        fieldDesc.split("\t").map(_.trim)
-      }
+      hql(s"DESCRIBE $tableName")
+        .select('result)
+        .collect()
+        .map(_.getString(0).split("\t").map(_.trim))
     }
 
-    val q4 = hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")
-    assert(isExplanation(q4))
+    assert(isExplanation(hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")))
 
     TestHive.reset()
   }
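
The refactor above replaces intermediate `val q0 = ...` bindings followed by bare
`assert(q0.count() == 0)` calls with ScalaTest's `assertResult`, which compares the block's
result against the expected value and reports both on failure. A generic, self-contained
example of the idiom (not taken from this diff):

    import org.scalatest.FunSuite

    class AssertResultDemo extends FunSuite {
      test("assertResult reports expected vs. actual on failure") {
        assertResult(0) {
          Seq.empty[Int].size  // a failure here prints the expected and actual values
        }
      }
    }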

