flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9491) Implement timer data structure based on RocksDB
Date Fri, 29 Jun 2018 08:55:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527338#comment-16527338 ]

ASF GitHub Bot commented on FLINK-9491:

GitHub user StefanRRichter opened a pull request:


    [FLINK-9491] Implement timer data structure based on RocksDB

    ## What is the purpose of the change
    This PR is another step towards integrating the timer state with the keyed state backends.
    First, the PR generalizes the data structure `InternalTimerHeap` to `InternalPriorityQueue`
so that the functionality of a heap-set-organized state is decoupled from storing timers.
The main reason for this is that state/backend related code lives in flink-runtime and timers
are a concept from flink-streaming.
    Second, the PR also introduces an implementation of `InternalPriorityQueue` with set semantics
(i.e. the data structure we require to manage timers) that is based on RocksDB. State in RocksDB
is always partitioned into key-groups, so the general idea is to organize the implementation
as a heap-of-heaps: each sub-heap holds the elements of exactly one key-group, and a super-heap
merges the sub-heaps by priority across key-group boundaries. The implementation reuses the in-memory
implementation of `InternalPriorityQueue` (without set properties) as the super-heap that
holds the sub-heaps. Furthermore, each sub-heap is an instance of `CachingInternalPriorityQueueSet`,
consisting of a "fast", "small" cache (`OrderedSetCache`) and a "slow", "unbounded" store
(`OrderedSetStore`), currently applying simple write-through synchronization between cache
and store. In the current implementation, the cache is based on an AVL tree and restricted
in capacity. The store is backed by a RocksDB column family. We use caching to reduce
read accesses to RocksDB.
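
The heap-of-heaps idea can be illustrated with a minimal sketch. This is not the code from the PR: the class `HeapOfHeapsSketch`, its nested `SubQueue`, and the use of plain `long` timestamps as elements are all hypothetical, and `java.util.PriorityQueue` and `TreeSet` stand in for Flink's own data structures. The caller passes the key-group index explicitly.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.TreeSet;

/** Hypothetical illustration only; not Flink's KeyGroupPartitionedPriorityQueue. */
final class HeapOfHeapsSketch {

    /** One sub-queue per key-group; a TreeSet gives ordering plus set semantics. */
    private static final class SubQueue {
        final TreeSet<Long> elements = new TreeSet<>();
    }

    private final List<SubQueue> subQueues = new ArrayList<>();

    /** Super-heap over the non-empty sub-queues, ordered by each sub-queue's head. */
    private final PriorityQueue<SubQueue> superHeap =
        new PriorityQueue<>(Comparator.comparingLong((SubQueue q) -> q.elements.first()));

    HeapOfHeapsSketch(int numKeyGroups) {
        for (int i = 0; i < numKeyGroups; i++) {
            subQueues.add(new SubQueue());
        }
    }

    /** Returns false if the element was already present in its key-group. */
    boolean add(int keyGroup, long timestamp) {
        SubQueue q = subQueues.get(keyGroup);
        superHeap.remove(q);                      // no-op if q was empty and thus absent
        boolean added = q.elements.add(timestamp);
        superHeap.add(q);                         // re-insert so a new head is respected
        return added;
    }

    /** Smallest element across all key-groups, or null if everything is empty. */
    Long peek() {
        SubQueue q = superHeap.peek();
        return q == null ? null : q.elements.first();
    }

    /** Removes and returns the smallest element across all key-groups. */
    Long poll() {
        SubQueue q = superHeap.poll();
        if (q == null) {
            return null;
        }
        Long head = q.elements.pollFirst();
        if (!q.elements.isEmpty()) {
            superHeap.add(q);                     // only non-empty sub-queues stay in the heap
        }
        return head;
    }
}
```

Removing and re-adding a sub-queue on every insert is linear in the number of key-groups with `java.util.PriorityQueue`; a real implementation would presumably want a heap that can adjust an element's position in place.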
    Please note that the RocksDB implementation is not yet integrated with the timer
service or the backend. This will happen in the next steps.
    ## Brief change log
    - Refactored `InternalTimerHeap` to decouple it from timers, moved the data structures
from flink-streaming to flink-runtime (-> `InternalPriorityQueue`).
    - Split the data structure into a hierarchy: a heap without set semantics (`HeapPriorityQueue`)
and a heap extended with set semantics (`HeapPriorityQueueSet`).
    - Introduced an implementation of RocksDB-based `InternalPriorityQueue` with set semantics.
The starting point is `KeyGroupPartitionedPriorityQueue`. This class uses a `HeapPriorityQueue`
of `CachingInternalPriorityQueueSet` elements, each of which contains elements for exactly one
key-group (heap-of-heaps). For RocksDB, we configure each `CachingInternalPriorityQueueSet`
to use a `TreeOrderedSetCache` and a `RocksDBOrderedStore`.
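
The cache/store split with write-through synchronization could look roughly like the sketch below. All names here (`CachingSetSketch`, `OrderedStore`, `InMemoryStore`, `smallest`) are hypothetical and do not reflect the interfaces in the PR. A `TreeSet` (a red-black tree, not an AVL tree) stands in for the cache, and an in-memory set stands in for the RocksDB column family. The sketch assumes the cache always holds the smallest elements of the store, so reads hit the store only when the cache runs empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical illustration only; not Flink's CachingInternalPriorityQueueSet. */
final class CachingSetSketch {

    /** Assumed shape of the "slow", "unbounded" store. */
    interface OrderedStore {
        boolean add(long e);          // false if e was already present
        void remove(long e);
        List<Long> smallest(int n);   // up to n smallest elements, ascending
    }

    /** In-memory stand-in for a RocksDB-backed store; counts ordered reads. */
    static final class InMemoryStore implements OrderedStore {
        private final TreeSet<Long> data = new TreeSet<>();
        int reads = 0;

        public boolean add(long e) { return data.add(e); }
        public void remove(long e) { data.remove(e); }
        public List<Long> smallest(int n) {
            reads++;
            List<Long> out = new ArrayList<>();
            for (Long e : data) {
                if (out.size() == n) break;
                out.add(e);
            }
            return out;
        }
    }

    private final TreeSet<Long> cache = new TreeSet<>();  // "fast", "small"
    private final OrderedStore store;
    private final int capacity;
    private int storeSize = 0;

    CachingSetSketch(OrderedStore store, int capacity) {
        this.store = store;
        this.capacity = capacity;
    }

    boolean add(long e) {
        if (!store.add(e)) {
            return false;                         // write-through; duplicates rejected
        }
        boolean allCached = cache.size() == storeSize;
        storeSize++;
        // Cache e only if the cache stays a prefix of the store's order.
        if (allCached || (!cache.isEmpty() && e < cache.last())) {
            cache.add(e);
            if (cache.size() > capacity) {
                cache.pollLast();                 // evict the largest cached element
            }
        }
        return true;
    }

    Long peek() {
        if (cache.isEmpty() && storeSize > 0) {
            cache.addAll(store.smallest(capacity));  // refill from the store
        }
        return cache.isEmpty() ? null : cache.first();
    }

    Long poll() {
        Long head = peek();
        if (head != null) {
            cache.pollFirst();
            store.remove(head);                   // write-through removal
            storeSize--;
        }
        return head;
    }
}
```

With a capacity of 2 and four elements inserted, draining the set in order needs only a single ordered read from the store, which is the effect the caching is meant to achieve.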
    ## Verifying this change
    I added dedicated tests for all data structures.
    ## Does this pull request potentially affect one of the following parts:
      - Dependencies (does it add or upgrade a dependency): (yes, fastutil)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (yes)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing,
Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    ## Documentation
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink heapAbstractionsRocks

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6228

commit b3261a15bdf15207f50e2832a048fb3c84b8f642
Author: Stefan Richter <s.richter@...>
Date:   2018-06-19T08:01:30Z

    Introduce MAX_ARRAY_SIZE as general constant

commit dd64cbb7eb15317a4dc8f0626c50dffd58e6b5f9
Author: Stefan Richter <s.richter@...>
Date:   2018-06-18T12:38:01Z

    Generalization of timer queue to a queue(set) that is no longer coupled to timers and
implementation for RocksDB


> Implement timer data structure based on RocksDB
> -----------------------------------------------
>                 Key: FLINK-9491
>                 URL: https://issues.apache.org/jira/browse/FLINK-9491
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
> We can now implement timer state that is stored in RocksDB for users that run the {{RocksDBKeyedStateBackend}}.
> As explained in the design document (https://docs.google.com/document/d/1XbhJRbig5c5Ftd77d0mKND1bePyTC26Pz04EvxdA7Jc/edit#heading=h.17v0k3363r6q)
> this should also give us asynchronous and incremental snapshots for timer state that is larger
> than main memory.
> We need to think about a way in which the user can select whether to run timers on RocksDB
> or on the heap when using the {{RocksDBKeyedStateBackend}}.

This message was sent by Atlassian JIRA
