drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-5457) Support Spill to Disk for the Hash Aggregate Operator
Date Tue, 02 May 2017 04:24:04 GMT

    [ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15992316#comment-15992316

ASF GitHub Bot commented on DRILL-5457:

GitHub user Ben-Zvi opened a pull request:


    DRILL-5457: Spill implementation for Hash Aggregate

    This pull-request is for the work on enabling memory spill for the Hash Aggregate Operator.
    To assist in reviewing this extensive code change, listed below are various topics/issues
that describe the implementation decisions and give some code pointers. The reviewer can read
these items and peek into their relevant code, or read all the items first (and comment on
the design decisions as well).
    The last topic is not "least": It describes many issues and solutions related to the need
to estimate the memory size of batches (and hash tables, etc.) This work took a significant
amount of time, and will need some more to get better.
    (Most of the code changes are in HashAggTemplate.java, hence this file is not mentioned
specifically below)
    ### Aggregation phase:
       The code was changed to pass the aggregation phase information (whether this is a Single
phase, or 1st of two phase, or 2nd of two phase) from the planner to the HAG operator code.
    (See HashAggregate.java, AggPrelBase.java, HashAggPrel.java )
    ### Partitioning:
      The data (rows/groups) coming into the HAG is partitioned into (a power of 2) number
of partitions, based on the N least significant bits of the hash value (computed out of the
row's key columns).
      Each partition can be handled independently of the others. Ideally each partition should
fit into the available memory. The number of partitions is initialized from the option "drill.exec.hashagg.num_partitions",
and scaled down if the available memory seems too small (each partition needs to hold at least
one batch in memory).
      The scaling down uses the formula:  AVAIL_MEMORY >  NUM_PARTITIONS * ( K * EST_BATCH_SIZE
+ 8M )
    (see delayedSetup() ), where K is the option drill.exec.hashagg.min_batches_per_partition
(see below).
      Computing the number of partitions is delayed until actual data arrives on the incoming
(in order to get an accurate sizing on varchars). See delayedSetup(). There is also special
code for cases where data never arrives (empty batches), hence no partitions (see beginning of outputCurrentBatch(),
cleanUp(), delayedSetup() ).
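
The scaling-down rule can be sketched as follows. This is a minimal illustration, not the code in delayedSetup(): the class and method names are hypothetical, and halving on each step is an assumption made so that the count stays a power of 2.

```java
// Hypothetical sketch of the partition scale-down rule; not Drill's actual code.
final class PartitionSizing {
    // Fixed per-partition overhead from the formula above (8M).
    static final long PER_PARTITION_OVERHEAD = 8L * 1024 * 1024;

    /**
     * Halves the partition count until
     * availMemory > numPartitions * (k * estBatchSize + 8M) holds,
     * or until only one partition is left.
     */
    static int scaleDown(int initialPartitions, long availMemory, long estBatchSize, int k) {
        int numPartitions = initialPartitions; // assumed to be a power of 2
        while (numPartitions > 1
                && availMemory <= numPartitions * (k * estBatchSize + PER_PARTITION_OVERHEAD)) {
            numPartitions /= 2; // assumption: halving keeps the count a power of 2
        }
        return numPartitions;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        // K = 3 batches of 4 MB each plus 8 MB overhead: 20 MB needed per partition.
        System.out.println(scaleDown(32, 1024 * mb, 4 * mb, 3)); // enough memory for all 32
        System.out.println(scaleDown(32, 200 * mb, 4 * mb, 3));  // scaled down
    }
}
```

With 200 MB available and 20 MB needed per partition, this sketch settles on 8 partitions (8 * 20 MB = 160 MB fits, 16 * 20 MB does not).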
      Many of the code changes made in order to implement multi-partitions follow the original
code, only changing scalar members (of HashAggTemplate) into arrays, like "htable" becomes
      Each partition has its own hash table. After each time it is spilled, its hash table
is freed and reallocated.
    ### Hash Code:
      The hash code computation result was extracted from the HashTable (needed for the partitions),
and added as a parameter to the put() method. Thus for each new incoming row, first the hash
code is computed, and the low N bits are used to select the partition, then the hash code
is right shifted by N, and the result is passed back to the put() method.
      After spilling, the hash codes are not kept. When reading the rows (groups) from the
spill, the hash codes are computed again (and right shifted before use - once per each cycle
of spilling - thus repartitioning).
    (See more about "spilling cycle" below).
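
The split of one hash code between partition selection and the hash table can be sketched as below. The class and method names are hypothetical, and the use of an unsigned shift is an assumption of this sketch rather than something stated above.

```java
// Hypothetical sketch: one hash code serves both partition choice and the hash table.
final class HashSplit {
    /** The low N bits pick the partition; numPartitions must be a power of 2 (2^N). */
    static int partitionOf(int hashCode, int numPartitions) {
        return hashCode & (numPartitions - 1);
    }

    /** The bits left after dropping the low N bits are what gets passed on to put(). */
    static int hashForTable(int hashCode, int numPartitions) {
        int bits = Integer.numberOfTrailingZeros(numPartitions); // N = log2(numPartitions)
        return hashCode >>> bits; // assumption: unsigned right shift
    }

    public static void main(String[] args) {
        int hashCode = 0b1011_0110; // 182
        System.out.println(partitionOf(hashCode, 8));  // low 3 bits: 110
        System.out.println(hashForTable(hashCode, 8)); // remaining bits: 10110
    }
}
```

Using disjoint bit ranges means that rows landing in the same partition still spread across that partition's hash table buckets.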
    ### Hash Table put():
      The put() method for the hash table was rewritten and simplified. In addition to the
hash-code parameter change, it was changed to return the PutStatus, with two new states: NEW_BATCH_ADDED
notifies the caller that a new batch was created internally, hence the caller needs to add a new batch
as well (only needed for Hash Agg). Prior code was getting this from comparing the returned index against
the prior number of batches.
      A second new state is KEY_ADDED_LAST, which notifies that a batch was just filled, hence
it is time for checking memory availability (because a new batch would be allocated soon).
      Similar rewriting was done for the hash table containsKey() method (and addBatchifNeeded()
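
A caller reacting to the two new states might look roughly like the sketch below. Only NEW_BATCH_ADDED and KEY_ADDED_LAST come from the description above; the other two enum values and the reaction strings are placeholders assumed for illustration.

```java
// Hypothetical sketch of a caller dispatching on the status returned by put().
final class PutStatusDemo {
    // KEY_PRESENT and KEY_ADDED are assumed names for the pre-existing states.
    enum PutStatus { KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST }

    /** Describes what the Hash Agg caller would do next for each status. */
    static String reactTo(PutStatus status) {
        switch (status) {
            case NEW_BATCH_ADDED:
                // The hash table created a batch internally; the caller adds its own.
                return "add a new aggregation batch";
            case KEY_ADDED_LAST:
                // The current batch just filled up; a new allocation is coming soon.
                return "check memory availability";
            default:
                return "continue";
        }
    }

    public static void main(String[] args) {
        for (PutStatus s : PutStatus.values()) {
            System.out.println(s + " -> " + reactTo(s));
        }
    }
}
```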
    ### Memory pressure check:
      Logically the place to check for memory pressure is when new memory is needed (i.e.,
when a new group needs to be created). However the code structure does not allow this easily
(e.g., a new batch is allocated inside the hash table object when a new group is detected,
or the hash table structure is doubled in size), thus instead the check is done AFTER a new
group was added, in case this was the last group added to that batch (see in checkGroupAndAggrValues()
- checking for a new status KEY_ADDED_LAST ).
       This memory availability check tests whether enough memory is left between the amount allocated
so far and the limit.
     Spill is initiated when:  MEMORY_USED + MAX_MEMORY_NEEDED > MEMORY_LIMIT   (see checkGroupAndAggrValues() )
     where the memory needed is:  (EST_BATCH_SIZE + 64K * (4+4)) * K * PLANNED_BATCHES + MAX_HASH_TABLES_RESIZE
    (See K above, under Partitioning, and the rest well below, under memory estimations).
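
The spill condition can be written out as a small sketch. The class, method, and parameter names are hypothetical; the (4+4) term is copied from the formula as given, since its meaning is not spelled out here.

```java
// Hypothetical sketch of the spill decision; names are illustrative only.
final class SpillCheck {
    static final long ROWS_PER_BATCH = 64L * 1024; // the 64K in the formula

    /** (EST_BATCH_SIZE + 64K * (4+4)) * K * PLANNED_BATCHES + MAX_HASH_TABLES_RESIZE */
    static long maxMemoryNeeded(long estBatchSize, int k, int plannedBatches,
                                long maxHashTablesResize) {
        long perBatch = estBatchSize + ROWS_PER_BATCH * (4 + 4);
        return perBatch * k * plannedBatches + maxHashTablesResize;
    }

    /** Spill when MEMORY_USED + MAX_MEMORY_NEEDED > MEMORY_LIMIT. */
    static boolean shouldSpill(long memoryUsed, long maxMemoryNeeded, long memoryLimit) {
        return memoryUsed + maxMemoryNeeded > memoryLimit;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long needed = maxMemoryNeeded(4 * mb, 3, 1, 0L); // 3 * (4 MB + 512 KB) = 13.5 MB
        System.out.println(shouldSpill(90 * mb, needed, 100 * mb)); // over the limit
        System.out.println(shouldSpill(80 * mb, needed, 100 * mb)); // still fits
    }
}
```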
    ### When can not spill:
      Single phase HAG can not spill. Also under memory duress 2nd phase may end up with only
a single partition, which can not allow spilling (no progress is made). In these two cases,
the memory check is skipped, and the operator functions like the old code -- if it runs out of
memory then it will OOM. A try-catch was added into the code to provide more detail on the
OOM (see getOOMErrorMsg() ).
       Also in case of a single partition the allocator's memory limit is set to 10GB, to
be compatible with the prior code.
       Another "can't spill" situation is when choosing a partition to spill, but no partition
has more than 1 batch (hence no memory can be gained, as after spilling that 1 batch the partition
needs to be reinitialized with a new batch). See chooseAPartitionToFlush(). In such a case
the code "crosses its fingers" and continues without spilling.
    ### 1st phase - Early return:
      The 1st phase HAG does not spill to disk. When the 1st phase detects memory pressure it
picks the current partition (the one whose last batch just got full) and returns that partition
downstream (just like a regular return, only early). Afterwards that partition is (deallocated
and) initialized. Note the boolean "earlyOutput" in the code, which controls special processing
in this case - when turned on the code switches to output (e.g., innerNext() in HashAggBatch.java),
and it is turned off when done (see  outputCurrentBatch() ).
    ### Spilling: 
      Using the SpillSet (like the External Sort does) for the actual IO.  Each partition
spills into a single file.  Changes to SpillSet: Generalize it for any kind of "buffered memory"
operator (pass in the operator's type). Also small changes to the spill file name.
    ### 2nd phase - Flushing/spilling: 
      When memory pressure is detected (while reading and processing the incoming), one
of the partitions is selected ( see chooseAPartitionToFlush() ) and flushed ( spillAPartition()
), and then its memory is freed and that partition is re-initialized ( reinitPartition() ).
The choice of a partition gives some small priority to the current partition (since its last
batch is full, unlike the others), and priority by a factor of 4 to partitions that are already
spilled (i.e., a spilled partition with 10 batches would be chosen over a pristine/non-spilled
one with 39 batches.)
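
The factor-of-4 preference can be illustrated with the sketch below. It is not the code of chooseAPartitionToFlush(): the names are hypothetical, the small priority for the current partition is left out, and returning -1 stands in for the "no partition has more than 1 batch" case.

```java
// Hypothetical sketch of choosing a partition to spill; not Drill's actual code.
final class PartitionChooser {
    static final int SPILLED_WEIGHT = 4; // priority factor for already-spilled partitions

    /** Returns the index of the partition to spill, or -1 if none has more than one batch. */
    static int choose(int[] batchCounts, boolean[] alreadySpilled) {
        int best = -1;
        long bestScore = 0;
        for (int i = 0; i < batchCounts.length; i++) {
            if (batchCounts[i] <= 1) {
                continue; // spilling a lone batch gains no memory
            }
            long score = (long) batchCounts[i] * (alreadySpilled[i] ? SPILLED_WEIGHT : 1);
            if (score > bestScore) {
                bestScore = score;
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Pristine partition with 39 batches vs. spilled partition with 10 batches.
        System.out.println(choose(new int[] {39, 10}, new boolean[] {false, true}));
    }
}
```

In the example from the text, the spilled partition scores 10 * 4 = 40 and wins over the pristine one at 39.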
    Partition spilling (spillAPartition() ):  For each batch in the partition - Allocate an
outgoing container, link the values and the keys into this container, and write it to the
    2nd phase - End of incoming: After the last batch was read from the incoming, the original
code ( next() ) returned a status of NONE. The new code can't return NONE after spilling,
so instead it returns a special status of RESTART (see outputCurrentBatch() ). This RESTART
is captured by the caller of next() ( innerNext() in HashAggBatch.java ), which continues
to drive the aggregation (instead of returning).
    After the end of the incoming, all the (partially) spilled partitions finish spilling
all their remaining in-memory batches to disk (see outputCurrentBatch() ). This is done to
simplify the later processing of each spilled partition, as well as freeing memory which may
be needed as partitions are processed. The spilled partitions are added into a list (spilledPartitionsList)
to allow for later processing.
    ### 2nd phase reading of the spill: 
    Reading of each spilled partition/file is performed like reading the incoming. For this
purpose, a new class was added: SpilledRecordbatch. The main method there is next() which
reads a batch from the stream -- first time it uses the original readFromStream() method,
which creates a new container; subsequent calls use the new readFromStreamWithContainer()
method, which is similar - only reuses the prior container. (This was done because many places
in the code have references into the container).
    ### Spilling cycles: 
    Reading a spilled partition "just like incoming" allows for that input to spill again
(and again ...); this was termed SECONDARY spilling (and TERTIARY ...). As the spilled partitions are kept
in a FIFO list, processing of SECONDARY partitions would start only after all the regular
spilled ones, etc. Hence a member "cycleNum" was created, incremented every time that processing
the spilled list advances to another "cycle" (see outputCurrentBatch() ).
      The "cycleNum" is used for the hash-code computation; the same hash-code is computed
at every cycle, but the cycle tells how much to right-shift that code so that different bits
would be used (for partitioning and hash-table bucket).
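
The effect of the cycle number on partitioning can be sketched as follows. The amount of the shift per cycle is not given above, so shifting by cycleNum * N bits (N = log2 of the partition count) is an assumption, as are all the names.

```java
// Hypothetical sketch: each spilling cycle looks at a different slice of the hash code.
final class CycleHash {
    /** Partition for a row in the given cycle; numPartitions must be a power of 2. */
    static int partitionOf(int hashCode, int numPartitions, int cycleNum) {
        int bits = Integer.numberOfTrailingZeros(numPartitions); // N
        // Assumption: skip the N bits consumed by each earlier cycle.
        // Note: Java masks int shift distances to 5 bits, so cycleNum * bits must stay below 32.
        int shifted = hashCode >>> (cycleNum * bits);
        return shifted & (numPartitions - 1);
    }

    public static void main(String[] args) {
        int a = 0b110_101; // 53
        int b = 0b001_101; // 13
        // Both rows share partition 5 in cycle 0, so they end up in the same spill file.
        System.out.println(partitionOf(a, 8, 0) + " " + partitionOf(b, 8, 0));
        // Re-read in cycle 1, the next 3 bits separate them.
        System.out.println(partitionOf(a, 8, 1) + " " + partitionOf(b, 8, 1));
    }
}
```

Without the shift, every row read back from one spill file would map to the same partition again and a secondary spill would make no progress.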
    ### Configuration options:
    - drill.exec.hashagg.num_partitions: Initial number of partitions in each HAG operator
(the number may be adjusted down in case too little memory is available). Default value: 32,
allowed range 1 - 128, where a value of 1 means "No spilling" (and thus setting 10GB limit).
    - drill.exec.hashagg.min_batches_per_partition: Range 2--5. Default 3. Used for internal
initial estimate of number of partitions, and later when predicting memory needed (to avoid
a spill).
      (A value of 2 may be better, but it triggers a bug which will be addressed separately).
    Also using options common to all the "buffered" operators (can be overridden, per operator):
    - drill.exec.spill.fs: File system for spilling into.
    - drill.exec.spill.directories: (Array of) directories to spill into.
    (To override, per-operator: for the (managed) External Sort: "drill.exec.sort.external.spill.fs" and
     "drill.exec.sort.external.spill.directories", and for the Hash Aggregate:
     "drill.exec.hashagg.spill.fs" and "drill.exec.hashagg.spill.directories")
    For testing:
    - drill.exec.hashagg.mem_limit: Limit the memory for each HAG operator (also sets this
number in the allocator, hence this is a "hard limit").
    Also for testing (or for a customer workaround ??):
    - planner.force_2phase_aggr: Forces the aggregation to be two phase.
    ### Stats and metrics:
      The hash-table stats were modified to sum the stats across all the partitions' hash
tables. (This only applies to the first spilling cycle; does not count for SECONDARY, TERTIARY
spilling etc.).
      New metrics added:
     - "num partitions" (actual number; may have been scaled down due to memory pressure)
     - "spilled partitions" (number that has spilled)
     - "MB spilled" (in case of 1st phase - that's the total data returned early).
     All the above three refer to the end of input into the HAG (does not include handling
of spills, secondary spills, etc.)
     - "cycle": 0 - no spill (or 1st phase), 1 - regular spill, 2 - Secondary, 3 - Tertiary
    ### Memory Allocation Utilities:
       Extended for all "buffered operators", not only for Sort. (Hash Join will be added
later as well, etc.)
    ### Changes to Unit tests:
    - New TestHashAggSpill : Runs two hash agg queries - One spills some of its partitions
(1 out of 2), and the other test forces a smaller memory hence gets into a Secondary and Tertiary
    - TestBugFixes.testDRILL4884: This test implicitly relied on rows returned in order (the
old Hash agg, plus the Parquet file).
        With the new division into partitions, that order was broken. Fix: added an "order
    - TestTpchDistributedConcurrent.testConcurrentQueries: Needed longer timeout (probably
    ### MISC
    - After a spill, check again if enough memory was freed, else spill (another partition)
again. (Not sure if needed.)
    - Suggestion not implemented: Scaling down the initial hash-table sizes by the number
of partitions (e.g. when 4 partitions, each hash-table starts with 1/4 of the initial size).
Reason for not changing: starting with a small size immediately causes doubling and another
doubling etc. Better allocate a little more and save that work.
    - The RecordBatchSizer had a recent change to handle MAPs (recursively). Merged this change
with the modified measureColumn() which returns an int (the est size).
      As described above, we need to get a good estimate of the memory needs in order to decide
initially on the number of partitions, and later to decide each time (a batch gets filled)
whether to spill or not.
     These estimates are complicated due to:
    (1) Possible changes in the incoming data batches (e.g., varchar(12) in the first batch
becomes varchar(200) in the second incoming batch). This may invalidate prior estimates.
    (2) Arbitrary setting of length 50 for varchar type (when sizing the allocation of DrillBufs)
    (3) Allocation size aligned up to nearest power of 2 (DrillBufs for varchars)
    (4) When an internal batch gets filled, and estimation shows ample memory -- a second
batch may get filled before the first one's partition allocated a new batch (hence may cause
"double booking").
    (5) Inserting a single value may cause the "start indices" (the real actual "hash table")
to double in size. This structure can get pretty large (few MB).
    (6) Is the size of the incoming batch charged against the HAG's memory allocator
limit? (Not sure; usually not a problem as the prior batch is deallocated before the next
one comes; unless the next one is "much bigger")
    (7) For varchars: The memory is allocated as a power of 2 (e.g. doubled via setSafe()).
This can cause a big memory waste (e.g., if the total memory needed for 64k varchars is ~5MB,
then 8MB is allocated, wasting 3MB).
    (8) The varchar value vector uses an internal "offset vector" that allocates "size+1",
hence for 64K it allocates 512KB, of which 256KB are wasted (see DRILL-5446).
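
The arithmetic behind items (3), (7) and (8) can be checked with a short sketch. The names are hypothetical, and the 4-byte width of each offset entry is an assumption inferred from the 256 KB figure.

```java
// Hypothetical sketch of the power-of-2 rounding waste described in (3), (7) and (8).
final class AllocationWaste {
    /** Rounds a requested size up to the nearest power of 2. */
    static long roundUpToPowerOf2(long size) {
        if (size <= 1) {
            return 1;
        }
        return Long.highestOneBit(size - 1) << 1;
    }

    /** Allocated bytes for an offset vector holding valueCount + 1 entries. */
    static long offsetVectorBytes(int valueCount) {
        long requested = 4L * (valueCount + 1); // assumption: 4 bytes per offset entry
        return roundUpToPowerOf2(requested);
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        System.out.println(roundUpToPowerOf2(5 * mb) / mb + " MB"); // item (7): ~5 MB requested
        System.out.println(offsetVectorBytes(64 * 1024) / 1024 + " KB"); // item (8): 64K values
    }
}
```

The single extra offset entry pushes the request just past 256 KB, which is why the allocation doubles to 512 KB.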
    ### Solutions for the memory estimation issues:
    (1)+(6) above: Monitor the size of each incoming batch. Resize batch size estimate if
the incoming batch is bigger (see doWork() )
    (5) When estimating memory needs, take into account hash table size doubling in all partitions
(using the new hash table method extraMemoryNeededForResize() ).
    (4) Track "plannedBatches"; when "promising" few partitions a new batch each, take this
into account when checking for available memory. (Though "more than 1" situation seems very
    (2)+(3) Ideally tracking the size of EACH varchar column could work better, but is not simple
to implement. Instead -- just find the maximum size of any of the incoming columns (for simplicity
- not only varchars), and use this value (capped at 50, min value of 8; rounded up to the
next power of 2 if needed).  This addresses the common situation of multiple short varchar
key columns but not the (very rare) situation of a huge varchar KEY column, plus few short
    (7) Update RecordBatchSizer.java -- added a method  netRowWidthCap50()  which takes into
account the rounding up (per each column in a row), plus nulls arrays as needed, for each
row (will multiply that by 64K in updateEstMaxBatchSize() ).
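
The width estimate from solution (2)+(3) can be sketched as below. This is not netRowWidthCap50(): the names are hypothetical, and applying the 8..50 clamp before the power-of-2 rounding is one reading of the description, assumed here.

```java
// Hypothetical sketch of the capped column-width estimate; not Drill's actual code.
final class ColumnWidthEstimate {
    static final int MIN_WIDTH = 8;
    static final int MAX_WIDTH = 50;

    /** Takes the widest incoming column, clamps it to [8, 50], then rounds up to a power of 2. */
    static int estimateWidth(int[] columnWidths) {
        int max = 0;
        for (int w : columnWidths) {
            max = Math.max(max, w);
        }
        int clamped = Math.min(MAX_WIDTH, Math.max(MIN_WIDTH, max));
        return roundUpToPowerOf2(clamped); // assumption: rounding is applied after the clamp
    }

    static int roundUpToPowerOf2(int v) {
        return v <= 1 ? 1 : Integer.highestOneBit(v - 1) << 1;
    }

    public static void main(String[] args) {
        System.out.println(estimateWidth(new int[] {4, 12}));  // short varchar keys
        System.out.println(estimateWidth(new int[] {200, 4})); // one huge varchar, capped
    }
}
```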
    ==== END ====

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Ben-Zvi/drill hashagg-spill

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #822
commit 1e436cef2dd8c4f519e584fbe18d233eab26468f
Author: Boaz Ben-Zvi <boazben-zvi@bbenzvi-e754-mbp13.local>
Date:   2017-05-02T02:59:49Z

    Spill implementation for Hash Aggregate


> Support Spill to Disk for the Hash Aggregate Operator
> -----------------------------------------------------
>                 Key: DRILL-5457
>                 URL: https://issues.apache.org/jira/browse/DRILL-5457
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.10.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>             Fix For: 1.11.0
> Support gradual spilling memory to disk as the available memory gets too small to allow
in memory work for the Hash Aggregate Operator.

This message was sent by Atlassian JIRA
