spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Xiao Li (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-18539) Cannot filter by nonexisting column in parquet file
Date Mon, 05 Dec 2016 07:32:59 GMT

    [ https://issues.apache.org/jira/browse/SPARK-18539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15721481#comment-15721481
] 

Xiao Li commented on SPARK-18539:
---------------------------------

The default of `spark.sql.parquet.mergeSchema` is false. To figure out the schema of parquet,
the default behavior is based on a much cheaper solution

{noformat}
    // Always tries the summary files first if users don't require a merged schema.  In this
case,
    // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row
    // groups information, and could be much smaller for large Parquet files with lots of
row
    // groups.  If no summary file is available, falls back to some random part-file.
{noformat}

It might not always resolve your case.

If we introduce such a parameter to always infer the schema and compare the inferred schema
with user-specified schemas, we can do extra checking and processing on the schema (e.g.,
type promotion, schema merging between user-specified schema and actual data schema). The
scope will be much bigger. This is a design decision. cc [~rxin] [~marmbrus] [~lian cheng]
[~cloud_fan] [~ekhliang]





> Cannot filter by nonexisting column in parquet file
> ---------------------------------------------------
>
>                 Key: SPARK-18539
>                 URL: https://issues.apache.org/jira/browse/SPARK-18539
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.1, 2.0.2
>            Reporter: Vitaly Gerasimov
>            Priority: Critical
>
> {code}
>   import org.apache.spark.SparkConf
>   import org.apache.spark.sql.SparkSession
>   import org.apache.spark.sql.types.DataTypes._
>   import org.apache.spark.sql.types.{StructField, StructType}
>   val sc = SparkSession.builder().config(new SparkConf().setMaster("local")).getOrCreate()
>   val jsonRDD = sc.sparkContext.parallelize(Seq("""{"a":1}"""))
>   sc.read
>     .schema(StructType(Seq(StructField("a", IntegerType))))
>     .json(jsonRDD)
>     .write
>     .parquet("/tmp/test")
>   sc.read
>     .schema(StructType(Seq(StructField("a", IntegerType), StructField("b", IntegerType,
nullable = true))))
>     .load("/tmp/test")
>     .createOrReplaceTempView("table")
>   sc.sql("select b from table where b is not null").show()
> {code}
> returns:
> {code}
> 16/11/22 17:43:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
> java.lang.IllegalArgumentException: Column [b] was not found in schema!
> 	at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:55)
> 	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:190)
> 	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:178)
> 	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:160)
> 	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:100)
> 	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
> 	at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:194)
> 	at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
> 	at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:59)
> 	at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:40)
> 	at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:126)
> 	at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:46)
> 	at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:110)
> 	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:109)
> 	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:367)
> 	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReader$1.apply(ParquetFileFormat.scala:341)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:116)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown
Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:86)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> expected result:
> {code}
> +---+
> |  b|
> +---+
> +---+
> {code}
> It works fine in 2.0.0 and 1.6.2. However, if I only select the nonexisting column (without
filter) it also works fine.
> Query plan:
> {code}
> == Parsed Logical Plan ==
> 'Project ['b]
> +- 'Filter isnotnull('b)
>    +- 'UnresolvedRelation `table`
> == Analyzed Logical Plan ==
> b: int
> Project [b#8]
> +- Filter isnotnull(b#8)
>    +- SubqueryAlias table
>       +- Relation[a#7,b#8] parquet
> == Optimized Logical Plan ==
> Project [b#8]
> +- Filter isnotnull(b#8)
>    +- Relation[a#7,b#8] parquet
> == Physical Plan ==
> *Project [b#8]
> +- *Filter isnotnull(b#8)
>    +- *BatchedScan parquet [b#8] Format: ParquetFormat, InputPaths: file:/tmp/test, PartitionFilters:
[], PushedFilters: [IsNotNull(b)], ReadSchema: struct<b:int>
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message