spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marmb...@apache.org
Subject git commit: [SPARK-1368][SQL] Optimized HiveTableScan
Date Thu, 29 May 2014 22:26:18 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 8bb93909f -> 0f56aadc8


[SPARK-1368][SQL] Optimized HiveTableScan

JIRA issue: [SPARK-1368](https://issues.apache.org/jira/browse/SPARK-1368)

This PR introduces two major updates:

- Replaced FP style code with `while` loop and reusable `GenericMutableRow` object in critical
path of `HiveTableScan`.
- Used `ColumnProjectionUtils` to help optimize RCFile and ORC column pruning.

My quick micro benchmark suggests these two optimizations made the optimized version 2x and
2.5x faster when scanning CSV table and RCFile table respectively:

```
Original:

[info] CSV: 27676 ms, RCFile: 26415 ms
[info] CSV: 27703 ms, RCFile: 26029 ms
[info] CSV: 27511 ms, RCFile: 25962 ms

Optimized:

[info] CSV: 13820 ms, RCFile: 10402 ms
[info] CSV: 14158 ms, RCFile: 10691 ms
[info] CSV: 13606 ms, RCFile: 10346 ms
```

The micro benchmark loads a 609MB CSV file (structurally similar to the `src` test table)
into a normal Hive table with `LazySimpleSerDe` and a RCFile table, then scans these tables
respectively.

Preparation code:

```scala
package org.apache.spark.examples.sql.hive

import org.apache.spark.sql.hive.LocalHiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Prepares the two Hive tables scanned by the table scan micro benchmark.
 *
 * Expects the path of a local data file as the first command line argument. The program
 * recreates `scan_csv` (a `LazySimpleSerDe` table with `,` as field delimiter), loads the
 * given file into it, recreates `scan_rcfile` (a `ColumnarSerDe` table stored as RCFile) and
 * copies every `(key, value)` row of `scan_csv` into it.
 */
object HiveTableScanPrepare extends App {
  // Fail fast: the input path is only read after the old tables have been dropped, so check
  // it before touching anything.
  require(args.nonEmpty, "Usage: HiveTableScanPrepare <path to local data file>")

  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  import hiveContext._

  // Start from a clean slate so that the program can be re-run.
  hql("drop table scan_csv")
  hql("drop table scan_rcfile")

  hql("""create table scan_csv (key int, value string)
        |  row format serde 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
        |  with serdeproperties ('field.delim'=',')
      """.stripMargin)

  hql(s"""load data local inpath "${args(0)}" into table scan_csv""")

  hql("""create table scan_rcfile (key int, value string)
        |  row format serde 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
        |stored as
        |  inputformat 'org.apache.hadoop.hive.ql.io.RCFileInputFormat'
        |  outputformat 'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'
      """.stripMargin)

  // Populate the RCFile table from the table that was just loaded.
  hql(
    """
      |from scan_csv
      |insert overwrite table scan_rcfile
      |select scan_csv.key, scan_csv.value
    """.stripMargin)

  // Release the local Spark context once all statements have run.
  sparkContext.stop()
}
```

Benchmark code:

```scala
package org.apache.spark.examples.sql.hive

import org.apache.spark.sql.hive.LocalHiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Micro benchmark for Hive table scans.
 *
 * Runs `select key` against the `scan_csv` and `scan_rcfile` tables, counts the rows of each
 * result and prints how long each count took, in milliseconds.
 */
object HiveTableScanBenchmark extends App {
  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  import hiveContext._

  val scanCsv = hql("select key from scan_csv")
  val scanRcfile = hql("select key from scan_rcfile")

  val csvDuration = benchmark(scanCsv.count())
  val rcfileDuration = benchmark(scanRcfile.count())

  println(s"CSV: $csvDuration ms, RCFile: $rcfileDuration ms")

  // Release the local Spark context once both scans have been timed.
  sparkContext.stop()

  /**
   * Evaluates `f` once and returns the elapsed time in milliseconds.
   *
   * Uses `System.nanoTime()` rather than `System.currentTimeMillis()`: the latter follows the
   * wall clock and may jump while `f` is running, which would distort the measurement.
   *
   * @param f the code to be timed; its result is discarded
   * @return elapsed time in whole milliseconds
   */
  def benchmark(f: => Unit): Long = {
    val begin = System.nanoTime()
    f
    val end = System.nanoTime()
    (end - begin) / 1000000L
  }
}
```

@marmbrus Please help review, thanks!

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #758 from liancheng/fastHiveTableScan and squashes the following commits:

4241a19 [Cheng Lian] Distinguishes sorted and possibly not sorted operations more accurately
in HiveComparisonTest
cf640d8 [Cheng Lian] More HiveTableScan optimisations:
bf0e7dc [Cheng Lian] Added SortedOperation pattern to match *some* definitely sorted operations
and avoid some sorting cost in HiveComparisonTest.
6d1c642 [Cheng Lian] Using ColumnProjectionUtils to optimise RCFile and ORC column pruning
eb62fd3 [Cheng Lian] [SPARK-1368] Optimized HiveTableScan

(cherry picked from commit 8f7141fbc015addb314e1d5801085587b5cbb171)
Signed-off-by: Michael Armbrust <michael@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f56aadc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f56aadc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f56aadc

Branch: refs/heads/branch-1.0
Commit: 0f56aadc8d1c34463cee2e234b6250145d866cd7
Parents: 8bb9390
Author: Cheng Lian <lian.cs.zju@gmail.com>
Authored: Thu May 29 15:24:03 2014 -0700
Committer: Michael Armbrust <michael@databricks.com>
Committed: Thu May 29 15:25:53 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/Aggregate.scala  |  2 +-
 .../apache/spark/sql/hive/hiveOperators.scala   | 97 ++++++++++++++++----
 .../sql/hive/execution/HiveComparisonTest.scala | 25 ++---
 3 files changed, 96 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0f56aadc/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index 36b3b95..604914e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -116,7 +116,7 @@ case class Aggregate(
    */
   @transient
   private[this] lazy val resultMap =
-    (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute} ++ namedGroups).toMap
+    (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ namedGroups).toMap
 
   /**
    * Substituted version of aggregateExpressions expressions which are used to compute final

http://git-wip-us.apache.org/repos/asf/spark/blob/0f56aadc/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
index 96faebc..f141139 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
@@ -18,15 +18,18 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
 import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive}
 import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
-import org.apache.hadoop.hive.serde2.Serializer
+import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred._
 
@@ -37,6 +40,7 @@ import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive._
 import org.apache.spark.{TaskContext, SparkException}
+import org.apache.spark.util.MutablePair
 
 /* Implicits */
 import scala.collection.JavaConversions._
@@ -94,7 +98,7 @@ case class HiveTableScan(
         (_: Any, partitionKeys: Array[String]) => {
           val value = partitionKeys(ordinal)
           val dataType = relation.partitionKeys(ordinal).dataType
-          castFromString(value, dataType)
+          unwrapHiveData(castFromString(value, dataType))
         }
       } else {
         val ref = objectInspector.getAllStructFieldRefs
@@ -102,16 +106,55 @@ case class HiveTableScan(
           .getOrElse(sys.error(s"Can't find attribute $a"))
         (row: Any, _: Array[String]) => {
           val data = objectInspector.getStructFieldData(row, ref)
-          unwrapData(data, ref.getFieldObjectInspector)
+          unwrapHiveData(unwrapData(data, ref.getFieldObjectInspector))
         }
       }
     }
   }
 
