drill-issues mailing list archives

From "Padma Penumarthy (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (DRILL-4706) Fragment planning causes Drillbits to read remote chunks when local copies are available
Date Mon, 31 Oct 2016 22:54:59 GMT

    [ https://issues.apache.org/jira/browse/DRILL-4706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15623642#comment-15623642 ]

Padma Penumarthy edited comment on DRILL-4706 at 10/31/16 10:54 PM:
--------------------------------------------------------------------

Notes on how the current algorithm (SoftAffinity based) works, why remote reads happen, and
how the new algorithm (LocalAffinity based) is implemented.

SoftAffinity (current algorithm for scheduling parquet scan fragments):

Initialization (getPlan -> GetGroupScan -> ParquetGroupScan.init):
1. When parquet metadata is read, the HostAffinity of each host is calculated for each
rowGroup (the ratio of the number of bytes present on that host to the total bytes of the rowGroup).
2. The EndPointAffinity of each host is calculated (the ratio of the number of bytes on the
host to the total bytes of the whole scan).
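The two ratios above can be sketched as follows. This is an illustrative sketch, not Drill's actual code; the function names, host names, and byte counts are all hypothetical.

```python
def host_affinity(row_group_bytes_per_host):
    """HostAffinity: bytes of one rowGroup on a host / total bytes of that rowGroup."""
    total = sum(row_group_bytes_per_host.values())
    return {h: b / total for h, b in row_group_bytes_per_host.items()}

def endpoint_affinity(row_groups):
    """EndPointAffinity: bytes on a host / total bytes for the whole scan."""
    totals, scan_total = {}, 0
    for rg in row_groups:
        for host, nbytes in rg.items():
            totals[host] = totals.get(host, 0) + nbytes
            scan_total += nbytes
    return {h: b / scan_total for h, b in totals.items()}

# One rowGroup whose three replicas put 128 MB on each of three hosts:
rg = {"nodeA": 128, "nodeB": 128, "nodeC": 128}
print(host_affinity(rg))  # each host ends up with affinity 1/3
```

Note that, as discussed further below, summing replica bytes across hosts means a replicated rowGroup is counted once per replica.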

Parallelize the scan (SoftAffinityFragmentParallelizer.parallelizeFragment):
1. Compute how many total fragments to schedule (width), based on cost, slice target, min
and max width for the operator, maxGlobalWidth, maxWidthPerNode, etc.
2. Divide by the number of nodes - this is the average number of fragments we want to run on
each node.
3. To favor nodes with affinity > 0, i.e. nodes that have some local data (regardless of how
much), multiply the value from step 2 by the affinity factor - this is the number of fragments
we want to schedule on each node with affinity.
4. Schedule up to the number of fragments calculated in step 3 on each of the nodes with affinity,
in round robin fashion.
5. If we have scheduled the required number of fragments (the width from step 1), we are done.
6. Otherwise, schedule the rest of the fragments on nodes that have no local data, i.e. nodes
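The distribution in steps 2-6 can be sketched as below. This is a simplified illustration under assumed parameter values, not Drill's SoftAffinityFragmentParallelizer itself.

```python
import math

def soft_affinity_assign(width, nodes, affinity, affinity_factor=1.2,
                         max_width_per_node=100):
    """Distribute `width` fragments: affinity nodes first, up to a boosted
    per-node cap; leftovers go to nodes with no local data."""
    # Steps 2-3: average per node, boosted by the affinity factor.
    per_node_cap = min(max_width_per_node,
                       math.ceil(width / len(nodes) * affinity_factor))
    with_aff = [n for n in nodes if affinity.get(n, 0) > 0]
    no_aff = [n for n in nodes if affinity.get(n, 0) == 0]
    counts = {n: 0 for n in nodes}
    remaining = width
    # Steps 4-6: round robin over affinity nodes, then over the rest.
    for pool, cap in ((with_aff, per_node_cap), (no_aff, max_width_per_node)):
        progress = True
        while remaining > 0 and progress:
            progress = False
            for n in pool:
                if remaining > 0 and counts[n] < cap:
                    counts[n] += 1
                    remaining -= 1
                    progress = True
    return counts

# Four nodes, data only on A (90%) and B (10%):
counts = soft_affinity_assign(6, ["A", "B", "C", "D"], {"A": 0.9, "B": 0.1})
print(counts)  # A and B receive equal fragment counts despite the 9:1 skew
```

Note how the affinity *value* never enters the allocation, only whether it is zero; this is the root of the skew problem described below.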

Assignment(AssignmentCreator.getMappings):
1. To distribute rowGroups uniformly, calculate the maximum work (maxWork) each fragment should
do: total number of rowGroups / total number of fragments.
2. For each endpoint, calculate maxCount (maxWork * number of fragments on the endpoint)
and minCount (at least 1 per fragment, or (maxWork - 1) * number of fragments), the bounds on
the number of rowGroups to assign.
3. Assign up to minCount rowGroups per endpoint in round robin fashion, selecting for each
rowGroup from its list of hosts sorted by host affinity.
4. If there are any leftovers, assign them to endpoints that do not yet have the minimum (minCount)
assigned.
5. If there are still leftovers, assign them to endpoints that do not yet have the maximum (maxCount)
assigned.
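The per-endpoint bounds from steps 1-2 work out as in this small sketch (the function name is illustrative; this is not Drill's AssignmentCreator code):

```python
def fragment_bounds(total_row_groups, total_fragments, fragments_on_endpoint):
    """Return (minCount, maxCount) rowGroups for one endpoint."""
    max_work = -(-total_row_groups // total_fragments)  # ceiling division
    max_count = max_work * fragments_on_endpoint
    # At least 1 rowGroup per fragment, or (maxWork - 1) per fragment.
    min_count = max(1, max_work - 1) * fragments_on_endpoint
    return min_count, max_count

# 10 rowGroups over 4 fragments; an endpoint running 2 of those fragments:
print(fragment_bounds(10, 4, 2))  # maxWork = 3, so bounds are (4, 6)
```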

Why is this causing remote reads, and why does increasing the affinity factor not help?
When the data is skewed, i.e. not distributed equally, all nodes with affinity still get an
equal number of fragments assigned (because each of them has some data).
We do not assign fragments in proportion to the affinity value, i.e. the amount of data available
on the node.
So some nodes have to do remote reads, since the data is not available locally. Because all
affinity nodes are treated equally, increasing the affinity factor does not help: the affinity
factor only distinguishes nodes that have no data from nodes that have some data.
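A toy calculation makes the point: whatever the affinity factor, every node with nonzero affinity shares the same per-node cap, so a node holding 90% of the data and a node holding 10% are capped identically. The numbers below are hypothetical.

```python
import math

def per_node_fragment_cap(width, num_nodes, affinity_factor):
    """Cap applied uniformly to every node with affinity > 0."""
    return math.ceil(width / num_nodes * affinity_factor)

# width = 10 fragments over 5 nodes; raising the factor raises the cap
# for ALL affinity nodes at once, never in proportion to their data:
for factor in (1.0, 1.5, 3.0):
    cap = per_node_fragment_cap(10, 5, factor)
    print(f"factor={factor}: cap for nodeA (90% of data) = {cap}, "
          f"cap for nodeB (10% of data) = {cap}")
```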

Another problem is the calculation of the endpoint affinity values. We do not take the replication
factor into account, so the bytes of a rowGroup are counted multiple times, once per host holding
a replica. Depending on the data distribution, this produces skewed affinity values that do not
reflect how those values are being (or should be) used.


LocalAffinity (new algorithm based on locality of data):
This is not enabled by default. To use the new algorithm, set the system option `parquet.use_local_affinity` = true.
Every effort is made to keep the new code behind this option so that no regressions are introduced.

Setting the option invokes a new local affinity fragment parallelizer, which is less restrictive
than the soft affinity fragment parallelizer and is enabled only for parquet group scans.

Initialization(getPlan -> GetGroupScan -> ParquetGroupScan.init):
1. When parquet metadata is read, compute for each rowGroup the best possible host on which
to scan it (computeRowGroupAssignment):
2. For each rowGroup, get the list of hosts that have the maximum amount of its data available
locally (topEndpoints).
3. From that list, pick the node with the minimum amount of work assigned so far (based on
the number of bytes already assigned to be scanned on that node).
4. Repeat steps 2 and 3 in a second pass, so we can make adjustments after one full round of
allocations, i.e. after the first iteration.
5. Once the best possible node for the rowGroup is computed, save that information
(preferredEndpoint). Note: preferredEndpoint will be null if no drillbit is running
on any of the nodes that have the data, or if it is a local file system.
6. Update the endpointAffinity of each node with the number of rowGroups (localWorkUnits) assigned
to be scanned on that endpoint.
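The greedy choice in steps 2-5 can be sketched as below. The data structures are illustrative, not Drill's; the rowGroup size is approximated here by its largest replica, which is an assumption of the sketch.

```python
def compute_row_group_assignment(row_groups, drillbits):
    """row_groups: list of {host: local bytes}; drillbits: set of live hosts.
    Returns rowGroup index -> preferred endpoint (or None)."""
    preferred = {i: None for i in range(len(row_groups))}
    load = {d: 0 for d in drillbits}  # bytes assigned to scan on each node
    for pass_no in range(2):  # second pass rebalances (step 4)
        for i, rg in enumerate(row_groups):
            candidates = [h for h in rg if h in drillbits]
            if not candidates:
                continue  # no drillbit on any data host: stays None (step 5)
            size = max(rg.values())  # rowGroup size ~ its largest replica
            if pass_no == 1:
                load[preferred[i]] -= size  # undo the first-pass choice
            best = max(rg[h] for h in candidates)
            top = [h for h in candidates if rg[h] == best]  # topEndpoints
            choice = min(top, key=lambda h: load[h])  # least-loaded host
            preferred[i] = choice
            load[choice] += size
    return preferred

# Three rowGroups, each replicated on two of three drillbit hosts:
rgs = [{"A": 100, "B": 100}, {"A": 100, "C": 100}, {"B": 100, "C": 100}]
print(compute_row_group_assignment(rgs, {"A", "B", "C"}))
```

With this input each host ends up preferred for exactly one rowGroup, which is the balanced outcome the least-loaded tie-break aims for.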

Parallelize the Scan(LocalAffinityFragmentParallelizer.parallelizeFragment):
1. Decide how many total fragments to run (width), based on cost, slice target, min and max
width for the operator, maxGlobalWidth, maxWidthPerNode, etc.
2. Include every endpoint whose affinity has localWorkUnits > 0 in the list of endpoints
on which we want to schedule fragments (endpointPool).
3. Assign one fragment to each node in the endpointPool, so that each of them gets a minimum
of one.
4. Calculate how many fragments to assign to each node in the endpointPool based on how
much work it has to do, i.e. its targetAllocation (proportional to the localWorkUnits assigned
to the node).
5. Go through the endpointPool in round robin fashion, assigning fragments to individual
nodes until their target allocation or maxWidthPerNode is reached.
6. Stop when the overall allocation reaches the total target, i.e. the width from step 1.
7. Some rowGroups may have a null preferredEndpoint (because no drillbit is running on the
hosts that have the rowGroup's data). In that case, we will have unassigned work items.
8. Allocate fragments for the unassigned work items to active endpoints, making sure the
maxWidthPerNode constraint is honored.
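Steps 2-6 amount to a proportional allocation with a floor of one, as in this sketch. It assumes width is at least the number of pool nodes and uses simple rounding for the targets; both are simplifications of the sketch, not claims about Drill's code.

```python
def local_affinity_parallelize(width, local_work_units, max_width_per_node=100):
    """local_work_units: node -> rowGroups assigned locally (localWorkUnits).
    Returns node -> fragment count, proportional to local work."""
    # Step 2: only nodes with local work enter the pool.
    pool = {n: w for n, w in local_work_units.items() if w > 0}
    total_work = sum(pool.values())
    # Step 3: minimum of one fragment per pool node (assumes width >= len(pool)).
    counts = {n: 1 for n in pool}
    assigned = len(pool)
    # Step 4: target allocation proportional to localWorkUnits.
    target = {n: max(1, round(width * w / total_work)) for n, w in pool.items()}
    # Steps 5-6: round robin up to target / per-node cap / total width.
    progress = True
    while assigned < width and progress:
        progress = False
        for n in pool:
            if assigned < width and counts[n] < min(target[n], max_width_per_node):
                counts[n] += 1
                assigned += 1
                progress = True
    # Any width left over here corresponds to the step 7-8 fallback.
    return counts

# 8 fragments over nodes holding 60, 30 and 10 local rowGroups:
print(local_affinity_parallelize(8, {"A": 60, "B": 30, "C": 10}))
```

Unlike the SoftAffinity scheme, the amount of local work directly shapes the per-node fragment count, which is what removes the remote reads under skew.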

Assignment(AssignmentCreator.getMappings):
1. When the system option parquet.use_local_affinity is set to true, assign each rowGroup
to a fragment (round robin) on its preferredEndpoint (assignLocal).
2. If the preferredEndpoint is null, or no fragment is available on that node, add the rowGroup
to the unassigned list.
3. Fall back to the current algorithm to assign the unassigned list of rowGroups from step 2.
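The assignment with fallback can be sketched as follows. The fallback here is stubbed as a plain round robin over all fragments; the real fallback is the existing AssignmentCreator logic. All names are illustrative.

```python
from itertools import cycle

def assign_local(row_groups, fragments_by_node):
    """row_groups: list of (rowGroup id, preferred endpoint or None).
    fragments_by_node: node -> list of fragment ids on that node."""
    cursors = {n: cycle(f) for n, f in fragments_by_node.items() if f}
    mapping, unassigned = {}, []
    for rg_id, preferred in row_groups:
        if preferred in cursors:
            mapping[rg_id] = next(cursors[preferred])  # assignLocal, round robin
        else:
            unassigned.append(rg_id)  # null endpoint, or no fragment there
    # Fallback (stub): spread leftovers over every fragment, round robin.
    all_frags = cycle([f for fs in fragments_by_node.values() for f in fs])
    for rg_id in unassigned:
        mapping[rg_id] = next(all_frags)
    return mapping

frags = {"A": ["A0", "A1"], "B": ["B0"]}
rgs = [(0, "A"), (1, "A"), (2, "B"), (3, None), (4, "A")]
print(assign_local(rgs, frags))  # rowGroup 3 goes through the fallback path
```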





> Fragment planning causes Drillbits to read remote chunks when local copies are available
> ----------------------------------------------------------------------------------------
>
>                 Key: DRILL-4706
>                 URL: https://issues.apache.org/jira/browse/DRILL-4706
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Query Planning & Optimization
>    Affects Versions: 1.6.0
>         Environment: CentOS, RHEL
>            Reporter: Kunal Khatua
>            Assignee: Sorabh Hamirwasia
>              Labels: performance, planning
>
> When a table (datasize=70GB) of 160 parquet files (each having a single rowgroup and
fitting within one chunk) is available on a 10-node setup with replication=3 ; a pure data
scan query causes about 2% of the data to be read remotely. 
> Even with the creation of metadata cache, the planner is selecting a sub-optimal plan
of executing the SCAN fragments such that some of the data is served from a remote server.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
