drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-4446) Improve current fragment parallelization module
Date Thu, 03 Mar 2016 02:42:18 GMT

    [ https://issues.apache.org/jira/browse/DRILL-4446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15177038#comment-15177038 ]

ASF GitHub Bot commented on DRILL-4446:
---------------------------------------

Github user laurentgo commented on a diff in the pull request:

    https://github.com/apache/drill/pull/403#discussion_r54829311
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java ---
    @@ -0,0 +1,119 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.planner.fragment;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import org.apache.drill.exec.physical.EndpointAffinity;
    +import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
    +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +
    +/**
    + * Implementation of {@link FragmentParallelizer} where the fragment is required to run on a given set of endpoints.
    + * Width per node depends on the affinity to the endpoint and the total width (calculated using costs).
    + */
    +public class HardAffinityFragmentParallelizer implements FragmentParallelizer {
    +  public static final HardAffinityFragmentParallelizer INSTANCE = new HardAffinityFragmentParallelizer();
    +
    +  private HardAffinityFragmentParallelizer() { /* singleton */}
    +
    +  @Override
    +  public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters,
    +      final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
    +
    +    final Stats stats = fragmentWrapper.getStats();
    +    final ParallelizationInfo pInfo = stats.getParallelizationInfo();
    +
    +    // Go through the affinity map and extract the endpoints that have a mandatory assignment requirement
    +    final Map<DrillbitEndpoint, EndpointAffinity> endpointPool = Maps.newHashMap();
    +    for(Entry<DrillbitEndpoint, EndpointAffinity> entry : pInfo.getEndpointAffinityMap().entrySet()) {
    +      if (entry.getValue().isAssignmentRequired()) {
    +        endpointPool.put(entry.getKey(), entry.getValue());
    +      }
    +    }
    +
    +    // Step 1: Find the width, taking various parameters into account.
    +    // 1.1. Find the parallelization based on cost. Use the max cost of all operators in this fragment;
    +    //      this is consistent with the calculation that ExcessiveExchangeRemover uses.
    +    int width = (int) Math.ceil(stats.getMaxCost() / parameters.getParallelizationThreshold());
    +
    +    // 1.2. Make sure the width is at least the number of endpoints that require an assignment
    +    width = Math.max(endpointPool.size(), width);
    +
    +    // 1.3. Cap the parallelization width by the fragment-level width limit and the system-level per-query width limit
    +    width = Math.max(1, Math.min(width, pInfo.getMaxWidth()));
    +    checkAndThrow(endpointPool.size() <= width,
    +        "Number of mandatory endpoints that require an assignment is more than the allowed fragment max width.");
    +
    +    width = Math.max(1, Math.min(width, parameters.getMaxGlobalWidth()));
    +    checkAndThrow(endpointPool.size() <= width,
    +        "Number of mandatory endpoints that require an assignment is more than the allowed global query width.");
    +
    +    // Step 2: Select the endpoints
    +    final Map<DrillbitEndpoint, Integer> endpoints = Maps.newHashMap();
    +
    +    // 2.1 First add each endpoint from the pool once so that the mandatory assignment requirement is fulfilled.
    +    int totalAssigned;
    +    for(Entry<DrillbitEndpoint, EndpointAffinity> entry : endpointPool.entrySet()) {
    +      endpoints.put(entry.getKey(), 1);
    +    }
    +    totalAssigned = endpoints.size();
    --- End diff --
    
    maybe you can declare totalAssigned here?
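    As a side note on the diff above: the Step 1 width computation can be reproduced as a standalone sketch. The class name and sample numbers below are illustrative only, and the patch's checkAndThrow helper is replaced here by a plain exception:

    ```java
    // Standalone sketch of the Step 1 width computation from the diff above.
    // Illustrative only; mirrors the clamping order used in the patch.
    public class WidthCalc {
      static int computeWidth(double maxCost, double threshold, int mandatoryEndpoints,
                              int maxFragmentWidth, int maxGlobalWidth) {
        // 1.1. Cost-based parallelization.
        int width = (int) Math.ceil(maxCost / threshold);
        // 1.2. At least one slot per endpoint that requires an assignment.
        width = Math.max(mandatoryEndpoints, width);
        // 1.3. Cap by the fragment-level limit (never below 1).
        width = Math.max(1, Math.min(width, maxFragmentWidth));
        if (mandatoryEndpoints > width) {
          throw new IllegalStateException("Mandatory endpoints exceed the fragment max width.");
        }
        // Cap by the system-level per-query limit.
        width = Math.max(1, Math.min(width, maxGlobalWidth));
        if (mandatoryEndpoints > width) {
          throw new IllegalStateException("Mandatory endpoints exceed the global query width.");
        }
        return width;
      }

      public static void main(String[] args) {
        // Cost 500 at threshold 100 gives width 5; the fragment limit then caps it to 4.
        System.out.println(computeWidth(500, 100, 2, 4, 100)); // prints 4
      }
    }
    ```

    Note how a cost-derived width can be cut below the cost-based value by either cap, which is why the mandatory-endpoint check is repeated after each clamp.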


> Improve current fragment parallelization module
> -----------------------------------------------
>
>                 Key: DRILL-4446
>                 URL: https://issues.apache.org/jira/browse/DRILL-4446
>             Project: Apache Drill
>          Issue Type: New Feature
>    Affects Versions: 1.5.0
>            Reporter: Venki Korukanti
>            Assignee: Venki Korukanti
>             Fix For: 1.6.0
>
>
> The current fragment parallelizer {{SimpleParallelizer.java}} can't correctly handle the case where an operator has a mandatory scheduling requirement for a set of DrillbitEndpoints along with an affinity for each DrillbitEndpoint (i.e. how large a portion of the total tasks should be scheduled on each DrillbitEndpoint). It assumes that scheduling requirements are soft (except for the Mux and DeMux case, where there is a mandatory parallelization requirement of 1 unit).
> An example is:
> A cluster has 3 nodes, each running a Drillbit and a storage service. Data for a table is present only at the storage services on two of the nodes, so a GroupScan needs to be scheduled on those two nodes in order to read the data. The storage service doesn't support reading data from a remote node (or doing so is costly).
> Inserting the mandatory scheduling requirements into the existing SimpleParallelizer is not sufficient, as you may end up with a plan that has a fragment with two GroupScans, each having its own hard parallelization requirements.
> Proposal is:
> Add a property to each operator that tells which parallelization implementation to use. Most operators (such as Project or Filter) don't have any particular strategy; they depend on the incoming operator. Existing operators that do have requirements (all existing GroupScans) default to the current parallelizer {{SimpleParallelizer}}. {{Screen}} defaults to the new mandatory-assignment parallelizer. It is possible that the generated PhysicalPlan has a fragment with operators having different parallelization strategies; in that case an exchange is inserted between the operators where a change in parallelization strategy is required.
> Will send a detailed design doc.
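
The proposal above can be sketched as follows. All names here except {{SimpleParallelizer}} are hypothetical, not from the patch; a strategy enum on each operator lets the planner detect where an exchange boundary is needed:

```java
// Hypothetical sketch of the proposed per-operator parallelization property.
// The strategy names and the Op class are illustrative; the design doc
// mentioned in the issue would define the real interfaces.
enum ParallelizationStrategy { INHERIT_FROM_CHILD, SOFT_AFFINITY, HARD_AFFINITY }

final class Op {
  final ParallelizationStrategy strategy;
  final Op child; // null for leaf operators such as a GroupScan

  Op(ParallelizationStrategy strategy, Op child) {
    this.strategy = strategy;
    this.child = child;
  }

  // Resolve INHERIT_FROM_CHILD down to a concrete strategy.
  static ParallelizationStrategy effective(Op op) {
    if (op.strategy == ParallelizationStrategy.INHERIT_FROM_CHILD && op.child != null) {
      return effective(op.child);
    }
    return op.strategy;
  }

  // An exchange is inserted where the effective strategy changes
  // between an operator and its child.
  static boolean needsExchange(Op op) {
    return op.child != null && effective(op) != effective(op.child);
  }
}
```

For example, a Filter inheriting from a hard-affinity GroupScan needs no exchange, while a soft-affinity Screen above them does.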



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