+  private def unwrapHiveData(value: Any) = value match {
+    case maybeNull: String if maybeNull.toLowerCase == "null" => null
+    case varchar: HiveVarchar => varchar.getValue
+    case decimal: HiveDecimal => BigDecimal(decimal.bigDecimalValue)
+    case other => other
+  }
+
   private def castFromString(value: String, dataType: DataType) = {
     Cast(Literal(value), dataType).eval(null)
   }
 
+  private def addColumnMetadataToConf(hiveConf: HiveConf) {
+    // Specifies IDs and internal names of columns to be scanned.
+    val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name):
Integer)
+    val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",")
+
+    if (attributes.size == relation.output.size) {
+      ColumnProjectionUtils.setFullyReadColumns(hiveConf)
+    } else {
+      ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
+    }
+
+    ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name))
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        relation.tableDesc.getDeserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames)
+  }
+
+  addColumnMetadataToConf(sc.hiveconf)
+
   @transient
   def inputRdd = if (!relation.hiveQlTable.isPartitioned) {
     hadoopReader.makeRDDForTable(relation.hiveQlTable)
@@ -143,20 +186,42 @@ case class HiveTableScan(
   }
 
   def execute() = {
-    inputRdd.map { row =>
-      val values = row match {
-        case Array(deserializedRow: AnyRef, partitionKeys: Array[String]) =>
-          attributeFunctions.map(_(deserializedRow, partitionKeys))
-        case deserializedRow: AnyRef =>
-          attributeFunctions.map(_(deserializedRow, Array.empty))
+    inputRdd.mapPartitions { iterator =>
+      if (iterator.isEmpty) {
+        Iterator.empty
+      } else {
+        val mutableRow = new GenericMutableRow(attributes.length)
+        val mutablePair = new MutablePair[Any, Array[String]]()
+        val buffered = iterator.buffered
+
+        // NOTE (lian): Critical path of Hive table scan, unnecessary FP style code and pattern
+        // matching are avoided intentionally.
+        val rowsAndPartitionKeys = buffered.head match {
+          // With partition keys
+          case _: Array[Any] =>
+            buffered.map { case array: Array[Any] =>
+              val deserializedRow = array(0)
+              val partitionKeys = array(1).asInstanceOf[Array[String]]
+              mutablePair.update(deserializedRow, partitionKeys)
+            }
+
+          // Without partition keys
+          case _ =>
+            val emptyPartitionKeys = Array.empty[String]
+            buffered.map { deserializedRow =>
+              mutablePair.update(deserializedRow, emptyPartitionKeys)
+            }
+        }
+
+        rowsAndPartitionKeys.map { pair =>
+          var i = 0
+          while (i < attributes.length) {
+            mutableRow(i) = attributeFunctions(i)(pair._1, pair._2)
+            i += 1
+          }
+          mutableRow: Row
+        }
       }
-      buildRow(values.map {
-        case n: String if n.toLowerCase == "null" => null
-        case varchar: org.apache.hadoop.hive.common.`type`.HiveVarchar => varchar.getValue
-        case decimal: org.apache.hadoop.hive.common.`type`.HiveDecimal =>
-          BigDecimal(decimal.bigDecimalValue)
-        case other => other
-      })
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0f56aadc/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index edff38b..1b5a132 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.hive.execution
 
 import java.io._
 
+import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
+
 import org.apache.spark.sql.Logging
-import org.apache.spark.sql.catalyst.plans.logical.{ExplainCommand, NativeCommand}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.Sort
-import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
 import org.apache.spark.sql.hive.test.TestHive
 
 /**
@@ -128,17 +129,19 @@ abstract class HiveComparisonTest
   protected def prepareAnswer(
     hiveQuery: TestHive.type#HiveQLQueryExecution,
     answer: Seq[String]): Seq[String] = {
+
+    def isSorted(plan: LogicalPlan): Boolean = plan match {
+      case _: Join | _: Aggregate | _: BaseRelation | _: Generate | _: Sample | _: Distinct
=> false
+      case PhysicalOperation(_, _, Sort(_, _)) => true
+      case _ => plan.children.iterator.map(isSorted).exists(_ == true)
+    }
+
     val orderedAnswer = hiveQuery.logical match {
       // Clean out non-deterministic time schema info.
       case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
       case _: ExplainCommand => answer
-      case _ =>
-        // TODO: Really we only care about the final total ordering here...
-        val isOrdered = hiveQuery.executedPlan.collect {
-          case s @ Sort(_, global, _) if global => s
-        }.nonEmpty
-        // If the query results aren't sorted, then sort them to ensure deterministic answers.
-        if (!isOrdered) answer.sorted else answer
+      case plan if isSorted(plan) => answer
+      case _ => answer.sorted
     }
     orderedAnswer.map(cleanPaths)
   }
@@ -161,7 +164,7 @@ abstract class HiveComparisonTest
     "minFileSize"
   )
   protected def nonDeterministicLine(line: String) =
-    nonDeterministicLineIndicators.map(line contains _).reduceLeft(_||_)
+    nonDeterministicLineIndicators.exists(line contains _)
 
   /**
    * Removes non-deterministic paths from `str` so cached answers will compare correctly.


Mime
View raw message