drill-issues mailing list archives

From "Padma Penumarthy (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (DRILL-6238) Batch sizing for operators
Date Wed, 14 Mar 2018 00:43:00 GMT

     [ https://issues.apache.org/jira/browse/DRILL-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Padma Penumarthy updated DRILL-6238:
------------------------------------
    Description: 
*Batch Sizing For Operators*

This document describes the approach we are taking for limiting batch sizes for operators
other than scan.

*Motivation*

The main goals are:
 # Improve concurrency
 # Reduce query failures caused by out-of-memory errors

To accomplish these goals, we need queries to execute within a specified memory budget.
To enforce a per-query memory limit, we need to be able to enforce per-fragment and per-operator
memory limits. Controlling the batch sizes of individual operators is the first step toward
all of this.

*Background*

In Drill, different operators have different limits on outgoing batches. Some use hard-coded
row counts, some use hard-coded memory limits, and some have no limits at all. Because no limits
are imposed, the memory used by an outgoing batch can vary widely depending on the input data
size and what the operator is doing, and queries fail when we cannot allocate the memory needed.
Some operators produce very large batches, causing blocking operators such as sort and hash
aggregate, which must work under tight memory constraints, to fail. The size of batches should
be a function of available memory rather than of the input data size or what the operator does.
Please refer to the table at the end of this document for details on what each operator does today.

*Design*

The goal is to have all operators behave the same way, i.e., produce batches whose size is less
than or equal to the configured outgoing batch size, with a minimum of 1 row and a maximum of
64K rows per batch. A new system option, ‘drill.exec.memory.operator.output_batch_size’, is
added with a default value of 16MB.
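The row-count rule above can be sketched as follows. This is a minimal illustration, not Drill's implementation: the class `BatchRowLimit` and the method `rowsPerBatch` are hypothetical names, the per-row width is assumed to have been estimated already, and "64K rows" is assumed to mean 65,536.

```java
/** Hypothetical helper illustrating the batch row-count rule; not Drill's actual API. */
final class BatchRowLimit {
  // Assumption: "64K rows" is taken to mean 65,536 rows.
  static final int MAX_ROWS = 64 * 1024;
  // Default value of drill.exec.memory.operator.output_batch_size (16MB).
  static final long DEFAULT_OUTPUT_BATCH_SIZE = 16L * 1024 * 1024;

  /** Number of rows that fit in the budget, clamped to the range [1, MAX_ROWS]. */
  static int rowsPerBatch(long outputBatchSizeBytes, long estimatedRowWidthBytes) {
    // Guard against a zero width estimate; the row cap then decides.
    long rows = outputBatchSizeBytes / Math.max(1, estimatedRowWidthBytes);
    return (int) Math.max(1, Math.min(MAX_ROWS, rows));
  }
}
```

With the 16MB default, rows averaging 100 bytes hit the 64K cap, 1,000-byte rows give 16,777 rows per batch, and a single row wider than the whole budget still produces a batch of 1 row.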

The basic idea is to limit the size of the outgoing batch by deciding how many rows it can hold,
based on the average entry size of each outgoing column. This takes into account both the actual
data size and the overhead of the metadata vectors we add on top to track variable lengths and
the data mode (repeated, optional, required). The calculation differs for each operator and is
based on:
 # What the operator is doing
 # The incoming batch size, which includes the type and average size of each column
 # What is being projected out
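A sketch of how the per-row width might be estimated from column averages plus metadata-vector overhead. The overhead constants (a 4-byte offset per variable-width or repeated value, a 1-byte set/null flag per optional value) reflect our understanding of Drill's vector layout and should be treated as assumptions; `RowWidthEstimator` and its nested types are hypothetical.

```java
import java.util.List;

/** Hypothetical per-row width estimate; not Drill's actual sizing code. */
final class RowWidthEstimator {
  enum Mode { REQUIRED, OPTIONAL, REPEATED }

  // Assumed overheads; adjust to the real vector layout.
  static final int OFFSET_BYTES = 4;    // one offset entry per variable-width value or repeated row
  static final int NULL_FLAG_BYTES = 1; // one "is set" flag per optional value

  static final class Column {
    final double avgDataBytes;   // average payload bytes per element (type width if fixed-width)
    final boolean variableWidth; // e.g. VARCHAR or VARBINARY
    final Mode mode;
    final double avgCardinality; // average elements per row; used only for REPEATED

    Column(double avgDataBytes, boolean variableWidth, Mode mode, double avgCardinality) {
      this.avgDataBytes = avgDataBytes;
      this.variableWidth = variableWidth;
      this.mode = mode;
      this.avgCardinality = avgCardinality;
    }
  }

  /** Average bytes one row of this column occupies, including metadata vectors. */
  static double columnWidth(Column c) {
    double elementsPerRow = c.mode == Mode.REPEATED ? c.avgCardinality : 1.0;
    double perElement = c.avgDataBytes + (c.variableWidth ? OFFSET_BYTES : 0);
    double width = elementsPerRow * perElement;
    if (c.mode == Mode.OPTIONAL) {
      width += NULL_FLAG_BYTES;
    }
    if (c.mode == Mode.REPEATED) {
      width += OFFSET_BYTES; // per-row offset into the element vector
    }
    return width;
  }

  /** Estimated row width in bytes, rounded up. */
  static long rowWidth(List<Column> columns) {
    double total = 0;
    for (Column c : columns) {
      total += columnWidth(c);
    }
    return (long) Math.ceil(total);
  }
}
```

For example, a required INT (4 bytes), an optional VARCHAR averaging 20 bytes (20 + 4 + 1), and a repeated BIGINT averaging 3 elements per row (3 * 8 + 4) add up to an estimated 57 bytes per row. Dividing the output batch size by such a width gives the candidate row count.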

With this adaptive approach based on actual average data sizes, operators that previously limited
batches to fewer than 64K rows can potentially hold many more rows (up to 64K) in a batch, as long
as memory stays within the budget. For example, flatten and joins use a batch size of 4K rows,
probably to be conservative with respect to memory usage. Letting these operators go up to 64K
rows while staying within the memory budget should help improve performance.
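To make the performance argument concrete, the following sketch counts how many outgoing batches the same output needs under a fixed 4K-row limit versus the adaptive limit. It assumes 4K means 4,096 and 64K means 65,536 rows; the class and method names are hypothetical.

```java
/** Hypothetical comparison of fixed versus adaptive row limits; not Drill code. */
final class BatchCountComparison {
  static final int MAX_ROWS = 64 * 1024;  // assumed meaning of "64K rows"
  static final int FIXED_ROWS = 4 * 1024; // assumed meaning of the old "4K" limit

  /** Adaptive row count: budget divided by row width, clamped to [1, MAX_ROWS]. */
  static int adaptiveRows(long budgetBytes, long rowWidthBytes) {
    long rows = budgetBytes / Math.max(1, rowWidthBytes);
    return (int) Math.max(1, Math.min(MAX_ROWS, rows));
  }

  /** Number of batches needed to emit totalRows, using ceiling division. */
  static long batchesNeeded(long totalRows, int rowsPerBatch) {
    return (totalRows + rowsPerBatch - 1) / rowsPerBatch;
  }
}
```

For 1,000,000 output rows of 200 bytes each and the 16MB default budget, the fixed limit needs 245 batches, while the adaptive limit (capped at 65,536 rows) needs 16, so the per-batch overhead is paid far less often.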

Also, to improve performance and utilize memory more efficiently, we will:
 # Allocate memory for value vectors upfront. Since we know the number of rows and the sizing
information for each column in the outgoing batch, we will use that information to allocate
memory for value vectors upfront. Currently, we either do an initial allocation for 4K values
and keep doubling every time we need more, or allocate the maximum needed upfront. Preallocating
memory based on the sizing calculation should improve performance by avoiding the memory copies
(and the zeroing of the new half) that happen every time we double, and should save memory in
cases where we were over-allocating before.
 # Make the number of rows in the outgoing batch a power of two. Since memory is allocated in
powers of two, this will help us pack the value vectors densely, reducing the amount of memory
wasted because of the doubling effect.
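Both allocation points can be illustrated with simple arithmetic. The sketch below assumes, as stated above, that buffers are rounded up to a power of two, and that a growing vector copies its full current buffer on each doubling; all names are hypothetical and the zeroing cost is not modeled.

```java
/** Hypothetical arithmetic for preallocation and power-of-two row counts; not Drill code. */
final class AllocationSketch {
  /** Largest power of two that is <= rows (rows must be >= 1). */
  static int roundDownToPowerOfTwo(int rows) {
    return Integer.highestOneBit(rows);
  }

  /** Buffer size when a request is rounded up to the next power of two (requestedBytes >= 1). */
  static long allocatedBytes(long requestedBytes) {
    long p = Long.highestOneBit(requestedBytes);
    return p == requestedBytes ? p : p << 1;
  }

  /**
   * Bytes copied when a fixed-width vector starts with room for initialValues (>= 1)
   * and doubles until it can hold targetValues; each doubling copies the old buffer.
   */
  static long bytesCopiedByDoubling(int initialValues, int targetValues, int bytesPerValue) {
    long copied = 0;
    long capacity = initialValues;
    while (capacity < targetValues) {
      copied += capacity * bytesPerValue; // old contents moved into the larger buffer
      capacity *= 2;
    }
    return copied;
  }
}
```

Growing an 8-byte-wide vector from 4,096 to 65,536 values by doubling copies 491,520 bytes in total, whereas allocating for 65,536 values upfront copies nothing. Likewise, 16,777 rows of 8 bytes (134,216 bytes) would be rounded up to a 262,144-byte buffer, while rounding the row count down to 16,384 fills a 131,072-byte buffer exactly.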

To summarize, the expected benefits are improved memory utilization, better performance,
higher concurrency, and fewer queries failing because of out-of-memory errors.

One thing to note:

Since these sizing calculations are based on averages, strict enforcement of memory usage is
not possible. There could be pathological cases where, because of uneven data distribution,
we exceed the configured output batch size, potentially causing OOM errors and problems
in downstream operators.
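The following sketch shows how average-based sizing can overshoot: the row count is fixed from the average width measured on one batch, but the rows that actually land in the outgoing batch are wider. The numbers and names are illustrative assumptions, not measurements.

```java
import java.util.Arrays;

/** Hypothetical illustration of why average-based sizing is not a strict limit. */
final class AverageSizingSkew {
  static final int MAX_ROWS = 64 * 1024; // assumed meaning of "64K rows"

  /** Row count chosen from the average row width observed on an incoming batch. */
  static int rowsFromAverage(long budgetBytes, long avgRowWidthBytes) {
    long rows = budgetBytes / Math.max(1, avgRowWidthBytes);
    return (int) Math.max(1, Math.min(MAX_ROWS, rows));
  }

  /** Bytes actually occupied by an outgoing batch whose rows have the given widths. */
  static long actualBatchBytes(long[] rowWidthsBytes) {
    return Arrays.stream(rowWidthsBytes).sum();
  }
}
```

With a 16MB budget and a measured average of 1,000 bytes per row, 16,777 rows are chosen; if those rows turn out to be 2,000 bytes each, the batch occupies 33,554,000 bytes, roughly twice the configured size.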

Other issues that will be addressed:
 * We are adding extra processing for each batch in each operator to compute the sizing
information. This overhead can be reduced by passing this information along with the batch
between operators.
 * For some operators, it will be complex to determine the average size of outgoing columns,
especially if we have to evaluate complex expression trees and UDFs to work out how incoming
batches are transformed. We will use approximations as appropriate.
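One possible shape for such approximations is sketched below, using a toy expression model rather than Drill's actual expression classes: a column reference reuses the measured incoming average, a string literal contributes its UTF-8 length, a concatenation sums its arguments, and anything opaque (such as a UDF) falls back to a fixed default. The 50-byte default and all class names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical width approximation for projected expressions; not Drill code. */
final class ProjectedWidthEstimator {
  static final double UNKNOWN_WIDTH_BYTES = 50.0; // hypothetical default for opaque expressions

  interface Expr {}

  static final class ColumnRef implements Expr {
    final String name;
    ColumnRef(String name) { this.name = name; }
  }

  static final class StringLiteral implements Expr {
    final String value;
    StringLiteral(String value) { this.value = value; }
  }

  static final class Concat implements Expr {
    final List<Expr> args;
    Concat(Expr... args) { this.args = Arrays.asList(args); }
  }

  static final class FunctionCall implements Expr {
    final String name; // e.g. a UDF whose output size we cannot analyze
    FunctionCall(String name) { this.name = name; }
  }

  /** Approximate average output width in bytes, given incoming column averages. */
  static double estimate(Expr e, Map<String, Double> incomingAvgWidths) {
    if (e instanceof ColumnRef) {
      // Reuse the measured average of the incoming column, if known.
      return incomingAvgWidths.getOrDefault(((ColumnRef) e).name, UNKNOWN_WIDTH_BYTES);
    }
    if (e instanceof StringLiteral) {
      return ((StringLiteral) e).value.getBytes(StandardCharsets.UTF_8).length;
    }
    if (e instanceof Concat) {
      double sum = 0;
      for (Expr arg : ((Concat) e).args) {
        sum += estimate(arg, incomingAvgWidths);
      }
      return sum;
    }
    return UNKNOWN_WIDTH_BYTES; // opaque function or UDF: fall back to a fixed guess
  }
}
```

For instance, concatenating a 12-byte average column, a one-character literal, and a 15-byte average column is estimated at 28 bytes, while an unanalyzable UDF is assumed to produce 50 bytes. A real implementation would choose its own fallback and could refine it from observed output sizes.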

The following table summarizes the current limits for each operator.

Flatten, merge join, and external sort have already been changed to adhere to the batch size
limits described in this document as of Drill release 1.13.

|*Operator*|*Limit (Rows, Memory)*|*Notes*|
|Flatten|4K, 512MB|Flatten can produce very large batches, depending on the average cardinality of the flattened column.|
|Merge Receiver|32K|No memory limit.|
|Hash Aggregate|64K|No memory limit.|
|Streaming Aggregate|32K|No memory limit.|
|Broadcast Sender|None|No limits.|
|Filter, Limit|None|No limits.|
|Hash Join|4K|No memory limit. Joins produce large batches; the 4K limit may have been put in place to be conservative with respect to memory usage.|
|Merge Join|4K|No memory limit.|
|Nested Loop Join|4K|No memory limit.|
|Partition Sender|1K| |
|Project|64K|No memory limit.|
|Selection Vector Remover|None|No limits.|
|TopN|4K|No memory limit.|
|Union|None|No limits.|
|Windows|None|No limits.|
|External Sort|64K, 16MB| |
|Unordered Receiver|None|No limits.|

  was:
The basic idea is to limit the size of the outgoing batch by deciding how many rows it can hold,
based on the average entry size of each outgoing column, taking into account the actual data
size and the metadata vector overhead we add on top for tracking variable lengths and the data
mode (repeated, optional, required). This calculation will be different for each operator, based
on what the operator is doing, the incoming batch size (which includes the type and average size
of each column), and what is being projected out.

By taking this adaptive approach based on actual average data sizes, operators that were
limiting batches to fewer than 64K rows before can potentially hold many more rows (up to
64K) in a batch if memory stays within the budget. This should help improve performance.



> Batch sizing for operators
> --------------------------
>
>                 Key: DRILL-6238
>                 URL: https://issues.apache.org/jira/browse/DRILL-6238
>             Project: Apache Drill
>          Issue Type: New Feature
>            Reporter: Padma Penumarthy
>            Assignee: Padma Penumarthy
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
