drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.
Date Mon, 12 Feb 2018 17:27:00 GMT

    [ https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361121#comment-16361121 ]

ASF GitHub Bot commented on DRILL-6115:
---------------------------------------

Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167616026
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java ---
    @@ -0,0 +1,64 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.config;
    +
    +import java.util.List;
    +
    +import org.apache.drill.exec.physical.MinorFragmentEndpoint;
    +import org.apache.drill.exec.physical.base.PhysicalOperator;
    +import org.apache.drill.exec.physical.base.Receiver;
    +import org.apache.drill.common.logical.data.Order.Ordering;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonTypeName;
    +
    +/**
    + * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
    + * and a merge operation is performed to produce a sorted stream as output.
    + */
    +@JsonTypeName("ordered-mux-exchange")
    +public class OrderedMuxExchange extends AbstractMuxExchange {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedMuxExchange.class);
    +
    +  private final List<Ordering> orderings;
    +
    +  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<Ordering> orderings) {
    +    super(child);
    +    this.orderings = orderings;
    +  }
    +
    +  @Override
    +  public Receiver getReceiver(int minorFragmentId) {
    +    createSenderReceiverMapping();
    +
    +    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
    +    if (senders == null || senders.size() <= 0) {
    +      throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
    +    }
    +
    +    if (logger.isDebugEnabled()) {
    +      logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
    +    }
    +
    +    return new MergingReceiverPOP(senderMajorFragmentId, senders, orderings, false);
    --- End diff --
    
    I don't think that locality plays a role in enabling or disabling spooling. If spooling is disabled for a remote receiver, it should also be disabled for a local receiver.


> SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.
> ------------------------------------------------------------------------------------------
>
>                 Key: DRILL-6115
>                 URL: https://issues.apache.org/jira/browse/DRILL-6115
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.12.0
>            Reporter: Hanumath Rao Maduri
>            Assignee: Hanumath Rao Maduri
>            Priority: Major
>             Fix For: 1.13.0
>
>         Attachments: Enhancing Drill to multiplex ordered merge exchanges.docx
>
>
> SingleMergeExchange is created when a global order is required in the output. The following query produces the SingleMergeExchange.
> {code:java}
> 0: jdbc:drill:zk=local> explain plan for select L_LINENUMBER from dfs.`/drill/tables/lineitem` order by L_LINENUMBER;
> +------+------+
> | text | json |
> +------+------+
> | 00-00 Screen
> 00-01 Project(L_LINENUMBER=[$0])
> 00-02 SingleMergeExchange(sort0=[0])
> 01-01 SelectionVectorRemover
> 01-02 Sort(sort0=[$0], dir0=[ASC])
> 01-03 HashToRandomExchange(dist0=[[$0]])
> 02-01 Scan(table=[[dfs, /drill/tables/lineitem]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///drill/tables/lineitem, condition=null], columns=[`L_LINENUMBER`], maxwidth=15]])
> {code}
> On a 10-node cluster, if the table is huge, Drill can spawn many minor fragments, all of which are merged on a single node by one merge receiver. This creates a lot of memory pressure on the receiver node and also an execution bottleneck. To address this issue, the merge receiver should be a multiphase merge receiver.
> Ideally, for a large cluster, one could introduce tree merges so that merging is done in parallel. As a first step, though, I think it is better to use the existing infrastructure for multiplexing operators to generate an OrderedMux, so that all the minor fragments belonging to one Drillbit are merged locally and the merged data is then sent to the receiver operator.
> For example, on a 10-node cluster where each node processes 14 minor fragments:
> The current version of the code merges 140 minor fragments at a single receiver.
> The proposed version uses a two-level merge: a 14-way merge in each Drillbit, which runs in parallel across Drillbits,
> followed by a merge of 10 minor fragments at the receiver node.


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
