drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.
Date Mon, 12 Feb 2018 17:27:00 GMT

    [ https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361120#comment-16361120 ]

ASF GitHub Bot commented on DRILL-6115:
---------------------------------------

Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167589289
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java ---
    @@ -112,6 +120,73 @@ public RelWriter explainTerms(RelWriter pw) {
         return pw;
       }
     
    +  /**
    +   * This method creates a new UnorderedMux and Demux exchanges if mux operators are enabled.
    +   * @param child input to the new Unordered[Mux/Demux]Prel or new HashToRandomExchange node.
    +   * @param options options manager to check if mux is enabled.
    +   */
    +  @Override
    +  public Prel getMuxPrel(Prel child, OptionManager options) {
    +    boolean isMuxEnabled = options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val;
    +    Prel newPrel = child;
    +
    +    final List<String> childFields = child.getRowType().getFieldNames();
    +
    +    List <RexNode> removeUpdatedExpr = null;
    +
    +    if (isMuxEnabled) {
    +      // Insert Project Operator with new column that will be a hash for HashToRandomExchange fields
    +      final List<DistributionField> distFields = getFields();
    +      final List<String> outputFieldNames = Lists.newArrayList(childFields);
    +      final RexBuilder rexBuilder = getCluster().getRexBuilder();
    +      final List<RelDataTypeField> childRowTypeFields = child.getRowType().getFieldList();
    +
    +      final HashPrelUtil.HashExpressionCreatorHelper<RexNode> hashHelper = new HashPrelUtil.RexNodeBasedHashExpressionCreatorHelper(rexBuilder);
    +      final List<RexNode> distFieldRefs = Lists.newArrayListWithExpectedSize(distFields.size());
    +      for(int i=0; i<distFields.size(); i++) {
    +        final int fieldId = distFields.get(i).getFieldId();
    +        distFieldRefs.add(rexBuilder.makeInputRef(childRowTypeFields.get(fieldId).getType(), fieldId));
    +      }
    +
    +      final List <RexNode> updatedExpr = Lists.newArrayListWithExpectedSize(childRowTypeFields.size());
    +      removeUpdatedExpr = Lists.newArrayListWithExpectedSize(childRowTypeFields.size());
    +      for ( RelDataTypeField field : childRowTypeFields) {
    +        RexNode rex = rexBuilder.makeInputRef(field.getType(), field.getIndex());
    +        updatedExpr.add(rex);
    +        removeUpdatedExpr.add(rex);
    +      }
    +
    +      outputFieldNames.add(HashPrelUtil.HASH_EXPR_NAME);
    +      final RexNode distSeed = rexBuilder.makeBigintLiteral(BigDecimal.valueOf(HashPrelUtil.DIST_SEED)); // distribution seed
    +      updatedExpr.add(HashPrelUtil.createHashBasedPartitionExpression(distFieldRefs, distSeed, hashHelper));
    +
    +      RelDataType rowType = RexUtil.createStructType(getCluster().getTypeFactory(), updatedExpr, outputFieldNames);
    +
    +      ProjectPrel addColumnprojectPrel = new ProjectPrel(child.getCluster(), child.getTraitSet(), child, updatedExpr, rowType);
    +
    +      newPrel = new UnorderedMuxExchangePrel(addColumnprojectPrel.getCluster(), addColumnprojectPrel.getTraitSet(),
    +              addColumnprojectPrel);
    +    }
    +
    +    newPrel = new HashToRandomExchangePrel(getCluster(), getTraitSet(), newPrel, getFields());
    +
    +    if (options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val) {
    +      HashToRandomExchangePrel hashExchangePrel = (HashToRandomExchangePrel) newPrel;
    +      // Insert a DeMuxExchange to narrow down the number of receivers
    +      newPrel = new UnorderedDeMuxExchangePrel(getCluster(), getTraitSet(), hashExchangePrel,
    +              hashExchangePrel.getFields());
    +    }
    +
    +    if ( isMuxEnabled ) {
    --- End diff --
    
    Use consistent formatting.
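
For context, getMuxPrel is the hook the planner can call when deciding whether to wrap an exchange with local mux/demux operators. The following is a minimal sketch of such a caller, assuming getMuxPrel(Prel, OptionManager) is declared on ExchangePrel (as the @Override in the diff suggests); the class and method names below are made up for illustration and are not Drill's actual planner code.

{code:java}
// Illustrative sketch only. Assumes getMuxPrel(Prel, OptionManager) is declared
// on ExchangePrel; the helper class and method here are hypothetical.
import org.apache.drill.exec.planner.physical.ExchangePrel;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.server.options.OptionManager;

public final class MuxInsertionSketch {
  private MuxInsertionSketch() {}

  // Rebuild the exchange over an already-rewritten child, letting the exchange
  // itself decide whether to add mux/demux nodes based on the
  // PlannerSettings.MUX_EXCHANGE / DEMUX_EXCHANGE options (see the diff above).
  public static Prel insertMuxOperators(ExchangePrel exchange, Prel child, OptionManager options) {
    return exchange.getMuxPrel(child, options);
  }
}
{code}

Keeping this logic inside each exchange subclass means the caller needs no exchange-specific branching, which is presumably what lets an ordered variant (the subject of this JIRA) plug into the same hook.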


> SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.
> ------------------------------------------------------------------------------------------
>
>                 Key: DRILL-6115
>                 URL: https://issues.apache.org/jira/browse/DRILL-6115
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.12.0
>            Reporter: Hanumath Rao Maduri
>            Assignee: Hanumath Rao Maduri
>            Priority: Major
>             Fix For: 1.13.0
>
>         Attachments: Enhancing Drill to multiplex ordered merge exchanges.docx
>
>
> SingleMergeExchange is created when a global order is required in the output. The following query produces the SingleMergeExchange.
> {code:java}
> 0: jdbc:drill:zk=local> explain plan for select L_LINENUMBER from dfs.`/drill/tables/lineitem` order by L_LINENUMBER;
> +------+------+
> | text | json |
> +------+------+
> | 00-00 Screen
> 00-01 Project(L_LINENUMBER=[$0])
> 00-02 SingleMergeExchange(sort0=[0])
> 01-01 SelectionVectorRemover
> 01-02 Sort(sort0=[$0], dir0=[ASC])
> 01-03 HashToRandomExchange(dist0=[[$0]])
> 02-01 Scan(table=[[dfs, /drill/tables/lineitem]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///drill/tables/lineitem, condition=null], columns=[`L_LINENUMBER`], maxwidth=15]])
> {code}
> On a 10 node cluster, if the table is huge then DRILL can spawn many minor fragments, all of which are merged on a single node with one merge receiver. Doing so creates a lot of memory pressure on the receiver node and an execution bottleneck. To address this issue, the merge receiver should be a multiphase merge receiver.
> Ideally, for a large cluster one could introduce tree merges so that merging can be done in parallel. But as a first step, I think it is better to use the existing infrastructure for multiplexing operators to generate an OrderedMux, so that all the minor fragments pertaining to one Drillbit are merged locally and the merged data is sent across to the receiver operator.
> For example, on a 10 node cluster where each node processes 14 minor fragments, the current version of the code merges 140 minor fragments at the single receiver. The proposed version performs two levels of merges: a 14-way merge inside each Drillbit, which runs in parallel, followed by a 10-way merge of the per-Drillbit streams at the receiver node (see the sketch below).
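
For illustration, the proposed two-level merge might produce a plan shaped roughly like the sketch below, with an OrderedMux-style exchange merging the 14 local minor fragments inside each Drillbit before the SingleMergeExchange merges one sorted stream per Drillbit. The operator names and fragment numbering are hypothetical, not actual Drill explain output.

{code:java}
00-00 Screen
00-01 Project(L_LINENUMBER=[$0])
00-02 SingleMergeExchange(sort0=[0])
01-01 OrderedMuxExchange(sort0=[0])
02-01 SelectionVectorRemover
02-02 Sort(sort0=[$0], dir0=[ASC])
02-03 HashToRandomExchange(dist0=[[$0]])
03-01 Scan(table=[[dfs, /drill/tables/lineitem]], ...)
{code}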



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
