gearpump-dev mailing list archives

From yanghua <...@git.apache.org>
Subject [GitHub] incubator-gearpump issue #190: [Gearpump 311] refactor state management
Date Thu, 22 Jun 2017 03:22:07 GMT
Github user yanghua commented on the issue:

    https://github.com/apache/incubator-gearpump/pull/190
  
    Hi @huafengw, this PR adds support for state management.
    
    # state api and usage
    
    It provides a suite of state APIs for Gearpump, such as:
    
    * ValueState
    * SetState
    * MapState
    * ...
    
    If users want to store state to build their business logic, they need to implement a Processor that extends `StatefulTask` and override its `open` method to initialize the state access objects, like this:
    
    ```scala
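    // obtain the state internals for this key, using a UTF-8 string coder for the key
    stateInternals = getStateInternals(StringUtf8Coder.of, "partitionedKey")
    // look up a value state in the global namespace, encoded with a var-long coder
    valueState = stateInternals.get.state(StateNamespaces.global, StateTags.value(valueStateTag, VarLongCoder.of))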
    ```
    
    > You must specify a key for your state, because state is stored and accessed partitioned by key.
    
    Then you must override the core message-handling method `invoke` to process messages. In this method you can use the state API to access your state, like this:
    
    ```scala
    //read
    val state = valueState.read
    //write
    valueState.write(1L)
    ```
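    
    The read/write pattern above can be tried without Gearpump using a minimal sketch like the one below. The names `ValueStateSketch`, `InMemoryValueState` and `countMessage` are hypothetical, and returning an `Option` from `read` is an assumption made for the illustration; the actual trait in this PR may differ.
    
    ```scala
    object ValueStateSketch {
      // Hypothetical stand-in for the PR's ValueState; not the actual Gearpump trait.
      trait ValueState[T] {
        def read: Option[T]
        def write(value: T): Unit
        def clear(): Unit
      }
    
      // Heap-backed implementation used only for this illustration.
      final class InMemoryValueState[T] extends ValueState[T] {
        private var current: Option[T] = None
        def read: Option[T] = current
        def write(value: T): Unit = { current = Some(value) }
        def clear(): Unit = { current = None }
      }
    
      // Mimics what an `invoke` body might do: read the running count, add one, write it back.
      def countMessage(state: ValueState[Long]): Unit = {
        val seen = state.read.getOrElse(0L)
        state.write(seen + 1L)
      }
    }
    ```
    
    Calling `countMessage` once per incoming message keeps a running count in the state, which is the kind of logic a task would place in `invoke`.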
    
    # design and implementation
    
    In general, this PR contains *four* main parts, listed below:
    
    * state API traits
    * coders that provide encode (serialize) and decode (deserialize) functions for specific state types
    * state management and storage abstractions
    * a default state management implementation backed by heap memory
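    
    To illustrate the coder part of the list above, here is a minimal sketch of an encode/decode pair. The `Coder` trait, `FixedLongCoder` and `Utf8StringCoder` are hypothetical names for this example and are not the coder classes in this PR; in particular, the fixed 8-byte encoding is a simplification, not the variable-length encoding that a `VarLongCoder` would use.
    
    ```scala
    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets
    
    object CoderSketch {
      // Hypothetical coder contract: encode serializes, decode deserializes.
      trait Coder[T] {
        def encode(value: T): Array[Byte]
        def decode(bytes: Array[Byte]): T
      }
    
      // Fixed-width 8-byte big-endian encoding (a simplification for the sketch).
      object FixedLongCoder extends Coder[Long] {
        def encode(value: Long): Array[Byte] =
          ByteBuffer.allocate(8).putLong(value).array()
        def decode(bytes: Array[Byte]): Long =
          ByteBuffer.wrap(bytes).getLong()
      }
    
      // UTF-8 encoding for string keys and values.
      object Utf8StringCoder extends Coder[String] {
        def encode(value: String): Array[Byte] =
          value.getBytes(StandardCharsets.UTF_8)
        def decode(bytes: Array[Byte]): String =
          new String(bytes, StandardCharsets.UTF_8)
      }
    }
    ```
    
    The property that matters for state storage is the round trip: `decode(encode(x))` must give back `x` for every value of the state type.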
    
    ## state store data structure
    
    Generally speaking, it is a **two-level** index structure. The first-level index is `StateNamespace` and the second-level index is `StateTag`; you can think of them as a table's row and column.
    
    It uses Guava's ***Table*** data structure as the default storage implementation, with `StateNamespace` as the **row key**, `StateTag` as the **column key**, and `State` as the **value**.
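    
    The row/column layout can be sketched with plain Scala collections. This is only an illustration of the two-level index: it uses nested maps in place of Guava's `Table`, and `Namespace`, `Tag` and `StateTable` are hypothetical simplifications of the PR's `StateNamespace`, `StateTag` and storage classes.
    
    ```scala
    import scala.collection.mutable
    
    object StateTableSketch {
      // Hypothetical simplified keys; the PR uses StateNamespace and StateTag.
      final case class Namespace(name: String)
      final case class Tag(id: String)
    
      // Row key -> (column key -> value): the same shape as a table of rows and columns.
      final class StateTable[V] {
        private val rows = mutable.Map.empty[Namespace, mutable.Map[Tag, V]]
    
        // Store a value at (namespace, tag), creating the row on first use.
        def put(ns: Namespace, tag: Tag, value: V): Unit =
          rows.getOrElseUpdate(ns, mutable.Map.empty[Tag, V]).update(tag, value)
    
        // First-level lookup by namespace, then second-level lookup by tag.
        def get(ns: Namespace, tag: Tag): Option[V] =
          rows.get(ns).flatMap(_.get(tag))
    
        // All states stored under one namespace (one "row").
        def row(ns: Namespace): Map[Tag, V] =
          rows.get(ns).map(_.toMap).getOrElse(Map.empty[Tag, V])
      }
    }
    ```
    
    Grouping by namespace first means that all state under one namespace can be fetched as a single row.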
    
    # integration with checkpoint
    
    Finally, state will be integrated with Gearpump's checkpoint mechanism to provide an **exactly-once** processing guarantee.
    
    `StatefulTask` will try to recover state in the task's `onStart` and will try to snapshot state in the task's `onWatermarkProgress`.
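    
    The recover-on-start and snapshot-on-watermark lifecycle can be sketched as follows. Everything here is hypothetical: `CheckpointStore` and `SketchStatefulTask` are in-memory stand-ins, not Gearpump's checkpoint store or the `StatefulTask` in this PR, and the sketch shows only the two hooks, not how an exactly-once guarantee is achieved.
    
    ```scala
    import scala.collection.mutable
    
    object CheckpointSketch {
      // Hypothetical in-memory checkpoint store keyed by watermark timestamp.
      final class CheckpointStore {
        private val snapshots = mutable.TreeMap.empty[Long, Map[String, Long]]
        def persist(timestamp: Long, state: Map[String, Long]): Unit =
          snapshots.update(timestamp, state)
        // Snapshot with the highest timestamp, if any.
        def latest: Option[Map[String, Long]] = snapshots.lastOption.map(_._2)
      }
    
      // Hypothetical task skeleton showing only the two lifecycle hooks.
      final class SketchStatefulTask(store: CheckpointStore) {
        private var state = Map.empty[String, Long]
    
        // Recover from the most recent snapshot, or start empty.
        def onStart(): Unit = {
          state = store.latest.getOrElse(Map.empty[String, Long])
        }
    
        // Count one message for the given key.
        def invoke(key: String): Unit = {
          state = state.updated(key, state.getOrElse(key, 0L) + 1L)
        }
    
        // Snapshot the current state when the watermark advances.
        def onWatermarkProgress(watermark: Long): Unit =
          store.persist(watermark, state)
    
        def count(key: String): Long = state.getOrElse(key, 0L)
      }
    }
    ```
    
    In this sketch, updates made after the last snapshot are lost when a new task instance recovers, so a real integration would also need those messages to be replayed.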
    
    However, there are still **some problems** with the checkpoint integration during recovery.
    
    # inspiration
    
    This state API is inspired by Apache Beam and is compatible with it. The currently supported state APIs are a subset of Apache Beam's state APIs, and `StateInternals` is partly inspired by Apache Beam's Apex runner.



