From: "Padma Penumarthy (JIRA)"
To: issues@drill.apache.org
Date: Wed, 14 Mar 2018 00:47:00 +0000 (UTC)
Subject: [jira] [Updated] (DRILL-6238) Batch sizing for operators

[ https://issues.apache.org/jira/browse/DRILL-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Padma Penumarthy updated DRILL-6238:
------------------------------------
    Description:

*Batch Sizing For Operators*

This document describes the approach we are taking for limiting batch sizes for operators other than scan.

*Motivation*

The main goals are:
# Improve concurrency
# Reduce query failures caused by out-of-memory errors

To accomplish these goals, we need to make queries execute within a specified memory budget. To enforce a per-query memory limit, we need to be able to enforce per-fragment and per-operator memory limits. Controlling individual operators' batch sizes is the first step towards this.

*Background*

In Drill, different operators have different limits on outgoing batches. Some use hard-coded row counts, some use hard-coded memory limits, and some have no limits at all.
Based on the input data size and on what the operator is doing, the memory used by an outgoing batch can vary widely, since no limits are imposed. Queries fail because we are not able to allocate the memory they need. Some operators produce very large batches, causing blocking operators such as sort and hash aggregate, which have to work under tight memory constraints, to fail. The size of a batch should be a function of available memory rather than of the input data size and/or what the operator does. Please refer to the table at the end of this document for details on what each operator does today.

*Design*

The goal is to have all operators behave the same way, i.e. produce batches whose size is less than or equal to the configured outgoing batch size, with a minimum of 1 row per batch and a maximum of 64K rows per batch. A new system option 'drill.exec.memory.operator.output_batch_size' is added, with a default value of 16 MB.

The basic idea is to limit the size of the outgoing batch by deciding how many rows it can hold, based on the average entry size of each outgoing column, taking into account the actual data size plus the metadata vector overhead we add on top for tracking variable length, mode (repeated, optional, required), etc. This calculation will be different for each operator and is based on:
# What the operator is doing
# The incoming batch size, which includes information on the type and average size of each column
# What is being projected out

By taking this adaptive approach based on actual average data sizes, operators that previously limited batch size to fewer than 64K rows can potentially fit many more rows (up to 64K) in a batch while staying within the memory budget. For example, flatten and joins have a batch size of 4K rows, which was probably chosen to be conservative with respect to memory usage. Letting these operators go up to 64K rows, as long as they stay within the memory budget, should help improve performance. A sketch of the row-count calculation follows.
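As an illustration only (this is not Drill's actual implementation; the class and method names below are invented for the sketch), the calculation amounts to dividing the configured output batch size by the estimated width of one outgoing row and clamping the result to the 1-64K row range:

{code:java}
import java.util.Map;

/**
 * Illustrative sketch: derive the outgoing row count of a batch from the
 * configured output batch size and the average per-row width of the columns
 * being projected out. Names and structure are hypothetical.
 */
public class OutputRowCountSketch {

  static final int MIN_ROWS = 1;
  static final int MAX_ROWS = 64 * 1024;   // hard cap of 64K rows per batch

  /**
   * @param outputBatchSizeBytes value of drill.exec.memory.operator.output_batch_size
   *                             (default 16 MB)
   * @param avgColumnWidths      average bytes per row for each outgoing column,
   *                             including data plus metadata vector overhead
   *                             (offsets for variable length, bits for optional mode, etc.)
   */
  static int computeOutputRowCount(long outputBatchSizeBytes, Map<String, Long> avgColumnWidths) {
    long avgRowWidth = 0;
    for (long width : avgColumnWidths.values()) {
      avgRowWidth += width;
    }
    if (avgRowWidth == 0) {
      return MAX_ROWS;                      // no sizing info; fall back to the row cap
    }
    long rows = outputBatchSizeBytes / avgRowWidth;
    return (int) Math.max(MIN_ROWS, Math.min(MAX_ROWS, rows));
  }

  public static void main(String[] args) {
    // Example: 16 MB budget, two columns averaging 8 and 504 bytes per row.
    int rows = computeOutputRowCount(16L * 1024 * 1024, Map.of("a", 8L, "b", 504L));
    System.out.println("rows per outgoing batch = " + rows);   // prints 32768
  }
}
{code}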
Also, to improve performance and utilize memory more efficiently, we will:
# Allocate memory for value vectors upfront. Since we know the number of rows and the sizing information for each column in the outgoing batch, we will use that information to allocate memory for the value vectors upfront. Currently, we either do an initial allocation for 4K values and keep doubling every time we need more, or we allocate the maximum needed upfront. Pre-allocating memory based on the sizing calculation improves performance by avoiding the memory copies and the zeroing of the newly allocated half that happen on every doubling, and it saves memory in cases where we were over-allocating before.
# Round down the number of rows in the outgoing batch to a power of two. Since memory is allocated in powers of two, this helps pack the value vectors densely, reducing the memory wasted by the doubling effect. (A sketch of both steps follows this list.)
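A minimal sketch of these two steps, assuming hypothetical helper names (Drill's actual value vector allocation API is not shown):

{code:java}
/**
 * Illustrative sketch: round the outgoing row count down to a power of two
 * and size the upfront allocation for one column. Names are hypothetical.
 */
public class BatchAllocationSketch {

  /** Largest power of two less than or equal to n (for n > 0). */
  static int roundDownToPowerOfTwo(int n) {
    return Integer.highestOneBit(n);
  }

  /** Bytes to pre-allocate for one column's value vector. */
  static long allocationSizeBytes(int rowCount, long avgColumnWidthBytes) {
    // Allocate what the sizing calculation predicts instead of starting at
    // 4K values and doubling (each doubling copies and zeroes memory).
    return (long) rowCount * avgColumnWidthBytes;
  }

  public static void main(String[] args) {
    int rows = roundDownToPowerOfTwo(65028);            // prints 32768 below
    System.out.println(rows);
    System.out.println(allocationSizeBytes(rows, 8));   // 262144 bytes for an 8-byte column
  }
}
{code}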
So, to summarize, the benefits are improved memory utilization, better performance, higher concurrency, and fewer queries dying because of out-of-memory errors.

Note: Since these sizing calculations are based on averages, strict memory usage enforcement is not possible. There can be pathological cases where, because of uneven data distribution, we exceed the configured output batch size, potentially causing OOM errors and problems in downstream operators.

Other issues that will be addressed:
* We are adding extra processing for each batch in each operator to figure out the sizing information (a sketch follows this list). This overhead can be reduced by passing the information along with the batch between operators.
* For some operators, it will be complex to figure out the average size of outgoing columns, especially if we have to evaluate complex expression trees and UDFs to determine the transformation applied to incoming batches. We will use approximations as appropriate.
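The extra per-batch processing mentioned above is essentially one pass over the incoming batch to compute each column's average width. A simplified, self-contained sketch (ColumnBuffers below is a stand-in for a value vector's data and offset buffers, not a Drill class):

{code:java}
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch of the per-batch sizing pass: compute the average
 * bytes per row of each column in an incoming batch.
 */
public class IncomingBatchSizerSketch {

  /** Stand-in for one column's buffers in the incoming batch. */
  static class ColumnBuffers {
    final long dataBytes;      // payload buffer size
    final long overheadBytes;  // offset/bits vectors for variable-length or optional columns

    ColumnBuffers(long dataBytes, long overheadBytes) {
      this.dataBytes = dataBytes;
      this.overheadBytes = overheadBytes;
    }
  }

  /** Average width per row (data plus metadata overhead), rounded up. */
  static Map<String, Long> averageColumnWidths(Map<String, ColumnBuffers> columns, int rowCount) {
    Map<String, Long> widths = new HashMap<>();
    for (Map.Entry<String, ColumnBuffers> e : columns.entrySet()) {
      long total = e.getValue().dataBytes + e.getValue().overheadBytes;
      widths.put(e.getKey(), (total + rowCount - 1) / rowCount);   // ceiling division
    }
    return widths;
  }

  public static void main(String[] args) {
    Map<String, ColumnBuffers> batch = new HashMap<>();
    batch.put("id", new ColumnBuffers(4 * 1000, 0));           // 1000 rows of 4-byte ints
    batch.put("name", new ColumnBuffers(20_000, 4 * 1001));    // varchar data plus offset vector
    System.out.println(averageColumnWidths(batch, 1000));      // e.g. {name=25, id=4}
  }
}
{code}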
The following table summarizes the limits we have today for each operator. Flatten, merge join, and external sort have already been changed to adhere to the batch size limits described in this document, as of Drill release 1.13.

|*Operator*|*Limit (Rows, Memory)*|*Notes*|
|Flatten|4K, 512 MB|Flatten can produce very large batches depending on the average cardinality of the flatten column.|
|Merge Receiver|32K|No memory limit.|
|Hash Aggregate|64K|No memory limit.|
|Streaming Aggregate|32K|No memory limit.|
|Broadcast Sender|None|No limits.|
|Filter, Limit|None|No limits.|
|Hash Join|4K|No memory limit. Joins produce large batches; the 4K limit was probably put in place to be conservative with respect to memory usage.|
|Merge Join|4K|No memory limit.|
|Nested Loop Join|4K|No memory limit.|
|Partition Sender|1K| |
|Project|64K|No memory limit.|
|Selection Vector Remover|None|No limits.|
|TopN|4K|No memory limit.|
|Union|None|No limit.|
|Windows|None|No limit.|
|External Sort|64K, 16 MB| |
|Unordered Receiver|None|No limit.|


> Batch sizing for operators
> --------------------------
>
>                 Key: DRILL-6238
>                 URL: https://issues.apache.org/jira/browse/DRILL-6238
>             Project: Apache Drill
>          Issue Type: New Feature
>            Reporter: Padma Penumarthy
>            Assignee: Padma Penumarthy
>            Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)