spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject [2/2] git commit: [SPARK-2410][SQL] Merging Hive Thrift/JDBC server
Date Sun, 27 Jul 2014 20:03:46 GMT
[SPARK-2410][SQL] Merging Hive Thrift/JDBC server

(This is a replacement of #1399, trying to fix potential `HiveThriftServer2` port collision between parallel builds. Please refer to [these comments](https://github.com/apache/spark/pull/1399#issuecomment-50212572) for details.)

JIRA issue: [SPARK-2410](https://issues.apache.org/jira/browse/SPARK-2410)

Merging the Hive Thrift/JDBC server from [branch-1.0-jdbc](https://github.com/apache/spark/tree/branch-1.0-jdbc).

Thanks chenghao-intel for his initial contribution of the Spark SQL CLI.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1600 from liancheng/jdbc and squashes the following commits:

ac4618b [Cheng Lian] Uses random port for HiveThriftServer2 to avoid collision with parallel builds
090beea [Cheng Lian] Revert changes related to SPARK-2678, decided to move them to another PR
21c6cf4 [Cheng Lian] Updated Spark SQL programming guide docs
fe0af31 [Cheng Lian] Reordered spark-submit options in spark-shell[.cmd]
199e3fb [Cheng Lian] Disabled MIMA for hive-thriftserver
1083e9d [Cheng Lian] Fixed failed test suites
7db82a1 [Cheng Lian] Fixed spark-submit application options handling logic
9cc0f06 [Cheng Lian] Starts beeline with spark-submit
cfcf461 [Cheng Lian] Updated documents and build scripts for the newly added hive-thriftserver profile
061880f [Cheng Lian] Addressed all comments by @pwendell
7755062 [Cheng Lian] Adapts test suites to spark-submit settings
40bafef [Cheng Lian] Fixed more license header issues
e214aab [Cheng Lian] Added missing license headers
b8905ba [Cheng Lian] Fixed minor issues in spark-sql and start-thriftserver.sh
f975d22 [Cheng Lian] Updated docs for Hive compatibility and Shark migration guide draft
3ad4e75 [Cheng Lian] Starts spark-sql shell with spark-submit
a5310d1 [Cheng Lian] Make HiveThriftServer2 play well with spark-submit
61f39f4 [Cheng Lian] Starts Hive Thrift server via spark-submit
2c4c539 [Cheng Lian] Cherry picked the Hive Thrift server


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6ff2a61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6ff2a61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6ff2a61

Branch: refs/heads/master
Commit: f6ff2a61d00d12481bfb211ae13d6992daacdcc2
Parents: 2bbf235
Author: Cheng Lian <lian.cs.zju@gmail.com>
Authored: Sun Jul 27 13:03:38 2014 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Sun Jul 27 13:03:38 2014 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 assembly/pom.xml                                |  10 +
 bagel/pom.xml                                   |   2 +-
 bin/beeline                                     |  45 +++
 bin/compute-classpath.sh                        |   1 +
 bin/spark-shell                                 |   4 +-
 bin/spark-shell.cmd                             |   2 +-
 bin/spark-sql                                   |  36 ++
 core/pom.xml                                    |   2 +-
 .../org/apache/spark/deploy/SparkSubmit.scala   |  14 +-
 .../spark/deploy/SparkSubmitArguments.scala     |   5 +-
 dev/create-release/create-release.sh            |  10 +-
 dev/run-tests                                   |   2 +-
 dev/scalastyle                                  |   2 +-
 docs/sql-programming-guide.md                   | 201 ++++++++++-
 examples/pom.xml                                |   2 +-
 external/flume/pom.xml                          |   2 +-
 external/kafka/pom.xml                          |   2 +-
 external/mqtt/pom.xml                           |   2 +-
 external/twitter/pom.xml                        |   2 +-
 external/zeromq/pom.xml                         |   2 +-
 graphx/pom.xml                                  |   2 +-
 mllib/pom.xml                                   |   2 +-
 pom.xml                                         |   7 +-
 project/SparkBuild.scala                        |  14 +-
 sbin/start-thriftserver.sh                      |  36 ++
 sql/catalyst/pom.xml                            |   2 +-
 .../sql/catalyst/plans/logical/commands.scala   |   3 +-
 sql/core/pom.xml                                |   2 +-
 .../scala/org/apache/spark/sql/SQLConf.scala    |  20 +-
 .../apache/spark/sql/execution/commands.scala   |  42 ++-
 .../org/apache/spark/sql/SQLConfSuite.scala     |  13 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  10 +-
 sql/hive-thriftserver/pom.xml                   |  82 +++++
 .../hive/thriftserver/HiveThriftServer2.scala   |  97 ++++++
 .../sql/hive/thriftserver/ReflectionUtils.scala |  58 ++++
 .../hive/thriftserver/SparkSQLCLIDriver.scala   | 344 +++++++++++++++++++
 .../hive/thriftserver/SparkSQLCLIService.scala  |  74 ++++
 .../sql/hive/thriftserver/SparkSQLDriver.scala  |  93 +++++
 .../sql/hive/thriftserver/SparkSQLEnv.scala     |  58 ++++
 .../thriftserver/SparkSQLSessionManager.scala   |  49 +++
 .../server/SparkSQLOperationManager.scala       | 151 ++++++++
 .../src/test/resources/data/files/small_kv.txt  |   5 +
 .../spark/sql/hive/thriftserver/CliSuite.scala  |  57 +++
 .../thriftserver/HiveThriftServer2Suite.scala   | 135 ++++++++
 .../spark/sql/hive/thriftserver/TestUtils.scala | 108 ++++++
 sql/hive/pom.xml                                |   2 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |   2 +-
 .../sql/hive/execution/HiveQuerySuite.scala     |  50 ++-
 streaming/pom.xml                               |   2 +-
 tools/pom.xml                                   |   2 +-
 yarn/alpha/pom.xml                              |   2 +-
 yarn/pom.xml                                    |   2 +-
 yarn/stable/pom.xml                             |   2 +-
 54 files changed, 1781 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 061c894..5b56a67 100644
--- a/.gitignore
+++ b/.gitignore
@@ -57,3 +57,4 @@ metastore_db/
 metastore/
 warehouse/
 TempStatsStore/
+sql/hive-thriftserver/test_warehouses

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 567a8dd..703f159 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -166,6 +166,16 @@
       </dependencies>
     </profile>
     <profile>
+      <id>hive-thriftserver</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
       <id>spark-ganglia-lgpl</id>
       <dependencies>
         <dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/bagel/pom.xml
----------------------------------------------------------------------
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 90c4b09..bd51b11 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -28,7 +28,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-bagel_2.10</artifactId>
   <properties>
-     <sbt.project.name>bagel</sbt.project.name>
+    <sbt.project.name>bagel</sbt.project.name>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Project Bagel</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/bin/beeline
----------------------------------------------------------------------
diff --git a/bin/beeline b/bin/beeline
new file mode 100755
index 0000000..09fe366
--- /dev/null
+++ b/bin/beeline
@@ -0,0 +1,45 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Figure out where Spark is installed
+FWDIR="$(cd `dirname $0`/..; pwd)"
+
+# Find the java binary
+if [ -n "${JAVA_HOME}" ]; then
+  RUNNER="${JAVA_HOME}/bin/java"
+else
+  if [ `command -v java` ]; then
+    RUNNER="java"
+  else
+    echo "JAVA_HOME is not set" >&2
+    exit 1
+  fi
+fi
+
+# Compute classpath using external script
+classpath_output=$($FWDIR/bin/compute-classpath.sh)
+if [[ "$?" != "0" ]]; then
+  echo "$classpath_output"
+  exit 1
+else
+  CLASSPATH=$classpath_output
+fi
+
+CLASS="org.apache.hive.beeline.BeeLine"
+exec "$RUNNER" -cp "$CLASSPATH" $CLASS "$@"

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/bin/compute-classpath.sh
----------------------------------------------------------------------
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index e81e8c0..16b794a 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -52,6 +52,7 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
   CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"
+  CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SCALA_VERSION/classes"
   CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes"
 fi
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/bin/spark-shell
----------------------------------------------------------------------
diff --git a/bin/spark-shell b/bin/spark-shell
index 850e950..756c817 100755
--- a/bin/spark-shell
+++ b/bin/spark-shell
@@ -46,11 +46,11 @@ function main(){
         # (see https://github.com/sbt/sbt/issues/562).
         stty -icanon min 1 -echo > /dev/null 2>&1
         export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
-        $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
+        $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
         stty icanon echo > /dev/null 2>&1
     else
         export SPARK_SUBMIT_OPTS
-        $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
+        $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
     fi
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/bin/spark-shell.cmd
----------------------------------------------------------------------
diff --git a/bin/spark-shell.cmd b/bin/spark-shell.cmd
index 4b9708a..b56d698 100755
--- a/bin/spark-shell.cmd
+++ b/bin/spark-shell.cmd
@@ -19,4 +19,4 @@ rem
 
 set SPARK_HOME=%~dp0..
 
-cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell %* --class org.apache.spark.repl.Main
+cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell --class org.apache.spark.repl.Main %*

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/bin/spark-sql
----------------------------------------------------------------------
diff --git a/bin/spark-sql b/bin/spark-sql
new file mode 100755
index 0000000..bba7f89
--- /dev/null
+++ b/bin/spark-sql
@@ -0,0 +1,36 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Shell script for starting the Spark SQL CLI
+
+# Enter posix mode for bash
+set -o posix
+
+# Figure out where Spark is installed
+FWDIR="$(cd `dirname $0`/..; pwd)"
+
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
+  echo "Usage: ./sbin/spark-sql [options]"
+  $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  exit 0
+fi
+
+CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
+exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 1054cec..a247434 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -28,7 +28,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.10</artifactId>
   <properties>
-     <sbt.project.name>core</sbt.project.name>
+    <sbt.project.name>core</sbt.project.name>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Project Core</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 3b5642b..c9cec33 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -46,6 +46,10 @@ object SparkSubmit {
   private val CLUSTER = 2
   private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
 
+  // A special jar name that indicates the class being run is inside of Spark itself, and therefore
+  // no user jar is needed.
+  private val SPARK_INTERNAL = "spark-internal"
+
   // Special primary resource names that represent shells rather than application jars.
   private val SPARK_SHELL = "spark-shell"
   private val PYSPARK_SHELL = "pyspark-shell"
@@ -257,7 +261,9 @@ object SparkSubmit {
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (clusterManager == YARN && deployMode == CLUSTER) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
-      childArgs += ("--jar", args.primaryResource)
+      if (args.primaryResource != SPARK_INTERNAL) {
+        childArgs += ("--jar", args.primaryResource)
+      }
       childArgs += ("--class", args.mainClass)
       if (args.childArgs != null) {
         args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
@@ -332,7 +338,7 @@ object SparkSubmit {
    * Return whether the given primary resource represents a user jar.
    */
   private def isUserJar(primaryResource: String): Boolean = {
-    !isShell(primaryResource) && !isPython(primaryResource)
+    !isShell(primaryResource) && !isPython(primaryResource) && !isInternal(primaryResource)
   }
 
   /**
@@ -349,6 +355,10 @@ object SparkSubmit {
     primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
   }
 
+  private[spark] def isInternal(primaryResource: String): Boolean = {
+    primaryResource == SPARK_INTERNAL
+  }
+
   /**
    * Merge a sequence of comma-separated file lists, some of which may be null to indicate
    * no files, into a single comma-separated string.

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 3ab67a4..01d0ae5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -204,8 +204,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
 
   /** Fill in values by parsing user options. */
   private def parseOpts(opts: Seq[String]): Unit = {
-    // Delineates parsing of Spark options from parsing of user options.
     var inSparkOpts = true
+
+    // Delineates parsing of Spark options from parsing of user options.
     parse(opts)
 
     def parse(opts: Seq[String]): Unit = opts match {
@@ -318,7 +319,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
               SparkSubmit.printErrorAndExit(errMessage)
             case v =>
               primaryResource =
-                if (!SparkSubmit.isShell(v)) {
+                if (!SparkSubmit.isShell(v) && !SparkSubmit.isInternal(v)) {
                   Utils.resolveURI(v).toString
                 } else {
                   v

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/dev/create-release/create-release.sh
----------------------------------------------------------------------
diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh
index 3883010..33de24d 100755
--- a/dev/create-release/create-release.sh
+++ b/dev/create-release/create-release.sh
@@ -53,7 +53,7 @@ if [[ ! "$@" =~ --package-only ]]; then
     -Dusername=$GIT_USERNAME -Dpassword=$GIT_PASSWORD \
     -Dmaven.javadoc.skip=true \
     -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-    -Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl\
+    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
     -Dtag=$GIT_TAG -DautoVersionSubmodules=true \
     --batch-mode release:prepare
 
@@ -61,7 +61,7 @@ if [[ ! "$@" =~ --package-only ]]; then
     -Darguments="-DskipTests=true -Dmaven.javadoc.skip=true -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" \
     -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
     -Dmaven.javadoc.skip=true \
-    -Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl\
+    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
     release:perform
 
   cd ..
@@ -111,10 +111,10 @@ make_binary_release() {
     spark-$RELEASE_VERSION-bin-$NAME.tgz.sha
 }
 
-make_binary_release "hadoop1" "-Phive -Dhadoop.version=1.0.4"
-make_binary_release "cdh4" "-Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0"
+make_binary_release "hadoop1" "-Phive -Phive-thriftserver -Dhadoop.version=1.0.4"
+make_binary_release "cdh4" "-Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0"
 make_binary_release "hadoop2" \
-  "-Phive -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0"
+  "-Phive -Phive-thriftserver -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0"
 
 # Copy data
 echo "Copying release tarballs"

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/dev/run-tests
----------------------------------------------------------------------
diff --git a/dev/run-tests b/dev/run-tests
index 51e4def..98ec969 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -65,7 +65,7 @@ echo "========================================================================="
 # (either resolution or compilation) prompts the user for input either q, r, 
 # etc to quit or retry. This echo is there to make it not block.
 if [ -n "$_RUN_SQL_TESTS" ]; then
-  echo -e "q\n" | SBT_MAVEN_PROFILES="$SBT_MAVEN_PROFILES -Phive" sbt/sbt clean package \
+  echo -e "q\n" | SBT_MAVEN_PROFILES="$SBT_MAVEN_PROFILES -Phive -Phive-thriftserver" sbt/sbt clean package \
     assembly/assembly test | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
 else
   echo -e "q\n" | sbt/sbt clean package assembly/assembly test | \

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/dev/scalastyle
----------------------------------------------------------------------
diff --git a/dev/scalastyle b/dev/scalastyle
index a02d069..d9f2b91 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -17,7 +17,7 @@
 # limitations under the License.
 #
 
-echo -e "q\n" | sbt/sbt -Phive scalastyle > scalastyle.txt
+echo -e "q\n" | sbt/sbt -Phive -Phive-thriftserver scalastyle > scalastyle.txt
 # Check style with YARN alpha built too
 echo -e "q\n" | sbt/sbt -Pyarn -Phadoop-0.23 -Dhadoop.version=0.23.9 yarn-alpha/scalastyle \
   >> scalastyle.txt

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 3872853..156e0ae 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -136,7 +136,7 @@ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 import sqlContext.createSchemaRDD
 
 // Define the schema using a case class.
-// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, 
+// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
 // you can use custom classes that implement the Product interface.
 case class Person(name: String, age: Int)
 
@@ -548,7 +548,6 @@ results = hiveContext.hql("FROM src SELECT key, value").collect()
 </div>
 </div>
 
-
 # Writing Language-Integrated Relational Queries
 
 **Language-Integrated queries are currently only supported in Scala.**
@@ -573,4 +572,200 @@ prefixed with a tick (`'`).  Implicit conversions turn these symbols into expres
 evaluated by the SQL execution engine.  A full list of the functions supported can be found in the
 [ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD).
 
-<!-- TODO: Include the table of operations here. -->
\ No newline at end of file
+<!-- TODO: Include the table of operations here. -->
+
+## Running the Thrift JDBC server
+
+The Thrift JDBC server implemented here corresponds to the
+[`HiveServer2`](https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2) in Hive 0.12. You can test
+the JDBC server with the beeline script that comes with either Spark or Hive 0.12.  In order to use Hive
+you must first run '`sbt/sbt -Phive-thriftserver assembly/assembly`' (or use `-Phive-thriftserver`
+for maven).
+
+To start the JDBC server, run the following in the Spark directory:
+
+    ./sbin/start-thriftserver.sh
+
+The default port the server listens on is 10000.  To listen on customized host and port, please set
+the `HIVE_SERVER2_THRIFT_PORT` and `HIVE_SERVER2_THRIFT_BIND_HOST` environment variables. You may
+run `./sbin/start-thriftserver.sh --help` for a complete list of all available options.  Now you can
+use beeline to test the Thrift JDBC server:
+
+    ./bin/beeline
+
+Connect to the JDBC server in beeline with:
+
+    beeline> !connect jdbc:hive2://localhost:10000
+
+Beeline will ask you for a username and password. In non-secure mode, simply enter the username on
+your machine and a blank password. For secure mode, please follow the instructions given in the
+[beeline documentation](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients).
+
+Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
+
+You may also use the beeline script that comes with Hive.
+
+### Migration Guide for Shark Users
+
+#### Reducer number
+
+In Shark, the default reducer number is 1 and is controlled by the property `mapred.reduce.tasks`. Spark
+SQL deprecates this property in favor of a new property, `spark.sql.shuffle.partitions`, whose default
+value is 200. Users may customize this property via `SET`:
+
+```
+SET spark.sql.shuffle.partitions=10;
+SELECT page, count(*) c FROM logs_last_month_cached
+GROUP BY page ORDER BY c DESC LIMIT 10;
+```
+
+You may also put this property in `hive-site.xml` to override the default value.
+
+For now, the `mapred.reduce.tasks` property is still recognized, and is converted to
+`spark.sql.shuffle.partitions` automatically.
+
+#### Caching
+
+The `shark.cache` table property no longer exists, and tables whose names end with `_cached` are no
+longer automatically cached. Instead, we provide `CACHE TABLE` and `UNCACHE TABLE` statements to
+let users control table caching explicitly:
+
+```
+CACHE TABLE logs_last_month;
+UNCACHE TABLE logs_last_month;
+```
+
+**NOTE** `CACHE TABLE tbl` is lazy: it only marks table `tbl` as "needs to be cached if necessary",
+but doesn't actually cache it until a query that touches `tbl` is executed. To force the table to be
+cached, you may simply count the table immediately after executing `CACHE TABLE`:
+
+```
+CACHE TABLE logs_last_month;
+SELECT COUNT(1) FROM logs_last_month;
+```
+
+Several caching related features are not supported yet:
+
+* User defined partition level cache eviction policy
+* RDD reloading
+* In-memory cache write through policy
+
+### Compatibility with Apache Hive
+
+#### Deploying in Existing Hive Warehouses
+
+Spark SQL Thrift JDBC server is designed to be "out of the box" compatible with existing Hive
+installations. You do not need to modify your existing Hive Metastore or change the data placement
+or partitioning of your tables.
+
+#### Supported Hive Features
+
+Spark SQL supports the vast majority of Hive features, such as:
+
+* Hive query statements, including:
+ * `SELECT`
+ * `GROUP BY`
+ * `ORDER BY`
+ * `CLUSTER BY`
+ * `SORT BY`
+* All Hive operators, including:
+ * Relational operators (`=`, `<=>`, `==`, `<>`, `<`, `>`, `>=`, `<=`, etc)
+ * Arithmetic operators (`+`, `-`, `*`, `/`, `%`, etc)
+ * Logical operators (`AND`, `&&`, `OR`, `||`, etc)
+ * Complex type constructors
+ * Mathematical functions (`sign`, `ln`, `cos`, etc)
+ * String functions (`instr`, `length`, `printf`, etc)
+* User defined functions (UDF)
+* User defined aggregation functions (UDAF)
+* User defined serialization formats (SerDe's)
+* Joins
+ * `JOIN`
+ * `{LEFT|RIGHT|FULL} OUTER JOIN`
+ * `LEFT SEMI JOIN`
+ * `CROSS JOIN`
+* Unions
+* Sub queries
+ * `SELECT col FROM ( SELECT a + b AS col from t1) t2`
+* Sampling
+* Explain
+* Partitioned tables
+* All Hive DDL Functions, including:
+ * `CREATE TABLE`
+ * `CREATE TABLE AS SELECT`
+ * `ALTER TABLE`
+* Most Hive Data types, including:
+ * `TINYINT`
+ * `SMALLINT`
+ * `INT`
+ * `BIGINT`
+ * `BOOLEAN`
+ * `FLOAT`
+ * `DOUBLE`
+ * `STRING`
+ * `BINARY`
+ * `TIMESTAMP`
+ * `ARRAY<>`
+ * `MAP<>`
+ * `STRUCT<>`
+
+#### Unsupported Hive Functionality
+
+Below is a list of Hive features that we don't support yet. Most of these features are rarely used
+in Hive deployments.
+
+**Major Hive Features**
+
+* Tables with buckets: bucket is the hash partitioning within a Hive table partition. Spark SQL
+  doesn't support buckets yet.
+
+**Esoteric Hive Features**
+
+* Tables with partitions using different input formats: In Spark SQL, all table partitions need to
+  have the same input format.
+* Non-equi outer join: For the uncommon use case of using outer joins with non-equi join conditions
+  (e.g. condition "`key < 10`"), Spark SQL will output wrong results for the `NULL` tuple.
+* `UNIONTYPE`
+* Unique join
+* Single query multi insert
+* Column statistics collecting: Spark SQL does not piggyback scans to collect column statistics at
+  the moment.
+
+**Hive Input/Output Formats**
+
+* File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.
+* Hadoop archive
+
+**Hive Optimizations**
+
+A handful of Hive optimizations are not yet included in Spark. Some of these (such as indexes) are
+not necessary due to Spark SQL's in-memory computational model. Others are slated for future
+releases of Spark SQL.
+
+* Block level bitmap indexes and virtual columns (used to build indexes)
+* Automatically convert a join to map join: For joining a large table with multiple small tables,
+  Hive automatically converts the join into a map join. We are adding this auto conversion in the
+  next release.
+* Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you
+  need to control the degree of parallelism post-shuffle using "SET
+  spark.sql.shuffle.partitions=[num_tasks];". We are going to add auto-setting of parallelism in the
+  next release.
+* Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still
+  launches tasks to compute the result.
+* Skew data flag: Spark SQL does not follow the skew data flags in Hive.
+* `STREAMTABLE` hint in join: Spark SQL does not follow the `STREAMTABLE` hint.
+* Merge multiple small files for query results: if the result output contains multiple small files,
+  Hive can optionally merge the small files into fewer large files to avoid overflowing the HDFS
+  metadata. Spark SQL does not support that.
+
+## Running the Spark SQL CLI
+
+The Spark SQL CLI is a convenient tool to run the Hive metastore service in local mode and execute
+queries input from command line. Note: the Spark SQL CLI cannot talk to the Thrift JDBC server.
+
+To start the Spark SQL CLI, run the following in the Spark directory:
+
+    ./bin/spark-sql
+
+Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
+You may run `./bin/spark-sql --help` for a complete list of all available
+options.

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index bd1c387..c4ed0f5 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -28,7 +28,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-examples_2.10</artifactId>
   <properties>
-     <sbt.project.name>examples</sbt.project.name>
+    <sbt.project.name>examples</sbt.project.name>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Project Examples</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/external/flume/pom.xml
----------------------------------------------------------------------
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 61a6aff..874b8a7 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -28,7 +28,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming-flume_2.10</artifactId>
   <properties>
-     <sbt.project.name>streaming-flume</sbt.project.name>
+    <sbt.project.name>streaming-flume</sbt.project.name>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Project External Flume</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/external/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index 4762c50..25a5c0a 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -28,7 +28,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming-kafka_2.10</artifactId>
   <properties>
-     <sbt.project.name>streaming-kafka</sbt.project.name>
+    <sbt.project.name>streaming-kafka</sbt.project.name>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Project External Kafka</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/external/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
index 32c530e..f31ed65 100644
--- a/external/mqtt/pom.xml
+++ b/external/mqtt/pom.xml
@@ -28,7 +28,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming-mqtt_2.10</artifactId>
   <properties>
-     <sbt.project.name>streaming-mqtt</sbt.project.name>
+    <sbt.project.name>streaming-mqtt</sbt.project.name>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Project External MQTT</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/external/twitter/pom.xml
----------------------------------------------------------------------
diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml
index 637adb0..56bb24c 100644
--- a/external/twitter/pom.xml
+++ b/external/twitter/pom.xml
@@ -28,7 +28,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming-twitter_2.10</artifactId>
   <properties>
-     <sbt.project.name>streaming-twitter</sbt.project.name>
+    <sbt.project.name>streaming-twitter</sbt.project.name>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Project External Twitter</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/external/zeromq/pom.xml
----------------------------------------------------------------------
diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml
index e4d758a..54b0242 100644
--- a/external/zeromq/pom.xml
+++ b/external/zeromq/pom.xml
@@ -28,7 +28,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming-zeromq_2.10</artifactId>
   <properties>
-     <sbt.project.name>streaming-zeromq</sbt.project.name>
+    <sbt.project.name>streaming-zeromq</sbt.project.name>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Project External ZeroMQ</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/graphx/pom.xml
----------------------------------------------------------------------
diff --git a/graphx/pom.xml b/graphx/pom.xml
index 7e3bcf2..6dd52fc 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -28,7 +28,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-graphx_2.10</artifactId>
   <properties>
-     <sbt.project.name>graphx</sbt.project.name>
+    <sbt.project.name>graphx</sbt.project.name>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Project GraphX</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/mllib/pom.xml
----------------------------------------------------------------------
diff --git a/mllib/pom.xml b/mllib/pom.xml
index 92b07e2..f27cf52 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -28,7 +28,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-mllib_2.10</artifactId>
   <properties>
-     <sbt.project.name>mllib</sbt.project.name>
+    <sbt.project.name>mllib</sbt.project.name>
   </properties>  
   <packaging>jar</packaging>
   <name>Spark Project ML Library</name>

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4e2d64a..3e9d388 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,6 +95,7 @@
     <module>sql/catalyst</module>
     <module>sql/core</module>
     <module>sql/hive</module>
+    <module>sql/hive-thriftserver</module>
     <module>repl</module>
     <module>assembly</module>
     <module>external/twitter</module>
@@ -252,9 +253,9 @@
         <version>3.3.2</version>
       </dependency>
       <dependency>
-	  <groupId>commons-codec</groupId>
-	    <artifactId>commons-codec</artifactId>
-	    <version>1.5</version>
+        <groupId>commons-codec</groupId>
+        <artifactId>commons-codec</artifactId>
+        <version>1.5</version>
       </dependency>
       <dependency>
         <groupId>com.google.code.findbugs</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 62576f8..1629bc2 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -30,11 +30,11 @@ object BuildCommons {
 
   private val buildLocation = file(".").getAbsoluteFile.getParentFile
 
-  val allProjects@Seq(bagel, catalyst, core, graphx, hive, mllib, repl, spark, sql, streaming,
-  streamingFlume, streamingKafka, streamingMqtt, streamingTwitter, streamingZeromq) =
-    Seq("bagel", "catalyst", "core", "graphx", "hive", "mllib", "repl", "spark", "sql",
-      "streaming", "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
-      "streaming-zeromq").map(ProjectRef(buildLocation, _))
+  val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, spark, sql,
+  streaming, streamingFlume, streamingKafka, streamingMqtt, streamingTwitter, streamingZeromq) =
+    Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl",
+      "spark", "sql", "streaming", "streaming-flume", "streaming-kafka", "streaming-mqtt",
+      "streaming-twitter", "streaming-zeromq").map(ProjectRef(buildLocation, _))
 
   val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests, sparkGangliaLgpl) =
     Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl")
@@ -100,7 +100,7 @@ object SparkBuild extends PomBuild {
   Properties.envOrNone("SBT_MAVEN_PROPERTIES") match {
     case Some(v) =>
       v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.split("=")).foreach(x => System.setProperty(x(0), x(1)))
-    case _ => 
+    case _ =>
   }
 
   override val userPropertiesMap = System.getProperties.toMap
@@ -158,7 +158,7 @@ object SparkBuild extends PomBuild {
 
   /* Enable Mima for all projects except spark, hive, catalyst, sql  and repl */
   // TODO: Add Sql to mima checks
-  allProjects.filterNot(y => Seq(spark, sql, hive, catalyst, repl).exists(x => x == y)).
+  allProjects.filterNot(x => Seq(spark, sql, hive, hiveThriftServer, catalyst, repl).contains(x)).
     foreach (x => enable(MimaBuild.mimaSettings(sparkHome, x))(x))
 
   /* Enable Assembly for all assembly projects */

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sbin/start-thriftserver.sh
----------------------------------------------------------------------
diff --git a/sbin/start-thriftserver.sh b/sbin/start-thriftserver.sh
new file mode 100755
index 0000000..8398e6f
--- /dev/null
+++ b/sbin/start-thriftserver.sh
@@ -0,0 +1,36 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Shell script for starting the Spark SQL Thrift server
+
+# Enter posix mode for bash
+set -o posix
+
+# Figure out where Spark is installed
+FWDIR="$(cd `dirname $0`/..; pwd)"
+
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
+  echo "Usage: ./sbin/start-thriftserver [options]"
+  $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  exit 0
+fi
+
+CLASS="org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
+exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sql/catalyst/pom.xml
----------------------------------------------------------------------
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 6decde3..531bfdd 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -32,7 +32,7 @@
   <name>Spark Project Catalyst</name>
   <url>http://spark.apache.org/</url>
   <properties>
-     <sbt.project.name>catalyst</sbt.project.name>
+    <sbt.project.name>catalyst</sbt.project.name>
   </properties>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index 1d5f033..a357c6f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -43,8 +43,7 @@ case class NativeCommand(cmd: String) extends Command {
  */
 case class SetCommand(key: Option[String], value: Option[String]) extends Command {
   override def output = Seq(
-    BoundReference(0, AttributeReference("key", StringType, nullable = false)()),
-    BoundReference(1, AttributeReference("value", StringType, nullable = false)()))
+    BoundReference(1, AttributeReference("", StringType, nullable = false)()))
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sql/core/pom.xml
----------------------------------------------------------------------
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index c309c43..3a038a2 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -32,7 +32,7 @@
   <name>Spark Project SQL</name>
   <url>http://spark.apache.org/</url>
   <properties>
-     <sbt.project.name>sql</sbt.project.name>
+    <sbt.project.name>sql</sbt.project.name>
   </properties>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2b787e1..41920c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -30,12 +30,13 @@ import scala.collection.JavaConverters._
  * SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads).
  */
 trait SQLConf {
+  import SQLConf._
 
   /** ************************ Spark SQL Params/Hints ******************* */
   // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?
 
   /** Number of partitions to use for shuffle operators. */
-  private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt
+  private[spark] def numShufflePartitions: Int = get(SHUFFLE_PARTITIONS, "200").toInt
 
   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
@@ -43,11 +44,10 @@ trait SQLConf {
    * effectively disables auto conversion.
    * Hive setting: hive.auto.convert.join.noconditionaltask.size.
    */
-  private[spark] def autoConvertJoinSize: Int =
-    get("spark.sql.auto.convert.join.size", "10000").toInt
+  private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
 
   /** A comma-separated list of table names marked to be broadcasted during joins. */
-  private[spark] def joinBroadcastTables: String = get("spark.sql.join.broadcastTables", "")
+  private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "")
 
   /** ********************** SQLConf functionality methods ************ */
 
@@ -61,7 +61,7 @@ trait SQLConf {
 
   def set(key: String, value: String): Unit = {
     require(key != null, "key cannot be null")
-    require(value != null, s"value cannot be null for ${key}")
+    require(value != null, s"value cannot be null for $key")
     settings.put(key, value)
   }
 
@@ -90,3 +90,13 @@ trait SQLConf {
   }
 
 }
+
+object SQLConf {
+  val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
+  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
+  val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"
+
+  object Deprecated {
+    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 98d2f89..9293239 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
 
 trait Command {
   /**
@@ -44,28 +45,53 @@ trait Command {
 case class SetCommand(
     key: Option[String], value: Option[String], output: Seq[Attribute])(
     @transient context: SQLContext)
-  extends LeafNode with Command {
+  extends LeafNode with Command with Logging {
 
-  override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
+  override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
     // Set value for key k.
     case (Some(k), Some(v)) =>
-      context.set(k, v)
-      Array(k -> v)
+      if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
+        logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+          s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
+        context.set(SQLConf.SHUFFLE_PARTITIONS, v)
+        Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
+      } else {
+        context.set(k, v)
+        Array(s"$k=$v")
+      }
 
     // Query the value bound to key k.
     case (Some(k), _) =>
-      Array(k -> context.getOption(k).getOrElse("<undefined>"))
+      // TODO (lian) This is just a workaround to make the Simba ODBC driver work.
+      // Should remove this once we get the ODBC driver updated.
+      if (k == "-v") {
+        val hiveJars = Seq(
+          "hive-exec-0.12.0.jar",
+          "hive-service-0.12.0.jar",
+          "hive-common-0.12.0.jar",
+          "hive-hwi-0.12.0.jar",
+          "hive-0.12.0.jar").mkString(":")
+
+        Array(
+          "system:java.class.path=" + hiveJars,
+          "system:sun.java.command=shark.SharkServer2")
+      }
+      else {
+        Array(s"$k=${context.getOption(k).getOrElse("<undefined>")}")
+      }
 
     // Query all key-value pairs that are set in the SQLConf of the context.
     case (None, None) =>
-      context.getAll
+      context.getAll.map { case (k, v) =>
+        s"$k=$v"
+      }
 
     case _ =>
       throw new IllegalArgumentException()
   }
 
   def execute(): RDD[Row] = {
-    val rows = sideEffectResult.map { case (k, v) => new GenericRow(Array[Any](k, v)) }
+    val rows = sideEffectResult.map { line => new GenericRow(Array[Any](line)) }
     context.sparkContext.parallelize(rows, 1)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
index 08293f7..1a58d73 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
@@ -54,10 +54,10 @@ class SQLConfSuite extends QueryTest {
     assert(get(testKey, testVal + "_") == testVal)
     assert(TestSQLContext.get(testKey, testVal + "_") == testVal)
 
-    sql("set mapred.reduce.tasks=20")
-    assert(get("mapred.reduce.tasks", "0") == "20")
-    sql("set mapred.reduce.tasks = 40")
-    assert(get("mapred.reduce.tasks", "0") == "40")
+    sql("set some.property=20")
+    assert(get("some.property", "0") == "20")
+    sql("set some.property = 40")
+    assert(get("some.property", "0") == "40")
 
     val key = "spark.sql.key"
     val vs = "val0,val_1,val2.3,my_table"
@@ -70,4 +70,9 @@ class SQLConfSuite extends QueryTest {
     clear()
   }
 
+  test("deprecated property") {
+    clear()
+    sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
+    assert(get(SQLConf.SHUFFLE_PARTITIONS) == "10")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 6736189..de9e8aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -424,25 +424,25 @@ class SQLQuerySuite extends QueryTest {
     sql(s"SET $testKey=$testVal")
     checkAnswer(
       sql("SET"),
-      Seq(Seq(testKey, testVal))
+      Seq(Seq(s"$testKey=$testVal"))
     )
 
     sql(s"SET ${testKey + testKey}=${testVal + testVal}")
     checkAnswer(
       sql("set"),
       Seq(
-        Seq(testKey, testVal),
-        Seq(testKey + testKey, testVal + testVal))
+        Seq(s"$testKey=$testVal"),
+        Seq(s"${testKey + testKey}=${testVal + testVal}"))
     )
 
     // "set key"
     checkAnswer(
       sql(s"SET $testKey"),
-      Seq(Seq(testKey, testVal))
+      Seq(Seq(s"$testKey=$testVal"))
     )
     checkAnswer(
       sql(s"SET $nonexistentKey"),
-      Seq(Seq(nonexistentKey, "<undefined>"))
+      Seq(Seq(s"$nonexistentKey=<undefined>"))
     )
     clear()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sql/hive-thriftserver/pom.xml
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
new file mode 100644
index 0000000..7fac90f
--- /dev/null
+++ b/sql/hive-thriftserver/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.1.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-hive-thriftserver_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Hive Thrift Server</name>
+  <url>http://spark.apache.org/</url>
+  <properties>
+    <sbt.project.name>hive-thriftserver</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.spark-project.hive</groupId>
+      <artifactId>hive-cli</artifactId>
+      <version>${hive.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.spark-project.hive</groupId>
+      <artifactId>hive-jdbc</artifactId>
+      <version>${hive.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.spark-project.hive</groupId>
+      <artifactId>hive-beeline</artifactId>
+      <version>${hive.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
new file mode 100644
index 0000000..ddbc2a7
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import scala.collection.JavaConversions._
+
+import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
+import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
+
+import org.apache.spark.sql.Logging
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+
+/**
+ * The main entry point for the Spark SQL port of HiveServer2.  Initializes `SparkSQLEnv` and
+ * starts a `HiveThriftServer2` thrift server.
+ */
+private[hive] object HiveThriftServer2 extends Logging {
+  var LOG = LogFactory.getLog(classOf[HiveServer2])
+
+  def main(args: Array[String]) {
+    val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
+
+    if (!optionsProcessor.process(args)) {
+      logger.warn("Error starting HiveThriftServer2 with given arguments")
+      System.exit(-1)
+    }
+
+    val ss = new SessionState(new HiveConf(classOf[SessionState]))
+
+    // Log every HiveConf property, sorted by key, at debug level.
+    val hiveConf: HiveConf = ss.getConf
+    hiveConf.getAllProperties.toSeq.sortBy(_._1).foreach { case (k, v) =>
+      logger.debug(s"HiveConf var: $k=$v")
+    }
+
+    SessionState.start(ss)
+
+    logger.info("Starting SparkContext")
+    SparkSQLEnv.init()
+    SessionState.start(ss)
+
+    Runtime.getRuntime.addShutdownHook(
+      new Thread() {
+        override def run() {
+          SparkSQLEnv.sparkContext.stop()
+        }
+      }
+    )
+
+    try {
+      val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
+      server.init(hiveConf)
+      server.start()
+      logger.info("HiveThriftServer2 started")
+    } catch {
+      case e: Exception =>
+        logger.error("Error starting HiveThriftServer2", e)
+        System.exit(-1)
+    }
+  }
+}
+
+private[hive] class HiveThriftServer2(hiveContext: HiveContext)
+  extends HiveServer2
+  with ReflectedCompositeService {
+
+  override def init(hiveConf: HiveConf) {
+    val sparkSqlCliService = new SparkSQLCLIService(hiveContext)
+    setSuperField(this, "cliService", sparkSqlCliService)
+    addService(sparkSqlCliService)
+
+    val thriftCliService = new ThriftBinaryCLIService(sparkSqlCliService)
+    setSuperField(this, "thriftCLIService", thriftCliService)
+    addService(thriftCliService)
+
+    initCompositeService(hiveConf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ReflectionUtils.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ReflectionUtils.scala
new file mode 100644
index 0000000..599294d
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ReflectionUtils.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+private[hive] object ReflectionUtils {
+  def setSuperField(obj : Object, fieldName: String, fieldValue: Object) {
+    setAncestorField(obj, 1, fieldName, fieldValue)
+  }
+
+  def setAncestorField(obj: AnyRef, level: Int, fieldName: String, fieldValue: AnyRef) {
+    val ancestor = Iterator.iterate[Class[_]](obj.getClass)(_.getSuperclass).drop(level).next()
+    val field = ancestor.getDeclaredField(fieldName)
+    field.setAccessible(true)
+    field.set(obj, fieldValue)
+  }
+
+  def getSuperField[T](obj: AnyRef, fieldName: String): T = {
+    getAncestorField[T](obj, 1, fieldName)
+  }
+
+  def getAncestorField[T](clazz: Object, level: Int, fieldName: String): T = {
+    val ancestor = Iterator.iterate[Class[_]](clazz.getClass)(_.getSuperclass).drop(level).next()
+    val field = ancestor.getDeclaredField(fieldName)
+    field.setAccessible(true)
+    field.get(clazz).asInstanceOf[T]
+  }
+
+  def invokeStatic(clazz: Class[_], methodName: String, args: (Class[_], AnyRef)*): AnyRef = {
+    invoke(clazz, null, methodName, args: _*)
+  }
+
+  def invoke(
+      clazz: Class[_],
+      obj: AnyRef,
+      methodName: String,
+      args: (Class[_], AnyRef)*): AnyRef = {
+
+    val (types, values) = args.unzip
+    val method = clazz.getDeclaredMethod(methodName, types: _*)
+    method.setAccessible(true)
+    method.invoke(obj, values.toSeq: _*)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
new file mode 100755
index 0000000..27268ec
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import scala.collection.JavaConversions._
+
+import java.io._
+import java.util.{ArrayList => JArrayList}
+
+import jline.{ConsoleReader, History}
+import org.apache.commons.lang.StringUtils
+import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.cli.{CliDriver, CliSessionState, OptionsProcessor}
+import org.apache.hadoop.hive.common.LogUtils.LogInitializationException
+import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils, LogUtils}
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.Driver
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.thrift.transport.TSocket
+
+import org.apache.spark.sql.Logging
+
+/**
+ * Entry point of the interactive SQL shell. `main` prepares a Hive `CliSessionState`, runs any
+ * init files, then either executes the `-e` string or `-f` file and exits, or reads statements
+ * from the console until end of input.
+ */
+private[hive] object SparkSQLCLIDriver {
+  private var prompt = "spark-sql"
+  private var continuedPrompt = "".padTo(prompt.length, ' ')
+  // Value of the private `transport` field of CliSessionState, captured reflectively in main()
+  // so that the interrupt callback can close its socket.
+  private var transport:TSocket = _
+
+  installSignalHandler()
+
+  /**
+   * Install an interrupt callback to cancel all Spark jobs. In Hive's CliDriver#processLine(),
+   * a signal handler will invoke this registered callback if a Ctrl+C signal is detected while
+   * a command is being processed by the current thread.
+   */
+  def installSignalHandler() {
+    HiveInterruptUtils.add(new HiveInterruptCallback {
+      override def interrupt() {
+        if (SparkSQLEnv.sparkContext != null) {
+          // A SparkContext exists in this process: cancel whatever it is running.
+          SparkSQLEnv.sparkContext.cancelAllJobs()
+        } else {
+          // Handle remote execution mode
+          if (transport != null) {
+            // Force closing of TCP connection upon session termination
+            transport.getSocket.close()
+          }
+        }
+      }
+    })
+  }
+
+  /**
+   * Runs the shell.
+   *
+   * The process exits with status 1 or 2 when the first or second stage of option processing
+   * fails, with 3 when UTF-8 is unsupported or the `-f` input file cannot be opened, and
+   * otherwise with the status of the `-e` string, the `-f` file, or the last statement
+   * processed in interactive mode.
+   */
+  def main(args: Array[String]) {
+    val oproc = new OptionsProcessor()
+    if (!oproc.process_stage1(args)) {
+      System.exit(1)
+    }
+
+    // NOTE: It is critical to do this here so that log4j is reinitialized
+    // before any of the other core hive classes are loaded
+    var logInitFailed = false
+    var logInitDetailMessage: String = null
+    try {
+      logInitDetailMessage = LogUtils.initHiveLog4j()
+    } catch {
+      case e: LogInitializationException =>
+        logInitFailed = true
+        logInitDetailMessage = e.getMessage
+    }
+
+    val sessionState = new CliSessionState(new HiveConf(classOf[SessionState]))
+
+    sessionState.in = System.in
+    try {
+      sessionState.out = new PrintStream(System.out, true, "UTF-8")
+      sessionState.info = new PrintStream(System.err, true, "UTF-8")
+      sessionState.err = new PrintStream(System.err, true, "UTF-8")
+    } catch {
+      case e: UnsupportedEncodingException => System.exit(3)
+    }
+
+    if (!oproc.process_stage2(sessionState)) {
+      System.exit(2)
+    }
+
+    if (!sessionState.getIsSilent) {
+      if (logInitFailed) System.err.println(logInitDetailMessage)
+      else SessionState.getConsole.printInfo(logInitDetailMessage)
+    }
+
+    // Set all properties specified via command line.
+    val conf: HiveConf = sessionState.getConf
+    sessionState.cmdProperties.entrySet().foreach { item: java.util.Map.Entry[Object, Object] =>
+      conf.set(item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
+      sessionState.getOverriddenConfigurations.put(
+        item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
+    }
+
+    SessionState.start(sessionState)
+
+    // Clean up after we exit
+    Runtime.getRuntime.addShutdownHook(
+      new Thread() {
+        override def run() {
+          SparkSQLEnv.stop()
+        }
+      }
+    )
+
+    // "-h" option has been passed, so connect to Hive thrift server.
+    if (sessionState.getHost != null) {
+      sessionState.connect()
+      if (sessionState.isRemoteMode) {
+        prompt = s"[${sessionState.getHost}:${sessionState.getPort}]" + prompt
+        continuedPrompt = "".padTo(prompt.length, ' ')
+      }
+    }
+
+    if (!sessionState.isRemoteMode && !ShimLoader.getHadoopShims.usesJobShell()) {
+      // Hadoop-20 and above - we need to augment classpath using hiveconf
+      // components.
+      // See also: code in ExecDriver.java
+      var loader = conf.getClassLoader
+      val auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS)
+      if (StringUtils.isNotBlank(auxJars)) {
+        loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars, ","))
+      }
+      conf.setClassLoader(loader)
+      Thread.currentThread().setContextClassLoader(loader)
+    }
+
+    val cli = new SparkSQLCLIDriver
+    cli.setHiveVariables(oproc.getHiveVariables)
+
+    // TODO work around for set the log output to console, because the HiveContext
+    // will set the output into an invalid buffer.
+    sessionState.in = System.in
+    try {
+      sessionState.out = new PrintStream(System.out, true, "UTF-8")
+      sessionState.info = new PrintStream(System.err, true, "UTF-8")
+      sessionState.err = new PrintStream(System.err, true, "UTF-8")
+    } catch {
+      case e: UnsupportedEncodingException => System.exit(3)
+    }
+
+    // Execute -i init files (always in silent mode)
+    cli.processInitFiles(sessionState)
+
+    // "-e": run the given string and exit with its status.
+    if (sessionState.execString != null) {
+      System.exit(cli.processLine(sessionState.execString))
+    }
+
+    // "-f": run the given file and exit with its status.
+    try {
+      if (sessionState.fileName != null) {
+        System.exit(cli.processFile(sessionState.fileName))
+      }
+    } catch {
+      case e: FileNotFoundException =>
+        System.err.println(s"Could not open input file for reading. (${e.getMessage})")
+        System.exit(3)
+    }
+
+    val reader = new ConsoleReader()
+    reader.setBellEnabled(false)
+    // reader.setDebug(new PrintWriter(new FileWriter("writer.debug", true)))
+    CliDriver.getCommandCompletor.foreach((e) => reader.addCompletor(e))
+
+    val historyDirectory = System.getProperty("user.home")
+
+    try {
+      if (new File(historyDirectory).exists()) {
+        val historyFile = historyDirectory + File.separator + ".hivehistory"
+        reader.setHistory(new History(new File(historyFile)))
+      } else {
+        System.err.println("WARNING: Directory for Hive history file: " + historyDirectory +
+                           " does not exist.   History will not be available during this session.")
+      }
+    } catch {
+      case e: Exception =>
+        System.err.println("WARNING: Encountered an error while trying to initialize Hive's " +
+                           "history file.  History will not be available during this session.")
+        System.err.println(e.getMessage)
+    }
+
+    // CliSessionState keeps its socket in a private field; read it reflectively so that the
+    // interrupt callback installed above can close it.
+    val clientTransportTSocketField = classOf[CliSessionState].getDeclaredField("transport")
+    clientTransportTSocketField.setAccessible(true)
+
+    transport = clientTransportTSocketField.get(sessionState).asInstanceOf[TSocket]
+
+    var ret = 0
+    var prefix = ""
+
+    // Re-evaluated on every use: the formatted database name depends on session state that the
+    // statements processed below can change, so it must not be captured once up front.
+    def currentDB = ReflectionUtils.invokeStatic(classOf[CliDriver], "getFormattedDb",
+      classOf[HiveConf] -> conf, classOf[CliSessionState] -> sessionState)
+
+    def promptWithCurrentDB = s"$prompt$currentDB"
+    def continuedPromptWithDBSpaces = continuedPrompt + ReflectionUtils.invokeStatic(
+      classOf[CliDriver], "spacesForString", classOf[String] -> currentDB)
+
+    var currentPrompt = promptWithCurrentDB
+    var line = reader.readLine(currentPrompt + "> ")
+
+    // Accumulate input lines in `prefix` until one ends with an unescaped ';', then process the
+    // whole statement. readLine returns null at end of input, which ends the loop.
+    while (line != null) {
+      if (prefix.nonEmpty) {
+        prefix += '\n'
+      }
+
+      if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
+        line = prefix + line
+        ret = cli.processLine(line, true)
+        prefix = ""
+        currentPrompt = promptWithCurrentDB
+      } else {
+        prefix = prefix + line
+        currentPrompt = continuedPromptWithDBSpaces
+      }
+
+      line = reader.readLine(currentPrompt + "> ")
+    }
+
+    sessionState.close()
+
+    System.exit(ret)
+  }
+}
+
+/**
+ * A `CliDriver` whose `processCmd` runs statements through [[SparkSQLDriver]] whenever Hive's
+ * command processor factory would select its own `Driver` for them. Every other command goes to
+ * Hive's implementation or to the command processor Hive selects.
+ */
+private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
+  private val sessionState = SessionState.get().asInstanceOf[CliSessionState]
+
+  private val LOG = LogFactory.getLog("CliDriver")
+
+  private val console = new SessionState.LogHelper(LOG)
+
+  private val conf: Configuration =
+    if (sessionState != null) sessionState.getConf else new Configuration()
+
+  // Force initializing SparkSQLEnv. This is put here but not object SparkSQLCliDriver
+  // because the Hive unit tests do not go through the main() code path.
+  if (!sessionState.isRemoteMode) {
+    SparkSQLEnv.init()
+  }
+
+  /**
+   * Processes a single command.
+   *
+   * `quit`, `exit`, `source`, `!` shell commands, `list`, and every command in remote mode are
+   * delegated to Hive's `CliDriver.processCmd`. Other commands are looked up in Hive's
+   * `CommandProcessorFactory`: those mapped to Hive's `Driver` are executed by a fresh
+   * [[SparkSQLDriver]], the rest by the processor that was returned.
+   *
+   * @param cmd the command text
+   * @return 0 on success, otherwise the response code of whichever component ran the command
+   */
+  override def processCmd(cmd: String): Int = {
+    val cmd_trimmed: String = cmd.trim()
+    val tokens: Array[String] = cmd_trimmed.split("\\s+")
+    val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
+    if (cmd_trimmed.toLowerCase.equals("quit") ||
+      cmd_trimmed.toLowerCase.equals("exit") ||
+      tokens(0).equalsIgnoreCase("source") ||
+      cmd_trimmed.startsWith("!") ||
+      tokens(0).toLowerCase.equals("list") ||
+      sessionState.isRemoteMode) {
+      val start = System.currentTimeMillis()
+      // Keep the status reported by Hive so that callers see failures of delegated commands.
+      val delegatedRet = super.processCmd(cmd)
+      val end = System.currentTimeMillis()
+      val timeTaken: Double = (end - start) / 1000.0
+      console.printInfo(s"Time taken: $timeTaken seconds")
+      delegatedRet
+    } else {
+      var ret = 0
+      val hconf = conf.asInstanceOf[HiveConf]
+      val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hconf)
+
+      if (proc != null) {
+        if (proc.isInstanceOf[Driver]) {
+          val driver = new SparkSQLDriver
+
+          driver.init()
+          val out = sessionState.out
+          val start:Long = System.currentTimeMillis()
+          if (sessionState.getIsVerbose) {
+            out.println(cmd)
+          }
+
+          val response = driver.run(cmd)
+          ret = response.getResponseCode
+          if (ret != 0) {
+            // Show the user why the statement failed instead of only returning a status code.
+            Option(response.getErrorMessage).foreach(message => console.printError(message))
+            driver.close()
+            return ret
+          }
+
+          val res = new JArrayList[String]()
+
+          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_HEADER)) {
+            // Print the column names.
+            Option(driver.getSchema.getFieldSchemas).foreach { fields =>
+              out.println(fields.map(_.getName).mkString("\t"))
+            }
+          }
+
+          try {
+            // Stop early if the output stream has reported an error.
+            while (!out.checkError() && driver.getResults(res)) {
+              res.foreach(out.println)
+              res.clear()
+            }
+          } catch {
+            case e:IOException =>
+              console.printError(
+                s"""Failed with exception ${e.getClass.getName}: ${e.getMessage}
+                   |${org.apache.hadoop.util.StringUtils.stringifyException(e)}
+                 """.stripMargin)
+              ret = 1
+          }
+
+          // A failure while fetching results takes precedence over the status of close().
+          val cret = driver.close()
+          if (ret == 0) {
+            ret = cret
+          }
+
+          val end = System.currentTimeMillis()
+          if (end > start) {
+            val timeTaken:Double = (end - start) / 1000.0
+            console.printInfo(s"Time taken: $timeTaken seconds", null)
+          }
+
+          // Destroy the driver to release all the locks.
+          driver.destroy()
+        } else {
+          if (sessionState.getIsVerbose) {
+            sessionState.out.println(tokens(0) + " " + cmd_1)
+          }
+          ret = proc.run(cmd_1).getResponseCode
+        }
+      }
+      ret
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
new file mode 100644
index 0000000..42cbf36
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import scala.collection.JavaConversions._
+
+import java.io.IOException
+import java.util.{List => JList}
+import javax.security.auth.login.LoginException
+
+import org.apache.commons.logging.Log
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hive.service.Service.STATE
+import org.apache.hive.service.auth.HiveAuthFactory
+import org.apache.hive.service.cli.CLIService
+import org.apache.hive.service.{AbstractService, Service, ServiceException}
+
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+
+/**
+ * A `CLIService` whose `init` installs a [[SparkSQLSessionManager]] as the session manager.
+ * The superclass fields involved are assigned through reflection.
+ */
+private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
+  extends CLIService
+  with ReflectedCompositeService {
+
+  /**
+   * Stores `hiveConf`, registers the Spark SQL session manager, logs in from the keytab,
+   * records the server user name, and finally initializes the registered child services.
+   *
+   * @throws ServiceException if the login fails with an `IOException` or a `LoginException`
+   */
+  override def init(hiveConf: HiveConf): Unit = {
+    setSuperField(this, "hiveConf", hiveConf)
+
+    val manager = new SparkSQLSessionManager(hiveContext)
+    setSuperField(this, "sessionManager", manager)
+    addService(manager)
+
+    try {
+      HiveAuthFactory.loginFromKeytab(hiveConf)
+      val ugi = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
+      val shortName = ShimLoader.getHadoopShims.getShortUserName(ugi)
+      setSuperField(this, "serverUserName", shortName)
+    } catch {
+      case ioError: IOException =>
+        throw new ServiceException(
+          "Unable to login to kerberos with given principal/keytab", ioError)
+      case loginError: LoginException =>
+        throw new ServiceException(
+          "Unable to login to kerberos with given principal/keytab", loginError)
+    }
+
+    initCompositeService(hiveConf)
+  }
+}
+
+/**
+ * Mix-in for services that override `init` and therefore perform, through reflection, the
+ * initialization steps of `CompositeService.init` and `AbstractService.init` themselves.
+ */
+private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
+  /**
+   * Initializes every service in the reflected `serviceList`, then moves this service from
+   * `NOTINITED` to `INITED`, stores `hiveConf`, and logs that the service is inited.
+   */
+  def initCompositeService(hiveConf: HiveConf): Unit = {
+    // Emulating `CompositeService.init(hiveConf)`: `serviceList` is read from the class two
+    // levels above the concrete class.
+    val childServices = getAncestorField[JList[Service]](this, 2, "serviceList")
+    for (child <- childServices) {
+      child.init(hiveConf)
+    }
+
+    // Emulating `AbstractService.init(hiveConf)`: `hiveConf` and `LOG` are read from the class
+    // three levels above the concrete class.
+    val abstractService = classOf[AbstractService]
+    invoke(abstractService, this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED)
+    setAncestorField(this, 3, "hiveConf", hiveConf)
+    invoke(abstractService, this, "changeState", classOf[STATE] -> STATE.INITED)
+    val log = getAncestorField[Log](this, 3, "LOG")
+    log.info(s"Service: $getName is inited.")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
new file mode 100644
index 0000000..5202aa9
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import scala.collection.JavaConversions._
+
+import java.util.{ArrayList => JArrayList}
+
+import org.apache.commons.lang.exception.ExceptionUtils
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
+import org.apache.hadoop.hive.ql.Driver
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
+
+import org.apache.spark.sql.Logging
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+
+/**
+ * A Hive `Driver` that executes commands through a [[HiveContext]]. It keeps the string rows
+ * and the schema of the most recent successful `run` until they are fetched or released.
+ *
+ * @param context the context used to execute commands; defaults to `SparkSQLEnv.hiveContext`
+ */
+private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext)
+  extends Driver with Logging {
+
+  private var tableSchema: Schema = _
+  private var hiveResponse: Seq[String] = _
+
+  /** Intentionally empty override. */
+  override def init(): Unit = {
+  }
+
+  /**
+   * Builds the result schema from the analyzed plan's output attributes. When the plan has no
+   * output, a single string column named "Response code" is used.
+   */
+  private def getResultSetSchema(query: context.QueryExecution): Schema = {
+    val analyzed = query.analyzed
+    logger.debug(s"Result Schema: ${analyzed.output}")
+    if (analyzed.output.isEmpty) {
+      new Schema(new FieldSchema("Response code", "string", "") :: Nil, null)
+    } else {
+      val fieldSchemas = analyzed.output.map { attr =>
+        new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
+      }
+
+      new Schema(fieldSchemas, null)
+    }
+  }
+
+  /**
+   * Executes `command` and stores its rows and schema for `getResults` and `getSchema`.
+   *
+   * @return a response with code 0 on success; on any failure, a response with code -3 whose
+   *         error message is the full stack trace of the cause
+   */
+  override def run(command: String): CommandProcessorResponse = {
+    // TODO unify the error code
+    try {
+      // Building the plan is inside the try so that failures raised while preparing the command
+      // are reported through the response as well, instead of escaping as exceptions.
+      val execution = context.executePlan(context.hql(command).logicalPlan)
+      hiveResponse = execution.stringResult()
+      tableSchema = getResultSetSchema(execution)
+      new CommandProcessorResponse(0)
+    } catch {
+      case cause: Throwable =>
+        logger.error(s"Failed in [$command]", cause)
+        new CommandProcessorResponse(-3, ExceptionUtils.getFullStackTrace(cause), null)
+    }
+  }
+
+  /** Drops the stored rows and schema; always returns 0. */
+  override def close(): Int = {
+    hiveResponse = null
+    tableSchema = null
+    0
+  }
+
+  /** Schema of the most recent successful `run`, or null if none is stored. */
+  override def getSchema: Schema = tableSchema
+
+  /**
+   * Appends all stored rows to `res` in one batch.
+   *
+   * @return true if rows were handed over; false if there was nothing stored, which is also
+   *         the case on every call after the first for a given `run`
+   */
+  override def getResults(res: JArrayList[String]): Boolean = {
+    if (hiveResponse == null) {
+      false
+    } else {
+      res.addAll(hiveResponse)
+      hiveResponse = null
+      true
+    }
+  }
+
+  /** Calls the superclass `destroy` and drops the stored rows and schema. */
+  override def destroy(): Unit = {
+    super.destroy()
+    hiveResponse = null
+    tableSchema = null
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
new file mode 100644
index 0000000..451c3bd
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import org.apache.hadoop.hive.ql.session.SessionState
+
+import org.apache.spark.scheduler.{SplitInfo, StatsReportListener}
+import org.apache.spark.sql.Logging
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.{SparkConf, SparkContext}
+
+/**
+ * Process-wide holder of the `SparkContext` and `HiveContext` used by the master program.
+ * The slaves should not access this.
+ */
+private[hive] object SparkSQLEnv extends Logging {
+  logger.debug("Initializing SparkSQLEnv")
+
+  var hiveContext: HiveContext = _
+  var sparkContext: SparkContext = _
+
+  /**
+   * Creates the `SparkContext` and the `HiveContext` unless a `HiveContext` already exists.
+   * The application name embeds the local host name, and the `HiveContext` takes its session
+   * state and configuration from Hive's current `SessionState`.
+   */
+  def init(): Unit = {
+    if (hiveContext == null) {
+      val sparkConf = new SparkConf()
+      sparkConf.setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}")
+      sparkContext = new SparkContext(sparkConf)
+
+      val statsListener = new StatsReportListener()
+      sparkContext.addSparkListener(statsListener)
+
+      hiveContext = new HiveContext(sparkContext) {
+        @transient override lazy val sessionState = SessionState.get()
+        @transient override lazy val hiveconf = sessionState.getConf
+      }
+    }
+  }
+
+  /** Stops the `SparkContext`, if there is one, and forgets both contexts. */
+  def stop(): Unit = {
+    logger.debug("Shutting down Spark SQL Environment")
+    Option(sparkContext).foreach { running =>
+      running.stop()
+      sparkContext = null
+      hiveContext = null
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f6ff2a61/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
new file mode 100644
index 0000000..6b3275b
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import java.util.concurrent.Executors
+
+import org.apache.commons.logging.Log
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hive.service.cli.session.SessionManager
+
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
+
+/**
+ * A `SessionManager` whose `init` installs a `SparkSQLOperationManager` as the operation
+ * manager. The superclass fields involved are assigned through reflection.
+ */
+private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
+  extends SessionManager
+  with ReflectedCompositeService {
+
+  /**
+   * Stores `hiveConf`, creates the background operation pool with the configured number of
+   * async execution threads, registers the Spark SQL operation manager, and finally
+   * initializes the registered child services.
+   */
+  override def init(hiveConf: HiveConf): Unit = {
+    setSuperField(this, "hiveConf", hiveConf)
+
+    val poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
+    val backgroundPool = Executors.newFixedThreadPool(poolSize)
+    setSuperField(this, "backgroundOperationPool", backgroundPool)
+
+    // `LOG` is read from the class three levels above this one.
+    val log = getAncestorField[Log](this, 3, "LOG")
+    log.info(s"HiveServer2: Async execution pool size $poolSize")
+
+    val operationManager = new SparkSQLOperationManager(hiveContext)
+    setSuperField(this, "operationManager", operationManager)
+    addService(operationManager)
+
+    initCompositeService(hiveConf)
+  }
+}


Mime
View raw message