drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-4706) Fragment planning causes Drillbits to read remote chunks when local copies are available
Date Fri, 04 Nov 2016 23:37:58 GMT

    [ https://issues.apache.org/jira/browse/DRILL-4706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15638058#comment-15638058 ]

ASF GitHub Bot commented on DRILL-4706:
---------------------------------------

Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/639#discussion_r86602221
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/planner/fragment/TestLocalAffinityFragmentParallelizer.java
---
    @@ -0,0 +1,476 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.planner.fragment;
    +
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableMap;
    +import mockit.Mocked;
    +import mockit.NonStrictExpectations;
    +import org.apache.drill.exec.physical.EndpointAffinity;
    +import org.apache.drill.exec.physical.base.PhysicalOperator;
    +import org.junit.Test;
    +
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Collections;
    +
    +import static java.lang.Integer.MAX_VALUE;
    +import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
    +import static org.apache.drill.exec.planner.fragment.LocalAffinityFragmentParallelizer.INSTANCE;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    +
    +
    +public class TestLocalAffinityFragmentParallelizer {
    +
    +    // Create a set of test endpoints
    +    private static final DrillbitEndpoint DEP1 = newDrillbitEndpoint("node1", 30010);
    +    private static final DrillbitEndpoint DEP2 = newDrillbitEndpoint("node2", 30010);
    +    private static final DrillbitEndpoint DEP3 = newDrillbitEndpoint("node3", 30010);
    +    private static final DrillbitEndpoint DEP4 = newDrillbitEndpoint("node4", 30010);
    +    private static final DrillbitEndpoint DEP5 = newDrillbitEndpoint("node5", 30010);
    +
    +    @Mocked private Fragment fragment;
    +    @Mocked private PhysicalOperator root;
    +
    +    private static final DrillbitEndpoint newDrillbitEndpoint(String address, int port) {
    +        return DrillbitEndpoint.newBuilder().setAddress(address).setControlPort(port).build();
    +    }
    +
    +    private static final ParallelizationParameters newParameters(final long threshold,
    +                                                                 final int maxWidthPerNode,
    +                                                                 final int maxGlobalWidth) {
    +        return new ParallelizationParameters() {
    +            @Override
    +            public long getSliceTarget() {
    +                return threshold;
    +            }
    +
    +            @Override
    +            public int getMaxWidthPerNode() {
    +                return maxWidthPerNode;
    +            }
    +
    +            @Override
    +            public int getMaxGlobalWidth() {
    +                return maxGlobalWidth;
    +            }
    +
    +            /**
    +             * {@link LocalAffinityFragmentParallelizer} doesn't use affinity factor.
    +             * @return 0.0, since the affinity factor is unused.
    +             */
    +            @Override
    +            public double getAffinityFactor() {
    +                return 0.0f;
    +            }
    +        };
    +    }
    +
    +    private final Wrapper newWrapper(double cost, int minWidth, int maxWidth, List<EndpointAffinity> endpointAffinities) {
    +        new NonStrictExpectations() {
    +            {
    +                fragment.getRoot(); result = root;
    +            }
    +        };
    +
    +        final Wrapper fragmentWrapper = new Wrapper(fragment, 1);
    +        final Stats stats = fragmentWrapper.getStats();
    +        stats.setDistributionAffinity(DistributionAffinity.LOCAL);
    +        stats.addCost(cost);
    +        stats.addMinWidth(minWidth);
    +        stats.addMaxWidth(maxWidth);
    +        stats.addEndpointAffinities(endpointAffinities);
    +        return fragmentWrapper;
    +    }
    +
    +    private void checkEndpointAssignments(List<DrillbitEndpoint> assignedEndpoints,
    +                                          Map<DrillbitEndpoint, Integer> expectedAssignments) throws Exception {
    +        Map<DrillbitEndpoint, Integer> endpointAssignments = new HashMap<>();
    +        // Count the number of fragments assigned to each endpoint.
    +        for (DrillbitEndpoint endpoint: assignedEndpoints) {
    +            if (endpointAssignments.containsKey(endpoint)) {
    +                endpointAssignments.put(endpoint, endpointAssignments.get(endpoint) + 1);
    +            } else {
    +                endpointAssignments.put(endpoint, 1);
    +            }
    +        }
    +
    +        // Verify the number of fragments assigned to each endpoint against the expected value.
    +        for (Map.Entry<DrillbitEndpoint, Integer> endpointAssignment : endpointAssignments.entrySet()) {
    +            assertEquals(expectedAssignments.get(endpointAssignment.getKey()).intValue(),
    +                         endpointAssignment.getValue().intValue());
    +        }
    +    }
    +
    +    @Test
    +    public void testEqualLocalWorkUnitsUnderNodeLimit() throws Exception {
    +        final Wrapper wrapper = newWrapper(200, 1, 80,  /* cost, minWidth, maxWidth */
    +            ImmutableList.of( /*  endpointAffinities. */
    +                /* For local affinity, we only care about numLocalWorkUnits, the last column below */
    +                /* endpoint, affinity_value, mandatory, maxWidth, numLocalWorkUnits */
    +                new EndpointAffinity(DEP1, 0.15, false, MAX_VALUE, 16),
    +                new EndpointAffinity(DEP2, 0.15, false, MAX_VALUE, 16),
    +                new EndpointAffinity(DEP3, 0.10, false, MAX_VALUE, 16),
    +                new EndpointAffinity(DEP4, 0.20, false, MAX_VALUE, 16),
    +                new EndpointAffinity(DEP5, 0.20, false, MAX_VALUE, 16)
    +            ));
    +        INSTANCE.parallelizeFragment(wrapper, newParameters(1 /* sliceTarget */,
    +                                                            23 /* maxWidthPerNode */,
    +                                                            200 /* globalMaxWidth */),
    +                                                            ImmutableList.of(DEP1, DEP2, DEP3, DEP4, DEP5));
    +        // Every endpoint should get assigned 16 fragments because the fragment's
    +        // maxWidth (80) is below globalMaxWidth (200) and each node's localWorkUnits
    +        // (16) is below maxWidthPerNode (23).
    +        Map<DrillbitEndpoint, Integer> expectedAssignments = ImmutableMap.of(DEP1, 16,
    +                                                                             DEP2, 16,
    +                                                                             DEP3, 16,
    +                                                                             DEP4, 16,
    +                                                                             DEP5, 16);
    +        // Expect the fragment parallelization to be 80 (16 * 5)
    +        assertEquals(80, wrapper.getWidth());
    +
    +        final List<DrillbitEndpoint> assignedEndpoints = wrapper.getAssignedEndpoints();
    +        assertEquals(80, assignedEndpoints.size());
    +        assertTrue(assignedEndpoints.contains(DEP1));
    +        assertTrue(assignedEndpoints.contains(DEP2));
    +        assertTrue(assignedEndpoints.contains(DEP3));
    +        assertTrue(assignedEndpoints.contains(DEP4));
    +        assertTrue(assignedEndpoints.contains(DEP5));
    +
    +        checkEndpointAssignments(assignedEndpoints, expectedAssignments);
    +    }
    +
    +    @Test
    +    public void testEqualLocalWorkUnitsAboveNodeLimit() throws Exception {
    +        final Wrapper wrapper = newWrapper(200, 1, 80,  /* cost, minWidth, maxWidth */
    +            ImmutableList.of( /*  endpointAffinities. */
    +                /* For local affinity, we only care about numLocalWorkUnits, the last column below */
    +                /* endpoint, affinity_value, mandatory, maxWidth, numLocalWorkUnits */
    +                new EndpointAffinity(DEP1, 0.15, false, MAX_VALUE, 16),
    +                new EndpointAffinity(DEP2, 0.15, false, MAX_VALUE, 16),
    +                new EndpointAffinity(DEP3, 0.10, false, MAX_VALUE, 16),
    +                new EndpointAffinity(DEP4, 0.20, false, MAX_VALUE, 16),
    +                new EndpointAffinity(DEP5, 0.20, false, MAX_VALUE, 16)
    +            ));
    +        INSTANCE.parallelizeFragment(wrapper, newParameters(1 /* sliceTarget */,
    +                                                            8 /* maxWidthPerNode */,
    +                                                            200 /* globalMaxWidth */),
    +                                                            ImmutableList.of(DEP1, DEP2, DEP3, DEP4, DEP5));
    +        // Every endpoint should get assigned 8 fragments because each node's
    +        // localWorkUnits (16) is above maxWidthPerNode (8), while the fragment's
    +        // maxWidth (80) is still below globalMaxWidth (200).
    +        Map<DrillbitEndpoint, Integer> expectedAssignments = ImmutableMap.of(DEP1, 8,
    +                                                                             DEP2, 8,
    +                                                                             DEP3, 8,
    +                                                                             DEP4, 8,
    +                                                                             DEP5, 8);
    +        // Expect the fragment parallelization to be 80 (16 * 5)
    --- End diff --
    
    Wrong comment. Should be 40 (8 * 5), not 80.
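
For reference, a minimal sketch of the width arithmetic behind that correction. The
clamping order below is an assumption for illustration, not a quote of
LocalAffinityFragmentParallelizer:

    public class ExpectedWidthCheck {
        public static void main(String[] args) {
            // Each of the 5 endpoints is capped at maxWidthPerNode (8); the total is
            // further clamped by the fragment's maxWidth (80) and globalMaxWidth (200).
            final int perNodeCap = 8, endpoints = 5, fragmentMaxWidth = 80, globalMaxWidth = 200;
            final int expectedWidth =
                Math.min(Math.min(perNodeCap * endpoints, fragmentMaxWidth), globalMaxWidth);
            System.out.println(expectedWidth); // prints 40, hence "8 * 5" rather than "16 * 5"
        }
    }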


> Fragment planning causes Drillbits to read remote chunks when local copies are available
> ----------------------------------------------------------------------------------------
>
>                 Key: DRILL-4706
>                 URL: https://issues.apache.org/jira/browse/DRILL-4706
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Query Planning & Optimization
>    Affects Versions: 1.6.0
>         Environment: CentOS, RHEL
>            Reporter: Kunal Khatua
>            Assignee: Sorabh Hamirwasia
>              Labels: performance, planning
>
> When a table (data size = 70 GB) of 160 parquet files (each having a single rowgroup and fitting within one chunk) is available on a 10-node setup with replication = 3, a pure data scan query causes about 2% of the data to be read remotely.
> Even with the metadata cache created, the planner selects a sub-optimal placement of the SCAN fragments, such that some of the data is served from a remote server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
