spark-commits mailing list archives

From dav...@apache.org
Subject spark git commit: [SPARK-14224] [SPARK-14223] [SPARK-14310] [SQL] fix RowEncoder and parquet reader for wide table
Date Wed, 06 Apr 2016 22:33:45 GMT
Repository: spark
Updated Branches:
  refs/heads/master a4ead6d38 -> 5a4b11a90


[SPARK-14224] [SPARK-14223] [SPARK-14310] [SQL] fix RowEncoder and parquet reader for wide table

## What changes were proposed in this pull request?

1) Fix the RowEncoder for wide tables (many columns) by splitting the generated code into multiple functions.
2) Separate DataSourceScan into RowDataSourceScan and BatchedDataSourceScan.
3) Disable returning columnar batches in the parquet reader if there are many columns.
4) Add an internal config for the maximum number of fields (including nested fields) supported by whole-stage codegen (a sketch of the field-count check follows this list).
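
For context, a minimal sketch of how the new limit gates whole-stage codegen, mirroring the `numOfNestedFields` helper and the `spark.sql.codegen.maxFields` config (default 200) added in this patch; the sample schema and variable names below are illustrative only, and the patch's own helper also descends into UserDefinedTypes:

```scala
import org.apache.spark.sql.types._

// Count leaf fields, descending into structs, maps and arrays.
def numOfNestedFields(dataType: DataType): Int = dataType match {
  case st: StructType => st.fields.map(f => numOfNestedFields(f.dataType)).sum
  case m: MapType => numOfNestedFields(m.keyType) + numOfNestedFields(m.valueType)
  case a: ArrayType => numOfNestedFields(a.elementType)
  case _ => 1
}

// A hypothetical 1000-column flat schema, like the one in the new test.
val wideSchema = StructType((0 until 1000).map(i => StructField(s"c$i", LongType)))

// Whole-stage codegen is skipped once the flattened field count exceeds the limit.
val maxFields = 200  // default of spark.sql.codegen.maxFields
val useWholeStageCodegen = numOfNestedFields(wideSchema) <= maxFields  // false here
```

The limit is internal but can be raised per session if needed, e.g. `sqlContext.setConf("spark.sql.codegen.maxFields", "1024")`.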

Closes #12098

## How was this patch tested?

Added a test for a table with 1000 columns (a standalone sketch follows; the test itself is in the ParquetQuerySuite hunk at the end of the diff).
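
For reference, a standalone sketch of that scenario; the actual test uses the suite's withTempPath and checkAnswer helpers, and the SparkContext setup and output path here are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

val sc = new SparkContext(new SparkConf().setAppName("wide-table").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// 1000 rows x 1000 columns: c0 = id + 0, c1 = id + 1, ...
val df = sqlContext.range(1000).select(Seq.tabulate(1000)(i => ('id + i).as(s"c$i")): _*)

// Round-trip through parquet and make sure nothing is lost.
df.write.mode(SaveMode.Overwrite).parquet("/tmp/wide_table")
assert(sqlContext.read.parquet("/tmp/wide_table").count() == 1000)
```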

Author: Davies Liu <davies@databricks.com>

Closes #12047 from davies/many_columns.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a4b11a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a4b11a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a4b11a9

Branch: refs/heads/master
Commit: 5a4b11a901703464b9261dea0642d80cf8d4856c
Parents: a4ead6d
Author: Davies Liu <davies@databricks.com>
Authored: Wed Apr 6 15:33:39 2016 -0700
Committer: Davies Liu <davies.liu@gmail.com>
Committed: Wed Apr 6 15:33:39 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/objects.scala      |  24 +-
 .../parquet/VectorizedParquetRecordReader.java  |  19 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |   2 +-
 .../spark/sql/execution/ExistingRDD.scala       | 291 ++++++++++---------
 .../spark/sql/execution/WholeStageCodegen.scala |  13 +-
 .../datasources/DataSourceStrategy.scala        |   6 +-
 .../datasources/FileSourceStrategy.scala        |   2 +-
 .../execution/datasources/SqlNewHadoopRDD.scala |  34 +--
 .../datasources/parquet/ParquetRelation.scala   |  77 +++--
 .../org/apache/spark/sql/internal/SQLConf.scala |  11 +
 .../apache/spark/sql/sources/interfaces.scala   |   9 +
 .../datasources/FileSourceStrategySuite.scala   |   3 +-
 .../datasources/parquet/ParquetQuerySuite.scala |  10 +
 13 files changed, 267 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5a4b11a9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
index a0490e1..28b6b2a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
@@ -524,22 +524,26 @@ case class CreateExternalRow(children: Seq[Expression], schema: StructType)
   override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
     val rowClass = classOf[GenericRowWithSchema].getName
     val values = ctx.freshName("values")
-    val schemaField = ctx.addReferenceObj("schema", schema)
-    s"""
-      boolean ${ev.isNull} = false;
-      final Object[] $values = new Object[${children.size}];
-    """ +
-      children.zipWithIndex.map { case (e, i) =>
-        val eval = e.gen(ctx)
-        eval.code + s"""
+    ctx.addMutableState("Object[]", values, "")
+
+    val childrenCodes = children.zipWithIndex.map { case (e, i) =>
+      val eval = e.gen(ctx)
+      eval.code + s"""
           if (${eval.isNull}) {
             $values[$i] = null;
           } else {
             $values[$i] = ${eval.value};
           }
          """
-      }.mkString("\n") +
-      s"final ${classOf[Row].getName} ${ev.value} = new $rowClass($values, this.$schemaField);"
+    }
+    val childrenCode = ctx.splitExpressions(ctx.INPUT_ROW, childrenCodes)
+    val schemaField = ctx.addReferenceObj("schema", schema)
+    s"""
+      boolean ${ev.isNull} = false;
+      $values = new Object[${children.size}];
+      $childrenCode
+      final ${classOf[Row].getName} ${ev.value} = new $rowClass($values, this.$schemaField);
+      """
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a4b11a9/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index a0b6276..51bdf0f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -31,7 +31,8 @@ import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
 import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 
 /**
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -100,20 +101,6 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
 
   /**
-   * Tries to initialize the reader for this split. Returns true if this reader supports reading
-   * this split and false otherwise.
-   */
-  public boolean tryInitialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
-      throws IOException, InterruptedException {
-    try {
-      initialize(inputSplit, taskAttemptContext);
-      return true;
-    } catch (UnsupportedOperationException e) {
-      return false;
-    }
-  }
-
-  /**
    * Implementation of RecordReader API.
    */
   @Override
@@ -222,7 +209,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     return columnarBatch;
   }
 
-  /**
+  /*
    * Can be called before any rows are returned to enable returning columnar batches directly.
    */
   public void enableReturningBatches() {

http://git-wip-us.apache.org/repos/asf/spark/blob/5a4b11a9/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1c9cb79..9259ff4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -120,7 +120,7 @@ class SQLContext private[sql](
    */
   @transient
   protected[sql] lazy val sessionState: SessionState = new SessionState(self)
-  protected[sql] def conf: SQLConf = sessionState.conf
+  protected[spark] def conf: SQLConf = sessionState.conf
 
   /**
    * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s

http://git-wip-us.apache.org/repos/asf/spark/blob/5a4b11a9/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index ab575e9..392c48f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -24,13 +24,13 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.toCommentSafeString
 import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{AtomicType, DataType}
 
 object RDDConversions {
  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
@@ -123,28 +123,30 @@ private[sql] case class PhysicalRDD(
   }
 }
 
-/** Physical plan node for scanning data from a relation. */
-private[sql] case class DataSourceScan(
-    output: Seq[Attribute],
-    rdd: RDD[InternalRow],
-    @transient relation: BaseRelation,
-    override val metadata: Map[String, String] = Map.empty)
-  extends LeafNode with CodegenSupport {
+private[sql] trait DataSourceScan extends LeafNode {
+  val rdd: RDD[InternalRow]
+  val relation: BaseRelation
 
   override val nodeName: String = relation.toString
 
   // Ignore rdd when checking results
-  override def sameResult(plan: SparkPlan ): Boolean = plan match {
+  override def sameResult(plan: SparkPlan): Boolean = plan match {
     case other: DataSourceScan => relation == other.relation && metadata == other.metadata
     case _ => false
   }
+}
 
-  private[sql] override lazy val metrics = if (canProcessBatches()) {
-    Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
-      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
-  } else {
+/** Physical plan node for scanning data from a relation. */
+private[sql] case class RowDataSourceScan(
+    output: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    @transient relation: BaseRelation,
+    override val outputPartitioning: Partitioning,
+    override val metadata: Map[String, String] = Map.empty)
+  extends DataSourceScan with CodegenSupport {
+
+  private[sql] override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
-  }
 
   val outputUnsafeRows = relation match {
     case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
@@ -153,38 +155,6 @@ private[sql] case class DataSourceScan(
     case _ => false
   }
 
-  override val outputPartitioning = {
-    val bucketSpec = relation match {
-      // TODO: this should be closer to bucket planning.
-      case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec
-      case _ => None
-    }
-
-    def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
-      throw new AnalysisException(s"bucket column $colName not found in existing columns " +
-        s"(${output.map(_.name).mkString(", ")})")
-    }
-
-    bucketSpec.map { spec =>
-      val numBuckets = spec.numBuckets
-      val bucketColumns = spec.bucketColumnNames.map(toAttribute)
-      HashPartitioning(bucketColumns, numBuckets)
-    }.getOrElse {
-      UnknownPartitioning(0)
-    }
-  }
-
-  private def canProcessBatches(): Boolean = {
-    relation match {
-      case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] &&
-        SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) &&
-        SQLContext.getActive().get.conf.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED) =>
-        true
-      case _ =>
-        false
-    }
-  }
-
   protected override def doExecute(): RDD[InternalRow] = {
     val unsafeRow = if (outputUnsafeRows) {
       rdd
@@ -211,6 +181,57 @@ private[sql] case class DataSourceScan(
     rdd :: Nil
   }
 
+  override protected def doProduce(ctx: CodegenContext): String = {
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    // PhysicalRDD always just has one input
+    val input = ctx.freshName("input")
+    ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+    val exprRows = output.zipWithIndex.map{ case (a, i) =>
+      new BoundReference(i, a.dataType, a.nullable)
+    }
+    val row = ctx.freshName("row")
+    ctx.INPUT_ROW = row
+    ctx.currentVars = null
+    val columnsRowInput = exprRows.map(_.gen(ctx))
+    val inputRow = if (outputUnsafeRows) row else null
+    s"""
+       |while ($input.hasNext()) {
+       |  InternalRow $row = (InternalRow) $input.next();
+       |  $numOutputRows.add(1);
+       |  ${consume(ctx, columnsRowInput, inputRow).trim}
+       |  if (shouldStop()) return;
+       |}
+     """.stripMargin
+  }
+}
+
+/** Physical plan node for scanning data from a batched relation. */
+private[sql] case class BatchedDataSourceScan(
+    output: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    @transient relation: BaseRelation,
+    override val outputPartitioning: Partitioning,
+    override val metadata: Map[String, String] = Map.empty)
+  extends DataSourceScan with CodegenSupport {
+
+  private[sql] override lazy val metrics =
+    Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException
+  }
+
+  override def simpleString: String = {
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+    val metadataStr = metadataEntries.mkString(" ", ", ", "")
+    s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr"
+  }
+
+  override def upstreams(): Seq[RDD[InternalRow]] = {
+    rdd :: Nil
+  }
+
   private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
     dataType: DataType, nullable: Boolean): ExprCode = {
     val javaType = ctx.javaType(dataType)
@@ -232,113 +253,65 @@ private[sql] case class DataSourceScan(
   // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
   // never requires UnsafeRow as input.
   override protected def doProduce(ctx: CodegenContext): String = {
-    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
-    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
     val input = ctx.freshName("input")
-    val idx = ctx.freshName("batchIdx")
-    val rowidx = ctx.freshName("rowIdx")
-    val batch = ctx.freshName("batch")
     // PhysicalRDD always just has one input
     ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+
+    // metrics
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    val scanTimeMetric = metricTerm(ctx, "scanTime")
+    val scanTimeTotalNs = ctx.freshName("scanTime")
+    ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
+
+    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+    val batch = ctx.freshName("batch")
     ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
+
+    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
+    val idx = ctx.freshName("batchIdx")
     ctx.addMutableState("int", idx, s"$idx = 0;")
     val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
     val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
       ctx.addMutableState(columnVectorClz, name, s"$name = null;")
-      s"$name = ${batch}.column($i);" }
-
-    val row = ctx.freshName("row")
-    val numOutputRows = metricTerm(ctx, "numOutputRows")
+      s"$name = $batch.column($i);"
+    }
 
-    // The input RDD can either return (all) ColumnarBatches or InternalRows. We determine this
-    // by looking at the first value of the RDD and then calling the function which will process
-    // the remaining. It is faster to return batches.
-    // TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know
-    // here which path to use. Fix this.
+    val nextBatch = ctx.freshName("nextBatch")
+    ctx.addNewFunction(nextBatch,
+      s"""
+         |private void $nextBatch() throws java.io.IOException {
+         |  long getBatchStart = System.nanoTime();
+         |  if ($input.hasNext()) {
+         |    $batch = ($columnarBatchClz)$input.next();
+         |    $numOutputRows.add($batch.numRows());
+         |    $idx = 0;
+         |    ${columnAssigns.mkString("", "\n", "\n")}
+         |  }
+         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
+         |}""".stripMargin)
 
-    val exprRows =
-        output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable))
-    ctx.INPUT_ROW = row
     ctx.currentVars = null
-    val columnsRowInput = exprRows.map(_.gen(ctx))
-    val inputRow = if (outputUnsafeRows) row else null
-    val scanRows = ctx.freshName("processRows")
-    ctx.addNewFunction(scanRows,
-      s"""
-         | private void $scanRows(InternalRow $row) throws java.io.IOException {
-         |   boolean firstRow = true;
-         |   while (!shouldStop() && (firstRow || $input.hasNext())) {
-         |     if (firstRow) {
-         |       firstRow = false;
-         |     } else {
-         |       $row = (InternalRow) $input.next();
-         |     }
-         |     $numOutputRows.add(1);
-         |     ${consume(ctx, columnsRowInput, inputRow).trim}
-         |   }
-         | }""".stripMargin)
-
-    // Timers for how long we spent inside the scan. We can only maintain this when using batches,
-    // otherwise the overhead is too high.
-    if (canProcessBatches()) {
-      val scanTimeMetric = metricTerm(ctx, "scanTime")
-      val getBatchStart = ctx.freshName("scanStart")
-      val scanTimeTotalNs = ctx.freshName("scanTime")
-      ctx.currentVars = null
-      val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
-        genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
-      val scanBatches = ctx.freshName("processBatches")
-      ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
-
-      ctx.addNewFunction(scanBatches,
-        s"""
-        | private void $scanBatches() throws java.io.IOException {
-        |  while (true) {
-        |     int numRows = $batch.numRows();
-        |     if ($idx == 0) {
-        |       ${columnAssigns.mkString("", "\n", "\n")}
-        |       $numOutputRows.add(numRows);
-        |     }
-        |
-        |     while (!shouldStop() && $idx < numRows) {
-        |       int $rowidx = $idx++;
-        |       ${consume(ctx, columnsBatchInput).trim}
-        |     }
-        |     if (shouldStop()) return;
-        |
-        |     long $getBatchStart = System.nanoTime();
-        |     if (!$input.hasNext()) {
-        |       $batch = null;
-        |       $scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
-        |       break;
-        |     }
-        |     $batch = ($columnarBatchClz)$input.next();
-        |     $scanTimeTotalNs += System.nanoTime() - $getBatchStart;
-        |     $idx = 0;
-        |   }
-        | }""".stripMargin)
-
-      val value = ctx.freshName("value")
-      s"""
-         | if ($batch != null) {
-         |   $scanBatches();
-         | } else if ($input.hasNext()) {
-         |   Object $value = $input.next();
-         |   if ($value instanceof $columnarBatchClz) {
-         |     $batch = ($columnarBatchClz)$value;
-         |     $scanBatches();
-         |   } else {
-         |     $scanRows((InternalRow) $value);
-         |   }
-         | }
-       """.stripMargin
-    } else {
-      s"""
-         |if ($input.hasNext()) {
-         |  $scanRows((InternalRow) $input.next());
-         |}
-       """.stripMargin
+    val rowidx = ctx.freshName("rowIdx")
+    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
     }
+    s"""
+       |if ($batch == null) {
+       |  $nextBatch();
+       |}
+       |while ($batch != null) {
+       |  int numRows = $batch.numRows();
+       |  while ($idx < numRows) {
+       |    int $rowidx = $idx++;
+       |    ${consume(ctx, columnsBatchInput).trim}
+       |    if (shouldStop()) return;
+       |  }
+       |  $batch = null;
+       |  $nextBatch();
+       |}
+       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+       |$scanTimeTotalNs = 0;
+     """.stripMargin
   }
 }
 
@@ -346,4 +319,38 @@ private[sql] object DataSourceScan {
   // Metadata keys
   val INPUT_PATHS = "InputPaths"
   val PUSHED_FILTERS = "PushedFilters"
+
+  def create(
+      output: Seq[Attribute],
+      rdd: RDD[InternalRow],
+      relation: BaseRelation,
+      metadata: Map[String, String] = Map.empty): DataSourceScan = {
+    val outputPartitioning = {
+      val bucketSpec = relation match {
+        // TODO: this should be closer to bucket planning.
+        case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec
+        case _ => None
+      }
+
+      def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
+        throw new AnalysisException(s"bucket column $colName not found in existing columns " +
+          s"(${output.map(_.name).mkString(", ")})")
+      }
+
+      bucketSpec.map { spec =>
+        val numBuckets = spec.numBuckets
+        val bucketColumns = spec.bucketColumnNames.map(toAttribute)
+        HashPartitioning(bucketColumns, numBuckets)
+      }.getOrElse {
+        UnknownPartitioning(0)
+      }
+    }
+
+    relation match {
+      case r: HadoopFsRelation if r.fileFormat.supportBatch(r.sqlContext, relation.schema) =>
+        BatchedDataSourceScan(output, rdd, relation, outputPartitioning, metadata)
+      case _ =>
+        RowDataSourceScan(output, rdd, relation, outputPartitioning, metadata)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5a4b11a9/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 4e75a3a..98129d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.aggregate.TungstenAggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}
 import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 
 /**
  * An interface for those physical operators that support codegen.
@@ -433,12 +434,20 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
     case _ => true
   }
 
+  private def numOfNestedFields(dataType: DataType): Int = dataType match {
+    case dt: StructType => dt.fields.map(f => numOfNestedFields(f.dataType)).sum
+    case m: MapType => numOfNestedFields(m.keyType) + numOfNestedFields(m.valueType)
+    case a: ArrayType => numOfNestedFields(a.elementType)
+    case u: UserDefinedType[_] => numOfNestedFields(u.sqlType)
+    case _ => 1
+  }
+
   private def supportCodegen(plan: SparkPlan): Boolean = plan match {
     case plan: CodegenSupport if plan.supportCodegen =>
       val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined)
       // the generated code will be huge if there are too many columns
-      val haveManyColumns = plan.output.length > 200
-      !willFallback && !haveManyColumns
+      val haveTooManyFields = numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
+      !willFallback && !haveTooManyFields
     case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a4b11a9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 52c8f3e..8c18331 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -238,7 +238,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       }
 
     case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
-      execution.DataSourceScan(
+      execution.DataSourceScan.create(
         l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
 
     case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
@@ -610,7 +610,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         // Don't request columns that are only referenced by pushed filters.
         .filterNot(handledSet.contains)
 
-      val scan = execution.DataSourceScan(
+      val scan = execution.DataSourceScan.create(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation, metadata)
@@ -620,7 +620,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val requestedColumns =
         (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
 
-      val scan = execution.DataSourceScan(
+      val scan = execution.DataSourceScan.create(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation, metadata)

http://git-wip-us.apache.org/repos/asf/spark/blob/5a4b11a9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 618d5a5..aa1f764 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -181,7 +181,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
       }
 
       val scan =
-        DataSourceScan(
+        DataSourceScan.create(
           readDataColumns ++ partitionColumns,
           new FileScanRDD(
             files.sqlContext,

http://git-wip-us.apache.org/repos/asf/spark/blob/5a4b11a9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index 159fdc9..6ddb218 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -97,13 +97,6 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
 
   @transient protected val jobId = new JobID(jobTrackerId, id)
 
-  // If true, enable using the custom RecordReader for parquet. This only works for
-  // a subset of the types (no complex types).
-  protected val enableVectorizedParquetReader: Boolean =
-    sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean
-  protected val enableWholestageCodegen: Boolean =
-    sqlContext.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key).toBoolean
-
   override def getPartitions: Array[SparkPartition] = {
     val conf = getConf(isDriverSide = true)
     val inputFormat = inputFormatClass.newInstance
@@ -165,32 +158,9 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
       }
       val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
       val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-      private[this] var reader: RecordReader[Void, V] = null
-
-      /**
-       * If the format is ParquetInputFormat, try to create the optimized RecordReader. If this
-       * fails (for example, unsupported schema), try with the normal reader.
-       * TODO: plumb this through a different way?
-       */
-      if (enableVectorizedParquetReader &&
-        format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") {
-        val parquetReader: VectorizedParquetRecordReader = new VectorizedParquetRecordReader()
-        if (!parquetReader.tryInitialize(
-            split.serializableHadoopSplit.value, hadoopAttemptContext)) {
-          parquetReader.close()
-        } else {
-          reader = parquetReader.asInstanceOf[RecordReader[Void, V]]
-          parquetReader.resultBatch()
-          // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
-          if (enableWholestageCodegen) parquetReader.enableReturningBatches()
-        }
-      }
-
-      if (reader == null) {
-        reader = format.createRecordReader(
+      private[this] var reader: RecordReader[Void, V] = format.createRecordReader(
           split.serializableHadoopSplit.value, hadoopAttemptContext)
-        reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
-      }
+      reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
       // Register an on-task-completion callback to close the input stream.
       context.addTaskCompletionListener(context => close())

http://git-wip-us.apache.org/repos/asf/spark/blob/5a4b11a9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 5b58fa1..a2fd8da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -24,7 +24,6 @@ import java.util.logging.{Logger => JLogger}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Try}
-import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -53,7 +52,7 @@ import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{AtomicType, DataType, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 import org.apache.spark.util.collection.BitSet
 
@@ -276,6 +275,16 @@ private[sql] class DefaultSource
         file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * Returns whether the reader will the rows as batch or not.
+   */
+  override def supportBatch(sqlContext: SQLContext, schema: StructType): Boolean = {
+    val conf = SQLContext.getActive().get.conf
+    conf.useFileScan && conf.parquetVectorizedReaderEnabled &&
+      conf.wholeStageEnabled && schema.length <= conf.wholeStageMaxNumFields &&
+      schema.forall(_.dataType.isInstanceOf[AtomicType])
+  }
+
   override def buildReader(
       sqlContext: SQLContext,
       dataSchema: StructType,
@@ -306,6 +315,10 @@ private[sql] class DefaultSource
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sqlContext.conf.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
 
+    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+    val returningBatch =
+      supportBatch(sqlContext, StructType(partitionSchema.fields ++ dataSchema.fields))
+
     // Try to push down filters when filter push-down is enabled.
    val pushed = if (sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
       filters
@@ -324,10 +337,8 @@ private[sql] class DefaultSource
     // TODO: if you move this into the closure it reverts to the default values.
     // If true, enable using the custom RecordReader for parquet. This only works for
     // a subset of the types (no complex types).
-    val enableVectorizedParquetReader: Boolean =
-      sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean
-    val enableWholestageCodegen: Boolean =
-      sqlContext.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key).toBoolean
+    val enableVectorizedParquetReader: Boolean = sqlContext.conf.parquetVectorizedReaderEnabled &&
+          dataSchema.forall(_.dataType.isInstanceOf[AtomicType])
 
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
@@ -347,32 +358,27 @@ private[sql] class DefaultSource
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
      val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedConf.value.value, attemptId)
 
-      val parquetReader = try {
-        if (!enableVectorizedParquetReader) sys.error("Vectorized reader turned off.")
+      val parquetReader = if (enableVectorizedParquetReader) {
         val vectorizedReader = new VectorizedParquetRecordReader()
         vectorizedReader.initialize(split, hadoopAttemptContext)
         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
         vectorizedReader.initBatch(partitionSchema, file.partitionValues)
-        // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
-        // TODO: fix column appending
-        if (enableWholestageCodegen) {
-          logDebug(s"Enabling batch returning")
+        if (returningBatch) {
           vectorizedReader.enableReturningBatches()
         }
         vectorizedReader
-      } catch {
-        case NonFatal(e) =>
-          logDebug(s"Falling back to parquet-mr: $e", e)
-          val reader = pushed match {
-            case Some(filter) =>
-              new ParquetRecordReader[InternalRow](
-                new CatalystReadSupport,
-                FilterCompat.get(filter, null))
-            case _ =>
-              new ParquetRecordReader[InternalRow](new CatalystReadSupport)
-          }
-          reader.initialize(split, hadoopAttemptContext)
-          reader
+      } else {
+        logDebug(s"Falling back to parquet-mr")
+        val reader = pushed match {
+          case Some(filter) =>
+            new ParquetRecordReader[InternalRow](
+              new CatalystReadSupport,
+              FilterCompat.get(filter, null))
+          case _ =>
+            new ParquetRecordReader[InternalRow](new CatalystReadSupport)
+        }
+        reader.initialize(split, hadoopAttemptContext)
+        reader
       }
 
       val iter = new RecordReaderIterator(parquetReader)
@@ -432,13 +438,21 @@ private[sql] class DefaultSource
     val setInputPaths =
       ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _
 
+    val allPrimitiveTypes = dataSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val inputFormatCls = if (sqlContext.conf.parquetVectorizedReaderEnabled
+      && allPrimitiveTypes) {
+      classOf[VectorizedParquetInputFormat]
+    } else {
+      classOf[ParquetInputFormat[InternalRow]]
+    }
+
     Utils.withDummyCallSite(sqlContext.sparkContext) {
       new SqlNewHadoopRDD(
         sqlContext = sqlContext,
         broadcastedConf = broadcastedConf,
         initDriverSideJobFuncOpt = Some(setInputPaths),
         initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
-        inputFormatClass = classOf[ParquetInputFormat[InternalRow]],
+        inputFormatClass = inputFormatCls,
         valueClass = classOf[InternalRow]) {
 
         val cacheMetadata = useMetadataCache
@@ -481,6 +495,17 @@ private[sql] class DefaultSource
   }
 }
 
+/**
+ * The ParquetInputFormat that create VectorizedParquetRecordReader.
+ */
+final class VectorizedParquetInputFormat extends ParquetInputFormat[InternalRow] {
+  override def createRecordReader(
+    inputSplit: InputSplit,
+    taskAttemptContext: TaskAttemptContext): ParquetRecordReader[InternalRow] = {
+    new VectorizedParquetRecordReader().asInstanceOf[ParquetRecordReader[InternalRow]]
+  }
+}
+
 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.
 private[sql] class ParquetOutputWriter(
     path: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/5a4b11a9/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 927af89..dc6ba1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -396,6 +396,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val WHOLESTAGE_MAX_NUM_FIELDS = SQLConfigBuilder("spark.sql.codegen.maxFields")
+    .internal()
+    .doc("The maximum number of fields (including nested fields) that will be supported before"
+
+      " deactivating whole-stage codegen.")
+    .intConf
+    .createWithDefault(200)
+
   val FILES_MAX_PARTITION_BYTES = SQLConfigBuilder("spark.sql.files.maxPartitionBytes")
     .doc("The maximum number of bytes to pack into a single partition when reading files.")
     .longConf
@@ -480,6 +487,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)
 
+  def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)
+
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
@@ -504,6 +513,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
 
+  def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
+
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
 
   def canonicalView: Boolean = getConf(CANONICAL_NATIVE_VIEW)

http://git-wip-us.apache.org/repos/asf/spark/blob/5a4b11a9/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 14e1471..6acb41d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -469,6 +469,15 @@ trait FileFormat {
       options: Map[String, String]): RDD[InternalRow]
 
   /**
+   * Returns whether this format support returning columnar batch or not.
+   *
+   * TODO: we should just have different traits for the different formats.
+   */
+  def supportBatch(sqlContext: SQLContext, dataSchema: StructType): Boolean = {
+    false
+  }
+
+  /**
    * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
    *
    * @param dataSchema The global data schema. It can be either specified by the user, or

http://git-wip-us.apache.org/repos/asf/spark/blob/5a4b11a9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 4446a68..41f536f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -279,7 +279,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext
with Predi
  /** Plans the query and calls the provided validation function with the planned partitioning. */
   def checkScan(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
     val fileScan = df.queryExecution.executedPlan.collect {
-      case DataSourceScan(_, scan: FileScanRDD, _, _) => scan
+      case scan: DataSourceScan if scan.rdd.isInstanceOf[FileScanRDD] =>
+        scan.rdd.asInstanceOf[FileScanRDD]
     }.headOption.getOrElse {
       fail(s"No FileScan in query\n${df.queryExecution}")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/5a4b11a9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 2f806eb..7d206e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -579,6 +579,16 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
 
     assert(CatalystReadSupport.expandUDT(schema) === expected)
   }
+
+  test("read/write wide table") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df = sqlContext.range(1000).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*)
+      df.write.mode(SaveMode.Overwrite).parquet(path)
+      checkAnswer(sqlContext.read.parquet(path), df)
+    }
+  }
 }
 
 object TestingUDT {

