spark-reviews mailing list archives

From tejasapatil <...@git.apache.org>
Subject [GitHub] spark pull request #15951: [SPARK-18510] Fix data corruption from inferred p...
Date Tue, 22 Nov 2016 00:26:02 GMT
Github user tejasapatil commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15951#discussion_r89015376
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
    @@ -84,30 +84,95 @@ case class DataSource(
       private val caseInsensitiveOptions = new CaseInsensitiveMap(options)

       /**
    -   * Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
    +   * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to
    +   * infer it. In the read path, only Hive managed tables provide the partition columns properly
    +   * when initializing this class. All other file based data sources will try to infer the
    +   * partitioning, and then cast the inferred types to the user specified dataTypes if the
    +   * partition columns exist inside `userSpecifiedSchema`; otherwise we can hit data corruption
    +   * bugs like SPARK-18510. This method will try to skip file scanning when both
    +   * `userSpecifiedSchema` and `partitionColumns` are provided. Here are the code paths that use
    +   * this method:
    +   *   1. `spark.read` (no schema): The most work. Infer both the schema and the partitioning
    +   *     columns.
    +   *   2. `spark.read.schema(userSpecifiedSchema)`: Parse the partitioning columns, cast them to
    +   *     the dataTypes provided in `userSpecifiedSchema` if they exist there, or fall back to the
    +   *     inferred dataType if they don't.
    +   *   3. `spark.readStream.schema(userSpecifiedSchema)`: For streaming use cases, users have to
    +   *     provide the schema. Here we also perform partition inference as in 2, and try to use the
    +   *     dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
    +   *     this information, so calls to this method should be very cheap, i.e. there won't be any
    +   *     further inference in later triggers.
    +   *   4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
    +   *     existing table's partitioning scheme. This is achieved by not providing
    +   *     `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
    +   *     exit when we don't care about the schema of the original table.
    +   *
    +   * Returns a pair of the data schema (excluding partition columns) and the schema of the
    +   * partition columns.
    +   *
    +   * @param justPartitioning Whether to exit early and provide just the partition schema. The
    +   *                         data schema is incorrect in this case and should not be used.
    --- End diff --
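
    The doc comment above enumerates four code paths. As a rough sketch of what each looks
    like from the user side (the path "/data/events", the table name "events", and the column
    names are assumptions for illustration, not part of the patch):

        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.types.{DateType, LongType, StructType}

        val spark = SparkSession.builder().appName("schema-paths").getOrCreate()

        // 1. No schema: infer both the data schema and the partitioning columns
        //    by scanning the files under the (hypothetical) /data/events/date=.../
        val inferred = spark.read.parquet("/data/events")

        // 2. User-specified schema: the partition column `date` is still discovered
        //    from the directory layout, but its inferred type is cast to the DateType
        //    declared here rather than used as-is (the SPARK-18510 fix).
        val schema = new StructType().add("id", LongType).add("date", DateType)
        val typed = spark.read.schema(schema).parquet("/data/events")

        // 3. Streaming: the schema is mandatory; partition inference happens once
        //    and is reused by every subsequent trigger.
        val stream = spark.readStream.schema(schema).parquet("/data/events")

        // 4. Writing to an existing table: the existing table's partitioning scheme
        //    is resolved with no userSpecifiedSchema involved.
        typed.write.mode("append").saveAsTable("events")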
    
    This feels ugly. A client can accidentally end up using the data schema. Ideally, these
    could be separate methods for the data schema and the partition schema.

    In case that's not possible, how about returning `None` instead of returning
    `new StructType()`?
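
    To make that suggestion concrete, a rough sketch of the Option-returning shape (a
    hypothetical signature modeled on the method under review, with placeholder schemas,
    not the actual DataSource code):

        import org.apache.spark.sql.types.{LongType, StringType, StructType}

        // Hypothetical signature; the real method infers or parses these schemas.
        def getOrInferFileFormatSchema(
            justPartitioning: Boolean): (Option[StructType], StructType) = {
          val partitionSchema = new StructType().add("date", StringType)
          if (justPartitioning) {
            // No data schema is computed in this mode; returning None forces callers
            // to handle its absence instead of silently using a bogus schema.
            (None, partitionSchema)
          } else {
            val dataSchema = new StructType().add("id", LongType)
            (Some(dataSchema), partitionSchema)
          }
        }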

