drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.
Date Sun, 11 Feb 2018 20:38:02 GMT

    [ https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360109#comment-16360109 ]

ASF GitHub Bot commented on DRILL-6115:
---------------------------------------

Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167447863
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java ---
    @@ -0,0 +1,116 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl;
    +
    +import org.apache.drill.PlanTestBase;
    +import org.apache.drill.common.expression.ExpressionPosition;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.record.RecordBatchLoader;
    +import org.apache.drill.exec.rpc.user.QueryDataBatch;
    +import org.apache.drill.exec.vector.IntVector;
    +import org.apache.drill.test.ClusterFixture;
    +import org.apache.drill.test.ClusterFixtureBuilder;
    +import org.apache.drill.test.ClientFixture;
    +import org.junit.Test;
    +
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +public class TestOrderedMuxExchange extends PlanTestBase {
    +
    +  private final static String ORDERED_MUX_EXCHANGE = "OrderedMuxExchange";
    +
    +
    +  private void validateResults(BufferAllocator allocator, List<QueryDataBatch> results) throws SchemaChangeException {
    +    long previousBigInt = Long.MIN_VALUE;
    +
    +    for (QueryDataBatch b : results) {
    +      RecordBatchLoader loader = new RecordBatchLoader(allocator);
    +      if (b.getHeader().getRowCount() > 0) {
    +        loader.load(b.getHeader().getDef(),b.getData());
    +        @SuppressWarnings({ "deprecation", "resource" })
    +        IntVector c1 = (IntVector) loader.getValueAccessorById(IntVector.class,
    +                   loader.getValueVectorId(new SchemaPath("id_i", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
    +        IntVector.Accessor a1 = c1.getAccessor();
    +
    +        for (int i = 0; i < c1.getAccessor().getValueCount(); i++) {
    +          assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt <= a1.get(i));
    +          previousBigInt = a1.get(i);
    +        }
    +      }
    +      loader.clear();
    +      b.release();
    +    }
    +  }
    +
    +  /**
    +   * Test case to verify the OrderedMuxExchange created for order by clause.
    +   * It checks by forcing the plan to create OrderedMuxExchange and also verifies the
    +   * output column is ordered.
    +   *
    +   * @throws Exception if anything goes wrong
    +   */
    +
    +  @Test
    +  public void testOrderedMuxForOrderBy() throws Exception {
    +    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
    +            .maxParallelization(1)
    +            .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true)
    +            ;
    +
    +    try (ClusterFixture cluster = builder.build();
    +         ClientFixture client = cluster.clientFixture()) {
    +      client.alterSession(ExecConstants.SLICE_TARGET, 10);
    +      String sql = "SELECT id_i, name_s10 FROM `mock`.`employees_10K` ORDER BY id_i limit 10";
    --- End diff --
    
    I am not sure how the table is organized. Does it already have an ordered id_i column? If so, we should use a different column.


> SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.
> ------------------------------------------------------------------------------------------
>
>                 Key: DRILL-6115
>                 URL: https://issues.apache.org/jira/browse/DRILL-6115
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.12.0
>            Reporter: Hanumath Rao Maduri
>            Assignee: Hanumath Rao Maduri
>            Priority: Major
>             Fix For: 1.13.0
>
>         Attachments: Enhancing Drill to multiplex ordered merge exchanges.docx
>
>
> SingleMergeExchange is created when a global order is required in the output. The following query produces the SingleMergeExchange.
> {code:java}
> 0: jdbc:drill:zk=local> explain plan for select L_LINENUMBER from dfs.`/drill/tables/lineitem` order by L_LINENUMBER;
> +------+------+
> | text | json |
> +------+------+
> | 00-00 Screen
> 00-01 Project(L_LINENUMBER=[$0])
> 00-02 SingleMergeExchange(sort0=[0])
> 01-01 SelectionVectorRemover
> 01-02 Sort(sort0=[$0], dir0=[ASC])
> 01-03 HashToRandomExchange(dist0=[[$0]])
> 02-01 Scan(table=[[dfs, /drill/tables/lineitem]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///drill/tables/lineitem, condition=null], columns=[`L_LINENUMBER`], maxwidth=15]])
> {code}
> On a 10-node cluster, if the table is huge, Drill can spawn many minor fragments, all of which are merged on a single node by one merge receiver. This creates a lot of memory pressure on the receiver node and also an execution bottleneck. To address this issue, the merge receiver should be a multiphase merge receiver.
> Ideally, for a large cluster, one could introduce tree merges so that merging can be done in parallel. But as a first step, I think it is better to use the existing infrastructure for multiplexing operators to generate an OrderedMux, so that all the minor fragments pertaining to one Drillbit are merged locally and the merged data is then sent across to the receiver operator.
> For example, on a 10-node cluster where each node processes 14 minor fragments:
> The current version of the code merges all 140 minor fragments at a single receiver.
> The proposed version has two levels of merging: a 14-way merge in each Drillbit, done in parallel across Drillbits, after which 10 minor fragments (one per Drillbit) are merged at the receiver node.
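
The two-level merge described in the issue can be illustrated with a small standalone sketch. This is not Drill's implementation: the class name `TwoLevelMergeSketch` and both methods are hypothetical, and plain in-memory lists of integers stand in for the sorted record streams produced by minor fragments. It only shows that merging per node first and then merging the per-node results yields the same global order as a single wide merge, while keeping the fan-in of each individual merge small.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration only; none of these names exist in Drill.
public class TwoLevelMergeSketch {

  /** K-way merge of already sorted lists using a min-heap of cursors. */
  public static List<Integer> mergeSorted(List<List<Integer>> sortedInputs) {
    // Each heap entry is {value, inputIndex, positionInInput}, ordered by value.
    PriorityQueue<int[]> heap =
        new PriorityQueue<>(Comparator.comparingInt((int[] e) -> e[0]));
    for (int i = 0; i < sortedInputs.size(); i++) {
      if (!sortedInputs.get(i).isEmpty()) {
        heap.add(new int[] {sortedInputs.get(i).get(0), i, 0});
      }
    }
    List<Integer> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] smallest = heap.poll();
      out.add(smallest[0]);
      // Advance the cursor of the input that supplied the smallest value.
      List<Integer> source = sortedInputs.get(smallest[1]);
      int next = smallest[2] + 1;
      if (next < source.size()) {
        heap.add(new int[] {source.get(next), smallest[1], next});
      }
    }
    return out;
  }

  /**
   * fragmentsByNode.get(n) holds the sorted outputs of the minor fragments
   * assumed to run on node n.
   */
  public static List<Integer> twoLevelMerge(List<List<List<Integer>>> fragmentsByNode) {
    List<List<Integer>> perNode = new ArrayList<>();
    for (List<List<Integer>> nodeFragments : fragmentsByNode) {
      // Level 1: local merge on each node (independent, so it could run in parallel).
      perNode.add(mergeSorted(nodeFragments));
    }
    // Level 2: the receiver merges one stream per node instead of one per fragment.
    return mergeSorted(perNode);
  }

  public static void main(String[] args) {
    // Two nodes with two sorted fragments each (toy data).
    List<List<List<Integer>>> nodes = Arrays.asList(
        Arrays.asList(Arrays.asList(1, 4, 7), Arrays.asList(2, 5)),
        Arrays.asList(Arrays.asList(3, 6), Arrays.asList(0, 8)));
    System.out.println(twoLevelMerge(nodes)); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8]
  }
}
```

With the numbers from the issue, the receiver's fan-in in this sketch would drop from 140 inputs to 10, and each node-local merge would have a fan-in of 14.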



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
