drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-4833) Union-All with a small cardinality input on one side does not get parallelized
Date Mon, 08 Aug 2016 22:43:20 GMT

    [ https://issues.apache.org/jira/browse/DRILL-4833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15412590#comment-15412590 ]

ASF GitHub Bot commented on DRILL-4833:
---------------------------------------

Github user jinfengni commented on a diff in the pull request:

    https://github.com/apache/drill/pull/562#discussion_r73969588
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java ---
    @@ -72,6 +73,47 @@ public Prel visitScan(ScanPrel prel, MajorFragmentStat s) throws RuntimeExceptio
         return prel;
       }
     
    +  /**
    +   * A union-all should be treated differently compared to a join operator because joins impose
    +   * a co-location requirement and therefore insert an exchange on both sides of the
    +   * join (e.g HashToRandomExchange or BroadcastExchange), thus the major fragment of the join itself
    +   * is different from the major fragment of its children.  Union-All does not impose the co-location
    +   * requirement on its children, hence the major fragment of the union-all may be the same as that of
    +   * its children. Thus, we should take an 'aggregate' view of all its children to decide the parallelism.
    +   */
    +  @Override
    +  public Prel visitUnionAll(UnionAllPrel prel, MajorFragmentStat s) throws RuntimeException {
    +    List<RelNode> children = Lists.newArrayList();
    +    s.add(prel);
    +
    +    List<MajorFragmentStat> statList = Lists.newArrayList();
    +    for (Prel p : prel) {
    +      // for each input of union-all, create a temporary MajorFragmentStat instance
    +      MajorFragmentStat childStat = new MajorFragmentStat(s /* use existing stat to initialize */);
    +      statList.add(childStat);
    +      childStat.add(p);
    +    }
    +
    +    int i = 0;
    +    for(Prel p : prel) {
    +      children.add(p.accept(this, statList.get(i++)));
    +    }
    +
    +    MajorFragmentStat maxStat = statList.get(0);
    +    // get the max width of all child stats
    +    for (int j=1; j < statList.size(); j++) {
    +      if (statList.get(j).getMaxWidth() > maxStat.getMaxWidth()) {
    +        maxStat = statList.get(j);
    +      }
    +    }
    +
    +    // width of the major fragment that contains union-all should be the maximum
    +    // width of all its inputs
    +    s.setMaxWidth(maxStat.getMaxWidth());
    --- End diff --
    
    After discussion with @amansinha100, I realized that maxRows is already taken care of for the Union-All operator: for union-all, maxRows will be the sum of both children's maxRows.
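
The point about maxRows can be illustrated with a minimal sketch. It assumes only what the comment states, namely that the row bound of a union-all is the sum of its inputs' row bounds. The class name, method name, and row counts are hypothetical and are not part of Drill's MajorFragmentStat API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; this is not Drill's MajorFragmentStat.
final class UnionAllMaxRowsSketch {

  // Assumption taken from the comment above: the row bound of a union-all
  // is the sum of the row bounds of its inputs.
  public static long unionAllMaxRows(List<Long> childMaxRows) {
    long total = 0;
    for (long rows : childMaxRows) {
      total += rows;
    }
    return total;
  }

  public static void main(String[] args) {
    // Illustrative values: one large input and one LIMIT 1 input.
    List<Long> childMaxRows = Arrays.asList(150000L, 1L);
    System.out.println(unionAllMaxRows(childMaxRows));
  }
}
```

Because the sum is never smaller than any single input's bound, a small input on one side cannot pull the union-all's row bound below that of the larger input.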



> Union-All with a small cardinality input on one side does not get parallelized
> ------------------------------------------------------------------------------
>
>                 Key: DRILL-4833
>                 URL: https://issues.apache.org/jira/browse/DRILL-4833
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Query Planning & Optimization
>    Affects Versions: 1.7.0
>            Reporter: Aman Sinha
>            Assignee: Jinfeng Ni
>
> When a Union-All has an input that is a LIMIT 1 (or some small value relative to the slice_target), and that input is accessing Parquet files, Drill does an optimization where a single Parquet file is read (based on the rowcount statistics in the Parquet file, we determine that reading 1 file is sufficient).  This also means that the max width for that major fragment is set to 1 because only 1 minor fragment is needed to read 1 row-group.
> The net effect of this is the width of 1 is applied to the major fragment which consists of union-all and its inputs.  This is sub-optimal because it prevents parallelization of the other input and the union-all operator itself.
> Here's an example query and plan that illustrates the issue: 
> {noformat}
> alter session set `planner.slice_target` = 1;
> explain plan for 
> (select c.c_nationkey, c.c_custkey, c.c_name
> from
> dfs.`/Users/asinha/data/tpchmulti/customer` c
> inner join
> dfs.`/Users/asinha/data/tpchmulti/nation`  n
> on c.c_nationkey = n.n_nationkey)
> union all
> (select c_nationkey, c_custkey, c_name
> from dfs.`/Users/asinha/data/tpchmulti/customer` c limit 1)
> +------+------+
> | text | json |
> +------+------+
> | 00-00    Screen
> 00-01      Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
> 00-02        Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
> 00-03          UnionAll(all=[true])
> 00-05            Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
> 00-07              HashJoin(condition=[=($0, $3)], joinType=[inner])
> 00-10                Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
> 00-13                  HashToRandomExchange(dist0=[[$0]])
> 01-01                    UnorderedMuxExchange
> 03-01                      Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($0)])
> 03-02                        Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/Users/asinha/data/tpchmulti/customer]], selectionRoot=file:/Users/asinha/data/tpchmulti/customer, numFiles=1, usedMetadataFile=false, columns=[`c_nationkey`, `c_custkey`, `c_name`]]])
> 00-09                Project(n_nationkey=[$0])
> 00-12                  HashToRandomExchange(dist0=[[$0]])
> 02-01                    UnorderedMuxExchange
> 04-01                      Project(n_nationkey=[$0], E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($0)])
> 04-02                        Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/Users/asinha/data/tpchmulti/nation]], selectionRoot=file:/Users/asinha/data/tpchmulti/nation, numFiles=1, usedMetadataFile=false, columns=[`n_nationkey`]]])
> 00-04            Project(c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
> 00-06              SelectionVectorRemover
> 00-08                Limit(fetch=[1])
> 00-11                  Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/Users/asinha/data/tpchmulti/customer/01.parquet]], selectionRoot=file:/Users/asinha/data/tpchmulti/customer, numFiles=1, usedMetadataFile=false, columns=[`c_nationkey`, `c_custkey`, `c_name`]]])
> {noformat}
> Note that Union-all and HashJoin are part of fragment 0 (single minor fragment) even though they could have been parallelized.  This clearly affects performance for larger data sets.
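
The difference between the reported behaviour and the approach in the diff quoted earlier in this message can be sketched as follows. This is a simplified model, not Drill code: it assumes the reported behaviour amounts to the narrowest input capping the whole major fragment, and that the patch instead takes the maximum width over all union-all inputs. The class name, method names, and the widths 8 and 1 are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not Drill's ExcessiveExchangeIdentifier.
final class UnionAllWidthSketch {

  // Assumed model of the reported behaviour: all operators in the major
  // fragment share one width cap, so the narrowest input wins.
  public static int narrowestInputWidth(List<Integer> inputMaxWidths) {
    int width = Integer.MAX_VALUE;
    for (int w : inputMaxWidths) {
      width = Math.min(width, w);
    }
    return width;
  }

  // Approach described in the quoted diff: the width of the fragment that
  // contains the union-all is the maximum width of all its inputs.
  public static int widestInputWidth(List<Integer> inputMaxWidths) {
    int width = inputMaxWidths.get(0);
    for (int w : inputMaxWidths) {
      width = Math.max(width, w);
    }
    return width;
  }

  public static void main(String[] args) {
    // Illustrative widths: a parallelizable join input and a LIMIT 1 input.
    List<Integer> widths = Arrays.asList(8, 1);
    System.out.println("narrowest: " + narrowestInputWidth(widths));
    System.out.println("widest: " + widestInputWidth(widths));
  }
}
```

Under this model, the LIMIT 1 input forces a width of 1 in the first case, while the second case lets the union-all and its other input keep the wider setting.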



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
