flink-issues mailing list archives

From StefanRRichter <...@git.apache.org>
Subject [GitHub] flink pull request #6227: Heap abstractions rocks
Date Fri, 29 Jun 2018 08:47:15 GMT
GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/6227

    Heap abstractions rocks

    ## What is the purpose of the change
    
    This PR is another step towards integrating the timer state with the keyed state backends.
    
    First, the PR generalizes the data structure `InternalTimerHeap` to `InternalPriorityQueue`
so that the functionality of a heap-set-organized state is decoupled from storing timers.
The main reason for this is that state/backend related code lives in flink-runtime and timers
are a concept from flink-streaming.
    
    Second, the PR introduces an implementation of `InternalPriorityQueue` with set semantics
(i.e. the data structure we require to manage timers) that is based on RocksDB. State in RocksDB
is always partitioned into key-groups, so the general idea is to organize the implementation
as a heap-of-heaps: each sub-heap holds the elements of exactly one key-group, and the super-heap
merges them by priority across key-group boundaries. The implementation reuses the in-memory
implementation of `InternalPriorityQueue` (without set properties) as the super-heap that
holds the sub-heaps. Furthermore, each sub-heap is an instance of `CachingInternalPriorityQueueSet`,
consisting of a "fast", "small" cache (`OrderedSetCache`) and a "slow", "unbounded" store
(`OrderedSetStore`), currently applying simple write-through synchronization between cache
and store. In the current implementation, the cache is based on an AVL tree and restricted
in capacity. The store is backed by a RocksDB column family. We use caching to reduce
read accesses to RocksDB.
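
    To illustrate the heap-of-heaps idea, here is a minimal sketch in plain Java. It is not
the code from this PR: `HeapOfHeapsSketch` and `SubQueue` are hypothetical names, elements are
bare `long` timestamps, the caller passes the key-group index explicitly, and a `TreeSet`
stands in for each per-key-group sub-heap.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.TreeSet;

/** Illustrative heap-of-heaps; all names are hypothetical, not Flink classes. */
final class HeapOfHeapsSketch {

    /** One ordered set per key-group. Identity equality keeps PriorityQueue.remove exact. */
    static final class SubQueue {
        final TreeSet<Long> elements = new TreeSet<>();
    }

    private final List<SubQueue> subQueues = new ArrayList<>();

    /** Super-heap over the non-empty sub-queues, ordered by each sub-queue's head. */
    private final PriorityQueue<SubQueue> superHeap =
        new PriorityQueue<>(Comparator.comparingLong((SubQueue q) -> q.elements.first()));

    HeapOfHeapsSketch(int numKeyGroups) {
        for (int i = 0; i < numKeyGroups; i++) {
            subQueues.add(new SubQueue());
        }
    }

    /** Adds a timestamp to its key-group; returns false if it was already present there. */
    boolean add(int keyGroup, long timestamp) {
        SubQueue sub = subQueues.get(keyGroup);
        // Take the sub-queue out before its head can change, then re-insert it.
        superHeap.remove(sub);
        boolean added = sub.elements.add(timestamp);
        superHeap.add(sub);
        return added;
    }

    /** Returns the smallest element across all key-groups, or null if empty. */
    Long peek() {
        SubQueue sub = superHeap.peek();
        return sub == null ? null : sub.elements.first();
    }

    /** Removes and returns the smallest element across all key-groups, or null if empty. */
    Long poll() {
        SubQueue sub = superHeap.poll();
        if (sub == null) {
            return null;
        }
        Long head = sub.elements.pollFirst();
        if (!sub.elements.isEmpty()) {
            superHeap.add(sub); // re-insert under its new head
        }
        return head;
    }

    public static void main(String[] args) {
        HeapOfHeapsSketch queue = new HeapOfHeapsSketch(3);
        queue.add(0, 50L);
        queue.add(1, 10L);
        queue.add(2, 30L);
        queue.add(0, 20L);
        for (Long t = queue.poll(); t != null; t = queue.poll()) {
            System.out.println(t);
        }
    }
}
```

    The sketch removes and re-inserts a sub-queue whenever its head may change, which is linear
in the number of key-groups with `java.util.PriorityQueue`. A heap that tracks element positions
could adjust the entry in place instead.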
    
    Please note that the RocksDB implementation is not yet integrated with the timer
service or the backend. This will happen in the next steps.
    
    ## Brief change log
    
    - Refactored `InternalTimerHeap` to decouple it from timers, moved the data structures
from flink-streaming to flink-runtime (-> `InternalPriorityQueue`).
    - Split the data structure into a hierarchy: a heap without set semantics (`HeapPriorityQueue`)
and a heap extended with set semantics (`HeapPriorityQueueSet`).
    - Introduced a RocksDB-based implementation of `InternalPriorityQueue` with set semantics.
The starting point is `KeyGroupPartitionedPriorityQueue`. This class uses a `HeapPriorityQueue`
of `CachingInternalPriorityQueueSet` instances, each of which contains the elements of exactly one
key-group (heap-of-heaps). For RocksDB, we configure each `CachingInternalPriorityQueueSet`
to use a `TreeOrderedSetCache` and a `RocksDBOrderedStore`.
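
    The cache/store pairing inside one sub-heap can be sketched as follows. This is an
assumption-laden illustration, not the PR's `CachingInternalPriorityQueueSet`: the class name,
the `storeOnlyElements` flag, and the `storeScans` counter are hypothetical, and both the
bounded cache and the RocksDB-backed store are replaced by in-memory `TreeSet`s.

```java
import java.util.TreeSet;

/** Illustrative bounded cache in front of an ordered store; names are hypothetical. */
final class WriteThroughSetSketch {

    private final TreeSet<Long> cache = new TreeSet<>(); // small, holds the smallest elements
    private final TreeSet<Long> store = new TreeSet<>(); // stand-in for the unbounded store
    private final int capacity;
    private boolean storeOnlyElements = false; // true if the store may hold uncached elements
    int storeScans = 0;                        // counts ordered scans of the store

    WriteThroughSetSketch(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
    }

    /** Write-through add; returns false for duplicates. */
    boolean add(long element) {
        // Simplification: the in-memory stand-in also answers the duplicate check.
        if (!store.add(element)) {
            return false;
        }
        if (cache.size() == capacity) {
            if (element < cache.last()) {
                cache.pollLast(); // evicted element stays in the store
                cache.add(element);
            }
            storeOnlyElements = true;
        } else if (!storeOnlyElements || (!cache.isEmpty() && element < cache.last())) {
            cache.add(element);
        }
        // Otherwise the element lives only in the store; storeOnlyElements is already true.
        return true;
    }

    /** Returns the smallest element, refilling the cache from the store only when needed. */
    Long peek() {
        if (cache.isEmpty() && storeOnlyElements) {
            storeScans++;
            for (Long e : store) { // ordered scan, smallest first
                if (cache.size() == capacity) {
                    break;
                }
                cache.add(e);
            }
            storeOnlyElements = store.size() > cache.size();
        }
        return cache.isEmpty() ? null : cache.first();
    }

    /** Removes the smallest element from both cache and store (write-through). */
    Long poll() {
        Long head = peek();
        if (head != null) {
            cache.pollFirst();
            store.remove(head);
        }
        return head;
    }

    public static void main(String[] args) {
        WriteThroughSetSketch set = new WriteThroughSetSketch(2);
        for (long t : new long[] {5L, 3L, 9L, 1L}) {
            set.add(t);
        }
        for (Long t = set.poll(); t != null; t = set.poll()) {
            System.out.println(t);
        }
        System.out.println("store scans: " + set.storeScans);
    }
}
```

    In this sketch, every write goes to the store immediately, while reads of the head are
served from the cache until it runs empty. Only then is the store scanned again.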
    
    
    ## Verifying this change
    
    I added dedicated tests for all data structures.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes, fastutil)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (yes)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing,
Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink heapAbstractionsRocks

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6227.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6227
    
----
commit b3261a15bdf15207f50e2832a048fb3c84b8f642
Author: Stefan Richter <s.richter@...>
Date:   2018-06-19T08:01:30Z

    Introduce MAX_ARRAY_SIZE as general constant

commit b5522ba34b2166e698a1695390a6d1fc4c671bb9
Author: Stefan Richter <s.richter@...>
Date:   2018-06-18T12:38:01Z

    Generalization of timer queue to a queue(set) that is no longer coupled to timers and
implementation for RocksDB.

----

