spark-reviews mailing list archives

From tejasapatil <...@git.apache.org>
Subject [GitHub] spark pull request #14864: [SPARK-15453] [SQL] FileSourceScanExec to extract...
Date Mon, 29 Aug 2016 16:02:34 GMT
GitHub user tejasapatil opened a pull request:

    https://github.com/apache/spark/pull/14864

    [SPARK-15453] [SQL] FileSourceScanExec to extract `outputOrdering` information

    ## What changes were proposed in this pull request?
    
    Extracts sort ordering information in `FileSourceScanExec` so that the planner can make use of it. My motivation for this change was to bring Sort Merge join on par with Hive's Sort-Merge-Bucket join when the source tables are bucketed + sorted.
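    The idea can be sketched outside Spark: a `Sort` node is needed only when the child's reported output ordering does not already satisfy the ordering the join requires. The names below (`SortKey`, `satisfies`, `needsSort`) are illustrative and are not Spark's actual API.

    ```scala
    object SortElisionSketch {
      // Minimal model of a sort key: a column plus direction.
      case class SortKey(column: String, ascending: Boolean = true)

      // The required ordering is satisfied if it is a prefix of the
      // ordering the child already produces.
      def satisfies(child: Seq[SortKey], required: Seq[SortKey]): Boolean =
        child.take(required.length) == required

      def needsSort(child: Seq[SortKey], required: Seq[SortKey]): Boolean =
        !satisfies(child, required)

      def main(args: Array[String]): Unit = {
        // A scan over a table bucketed + sorted by (j, k) can report
        // outputOrdering (j, k); the join over (j, k) then needs no Sort.
        val scanOrdering = Seq(SortKey("j"), SortKey("k"))
        val joinRequires = Seq(SortKey("j"), SortKey("k"))
        assert(!needsSort(scanOrdering, joinRequires))
        // A scan that reports no ordering still forces a Sort.
        assert(needsSort(Seq.empty, joinRequires))
        println("Sort elided for the bucketed + sorted scan")
      }
    }
    ```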
    
    Query:
    
    ```
    val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
    df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
    df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")
    context.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)
    ```
    
    Before:
    
    ```
    == Physical Plan ==
    *SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
    :- *Sort [j#120 ASC, k#121 ASC], false, 0
    :  +- *Project [i#119, j#120, k#121]
    :     +- *Filter (isnotnull(k#121) && isnotnull(j#120))
    :        +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
    +- *Sort [j#123 ASC, k#124 ASC], false, 0
       +- *Project [i#122, j#123, k#124]
          +- *Filter (isnotnull(k#124) && isnotnull(j#123))
             +- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
    ```
    
    After:  (note that the `Sort` step is no longer there)
    
    ```
    == Physical Plan ==
    *SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
    :- *Project [i#48, j#49, k#50]
    :  +- *Filter (isnotnull(k#50) && isnotnull(j#49))
    :     +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
    +- *Project [i#51, j#52, k#53]
       +- *Filter (isnotnull(j#52) && isnotnull(k#53))
          +- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
    ```
    
    ## How was this patch tested?
    
    Added a test case in `JoinSuite` and ran all the other tests in `JoinSuite`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tejasapatil/spark SPARK-15453_smb_optimization

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14864.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14864
    
----
commit 07196a8acbf6f0a68f29f96d1eeea74f53bbeb8a
Author: Tejas Patil <tejasp@fb.com>
Date:   2016-08-26T07:00:35Z

    [SPARK-15453] [SQL] Sort Merge Join to use bucketing metadata to optimize query plan
    
    BEFORE
    
    ```
    val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
    hc.sql("DROP TABLE table8").collect
    df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
    hc.sql("DROP TABLE table9").collect
    df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")
    
    hc.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)
    
    == Parsed Logical Plan ==
    'Project [*]
    +- 'Join Inner, (('a.j = 'b.j) && ('a.k = 'b.k))
       :- 'UnresolvedRelation table8, a
       +- 'UnresolvedRelation table9, b
    
    == Analyzed Logical Plan ==
    i: int, j: int, k: string, i: int, j: int, k: string
    Project [i#119, j#120, k#121, i#122, j#123, k#124]
    +- Join Inner, ((j#120 = j#123) && (k#121 = k#124))
       :- SubqueryAlias a
       :  +- SubqueryAlias table8
       :     +- Relation[i#119,j#120,k#121] orc
       +- SubqueryAlias b
          +- SubqueryAlias table9
             +- Relation[i#122,j#123,k#124] orc
    
    == Optimized Logical Plan ==
    Join Inner, ((j#120 = j#123) && (k#121 = k#124))
    :- Filter (isnotnull(k#121) && isnotnull(j#120))
    :  +- Relation[i#119,j#120,k#121] orc
    +- Filter (isnotnull(k#124) && isnotnull(j#123))
       +- Relation[i#122,j#123,k#124] orc
    
    == Physical Plan ==
    *SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
    :- *Sort [j#120 ASC, k#121 ASC], false, 0
    :  +- *Project [i#119, j#120, k#121]
    :     +- *Filter (isnotnull(k#121) && isnotnull(j#120))
    :        +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
    +- *Sort [j#123 ASC, k#124 ASC], false, 0
       +- *Project [i#122, j#123, k#124]
          +- *Filter (isnotnull(k#124) && isnotnull(j#123))
             +- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
    ```
    
    AFTER
    
    ```
    == Parsed Logical Plan ==
    'Project [*]
    +- 'Join Inner, (('a.j = 'b.j) && ('a.k = 'b.k))
       :- 'UnresolvedRelation `table8`, a
       +- 'UnresolvedRelation `table9`, b
    
    == Analyzed Logical Plan ==
    i: int, j: int, k: string, i: int, j: int, k: string
    Project [i#48, j#49, k#50, i#51, j#52, k#53]
    +- Join Inner, ((j#49 = j#52) && (k#50 = k#53))
       :- SubqueryAlias a
       :  +- SubqueryAlias table8
       :     +- Relation[i#48,j#49,k#50] orc
       +- SubqueryAlias b
          +- SubqueryAlias table9
             +- Relation[i#51,j#52,k#53] orc
    
    == Optimized Logical Plan ==
    Join Inner, ((j#49 = j#52) && (k#50 = k#53))
    :- Filter (isnotnull(k#50) && isnotnull(j#49))
    :  +- Relation[i#48,j#49,k#50] orc
    +- Filter (isnotnull(j#52) && isnotnull(k#53))
       +- Relation[i#51,j#52,k#53] orc
    
    == Physical Plan ==
    *SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
    :- *Project [i#48, j#49, k#50]
    :  +- *Filter (isnotnull(k#50) && isnotnull(j#49))
    :     +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
    +- *Project [i#51, j#52, k#53]
       +- *Filter (isnotnull(j#52) && isnotnull(k#53))
          +- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
    ```

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

