gearpump-dev mailing list archives

From yanghua
Subject [GitHub] incubator-gearpump issue #190: [Gearpump 311] refactor state management
Date Thu, 22 Jun 2017 03:22:07 GMT
Github user yanghua commented on the issue:
    Hi @huafengw, this PR adds support for state management.
    # state api and usage
    It provides a suite of state APIs for Gearpump, such as:
    * ValueState
    * SetState
    * MapState
    * ...
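    To make the list above concrete, here is a minimal sketch of what such state types could look like. The trait members and the `Heap*` classes are assumptions for illustration, loosely modeled on the read/write style of Apache Beam's state API; they are not the PR's actual signatures.

```scala
import scala.collection.mutable

// Hypothetical, simplified shapes; NOT the traits defined in the PR.
trait ValueState[T] {
  def read: Option[T]
  def write(value: T): Unit
  def clear(): Unit
}

trait SetState[T] {
  def add(value: T): Unit
  def contains(value: T): Boolean
  def read: Set[T]
}

trait MapState[K, V] {
  def put(key: K, value: V): Unit
  def get(key: K): Option[V]
  def entries: Map[K, V]
}

// Illustrative heap-backed implementations (assumed names).
final class HeapValueState[T] extends ValueState[T] {
  private var current: Option[T] = None
  def read: Option[T] = current
  def write(value: T): Unit = { current = Some(value) }
  def clear(): Unit = { current = None }
}

final class HeapSetState[T] extends SetState[T] {
  private val items = mutable.Set.empty[T]
  def add(value: T): Unit = { items += value; () }
  def contains(value: T): Boolean = items.contains(value)
  def read: Set[T] = items.toSet // immutable copy for callers
}

final class HeapMapState[K, V] extends MapState[K, V] {
  private val items = mutable.Map.empty[K, V]
  def put(key: K, value: V): Unit = items.update(key, value)
  def get(key: K): Option[V] = items.get(key)
  def entries: Map[K, V] = items.toMap // immutable copy for callers
}
```

    With these shapes, a running counter would be `count.write(count.read.getOrElse(0L) + 1L)` on a `HeapValueState[Long]`.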
    If users want to store state for their business logic, they need to implement a Processor that extends `StatefulTask` and override its `open` method to initialize the state access objects, like this:
    stateInternals = getStateInternals(StringUtf8Coder.of, "partitionedKey")
    valueState = stateInternals.get.state(, StateTags.value(valueStateTag,
    > You must specify a key for your state because state is stored and accessed partitioned by key.
    Then you must override the core message-handling method `invoke` to process messages. In this method you can use the state API to access your state, like this:
    val state =
    # design and implementation
    In general, this PR consists of *four* parts, listed below:
    * state API traits
    * coders that provide encode (serialize) and decode (deserialize) functions for a specific state type
    * abstractions for state management and storage
    * a default state management implementation backed by heap memory
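    As a rough illustration of the coder part, the sketch below shows an encode/decode pair for two value types. `SketchCoder`, `Utf8StringCoder`, and `LongCoder` are hypothetical names introduced here; the PR's actual coder interface may differ.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical coder contract: decode(encode(x)) must equal x.
trait SketchCoder[T] {
  def encode(value: T): Array[Byte]
  def decode(bytes: Array[Byte]): T
}

// Strings are serialized as their UTF-8 bytes.
object Utf8StringCoder extends SketchCoder[String] {
  def encode(value: String): Array[Byte] =
    value.getBytes(StandardCharsets.UTF_8)
  def decode(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)
}

// Longs are serialized as 8 big-endian bytes (ByteBuffer's default order).
object LongCoder extends SketchCoder[Long] {
  def encode(value: Long): Array[Byte] =
    ByteBuffer.allocate(java.lang.Long.BYTES).putLong(value).array()
  def decode(bytes: Array[Byte]): Long =
    ByteBuffer.wrap(bytes).getLong()
}
```

    Keeping serialization behind a small interface like this lets the state store persist values as bytes without knowing their types.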
    ## state store data structure
    Generally speaking, it is a **two-level** index structure. The first-level index is `StateNamespace` and the second-level index is `StateTag`; you can think of them as a table's row and column.
    It uses Guava's ***Table*** data structure as the default storage implementation, with `StateNamespace` as the **row key**, `StateTag` as the **column key**, and `State` as the **value**.
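    The row/column layout described above can be sketched with nested maps from the Scala standard library. `TwoLevelStore` is a hypothetical stand-in for Guava's `Table` so the example has no dependencies, and the string keys in the usage note merely stand in for `StateNamespace` and `StateTag`.

```scala
import scala.collection.mutable

// Stand-in for a Table[R, C, V]: row key -> (column key -> value).
final class TwoLevelStore[R, C, V] {
  private val rows = mutable.Map.empty[R, mutable.Map[C, V]]

  // Create the row on first use, then set the cell.
  def put(rowKey: R, columnKey: C, value: V): Unit =
    rows.getOrElseUpdate(rowKey, mutable.Map.empty[C, V]).update(columnKey, value)

  // Look up one cell; None if the row or the column is missing.
  def get(rowKey: R, columnKey: C): Option[V] =
    rows.get(rowKey).flatMap(_.get(columnKey))

  // Immutable view of every column stored under one row key.
  def row(rowKey: R): Map[C, V] =
    rows.get(rowKey).map(_.toMap).getOrElse(Map.empty[C, V])

  // Drop all state stored under one row key.
  def clearRow(rowKey: R): Unit = { rows.remove(rowKey); () }
}
```

    For example, after `store.put("window-1", "count", 3L)`, the lookup `store.get("window-1", "count")` yields `Some(3L)`, while the same tag under another namespace yields `None`.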
    # integration with checkpoint
    Finally, the state will integrate with Gearpump's checkpoint mechanism to provide an **exactly-once** processing guarantee.
    The `StatefulTask` will try to recover its state when the task's `onStart` is called, and will try to snapshot its state when the task's `onWatermarkProgress` is called.
    However, there are **some problems** with the checkpoint integration during recovery.
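    The recover-on-start and snapshot-on-watermark cycle can be sketched as follows. `InMemoryCheckpointStore` and `CountingTask` are hypothetical, and the method signatures are simplified rather than Gearpump's real task API. The sketch also assumes the source replays messages newer than the restored snapshot, which is what an exactly-once result would depend on.

```scala
import scala.collection.mutable

// Hypothetical checkpoint store: snapshot timestamp -> saved state.
final class InMemoryCheckpointStore {
  private val snapshots = mutable.Map.empty[Long, Map[String, Long]]

  def persist(timestamp: Long, snapshot: Map[String, Long]): Unit =
    snapshots.update(timestamp, snapshot)

  // Most recent snapshot taken at or before the given time, if any.
  def latestAtOrBefore(timestamp: Long): Option[(Long, Map[String, Long])] = {
    val eligible = snapshots.keys.filter(_ <= timestamp)
    if (eligible.isEmpty) None
    else {
      val ts = eligible.max
      Some(ts -> snapshots(ts))
    }
  }
}

// Simplified stateful task that counts messages per key.
final class CountingTask(store: InMemoryCheckpointStore) {
  private var counts = Map.empty[String, Long]

  // Recover: load the latest snapshot not newer than the start time.
  def onStart(startTime: Long): Unit =
    counts = store.latestAtOrBefore(startTime).map(_._2).getOrElse(Map.empty[String, Long])

  // Process one message by bumping the counter for its key.
  def invoke(key: String): Unit =
    counts = counts.updated(key, counts.getOrElse(key, 0L) + 1L)

  // Snapshot: persist the current state tagged with the watermark.
  def onWatermarkProgress(watermark: Long): Unit =
    store.persist(watermark, counts)

  def count(key: String): Long = counts.getOrElse(key, 0L)
}
```

    Updates made after the last snapshot are lost on restart in this sketch, which is why replay from the source is assumed.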
    # inspiration
    These state APIs are inspired by Apache Beam and are compatible with it. The currently supported state APIs are a subset of Apache Beam's state APIs. `StateInternals` is partly inspired by Apache Beam's Apex runner.
