flink-issues mailing list archives

From StefanRRichter <...@git.apache.org>
Subject [GitHub] flink issue #2512: [FLINK-4379] Rescalable non-partitioned state
Date Wed, 21 Sep 2016 09:09:23 GMT
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/2512
  
    Hi,
    
    I have some suggestions for renaming several of the interfaces and their methods in this pull request to arrive at a clearer, more consistent naming scheme. I suggest the following changes:
    
    ## 1) Rename the state handle that points to operator state: PartitionableStateHandle -> OperatorStateHandle
    
    ## 2) Rename: PartitionableStateBackend -> OperatorStateStore
    ```
    	/**
    	 * User-side interface for storing (partitionable) operator state.
    	 */
    	public interface OperatorStateStore {
    
    		/**
    		 * Creates (or restores) the partitionable state in this backend. Each state is registered
    		 * under a unique name. The provided serializer is used to de/serialize the state in case
    		 * of checkpointing (snapshot/restore).
    		 */
    		<S> ListState<S> getListState(String name, TypeSerializer<S> partitionStateSerializer)
    				throws Exception;
    	}
    ```
    
    ## 3) Rename: PartitionableSnapshotStateBackend -> OperatorStateBackend
    
    I propose that the term "backend" now refers to (i) a store that (ii) has the ability to snapshot.
    ```
    	/**
    	 * Interface that combines both the user-facing {@link OperatorStateStore} interface
    	 * and the system interface {@link SnapshotProvider}.
    	 */
    	public interface OperatorStateBackend
    			extends OperatorStateStore, SnapshotProvider<PartitionableOperatorStateHandle> {
    	}
    ```
    
    ## 4) Rename: PartitionableCheckpointed -> CheckpointedOperator
    	- `storeOperatorState` -> `snapshotState`
    	- `restoreOperatorState` -> `restoreState`
    
    ```
    	public interface CheckpointedOperator {
    
    		/**
    		 * This method is called when state should be stored for a checkpoint. The state can
    		 * be registered and written to the provided state store.
    		 */
    		void snapshotState(long checkpointId, OperatorStateStore stateStore) throws Exception;
    
    		/**
    		 * This method is called when state should be restored from a checkpoint. The state
    		 * can be obtained from the provided state store.
    		 */
    		void restoreState(OperatorStateStore stateStore) throws Exception;
    	}
    ```
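
To illustrate how an operator might use the renamed interface, here is a minimal sketch. `TypeSerializer`, `ListState`, and `OperatorStateStore` are reduced to bare stand-ins so the example compiles on its own; `InMemoryStateStore` and `BufferingOperator` are hypothetical names that are not part of the pull request, and the in-memory map only mimics what a real backend would do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-ins (assumptions): stripped-down versions of the Flink types named in the proposal.
interface TypeSerializer<S> {}

interface ListState<S> {
    void add(S value);
    Iterable<S> get();
}

interface OperatorStateStore {
    <S> ListState<S> getListState(String name, TypeSerializer<S> partitionStateSerializer)
            throws Exception;
}

// The interface as proposed in item 4.
interface CheckpointedOperator {
    void snapshotState(long checkpointId, OperatorStateStore stateStore) throws Exception;
    void restoreState(OperatorStateStore stateStore) throws Exception;
}

// Hypothetical store that keeps every named list state in a map and ignores the serializer.
final class InMemoryStateStore implements OperatorStateStore {
    private final Map<String, List<?>> states = new HashMap<>();

    @Override
    @SuppressWarnings("unchecked")
    public <S> ListState<S> getListState(String name, TypeSerializer<S> serializer) {
        // Create the list on first registration, otherwise hand back the existing one.
        final List<S> backing =
                (List<S>) states.computeIfAbsent(name, key -> new ArrayList<Object>());
        return new ListState<S>() {
            @Override public void add(S value) { backing.add(value); }
            @Override public Iterable<S> get() { return backing; }
        };
    }
}

// Hypothetical operator that buffers strings and checkpoints them as list state.
final class BufferingOperator implements CheckpointedOperator {
    private static final String STATE_NAME = "buffered-elements";
    private static final TypeSerializer<String> SERIALIZER = new TypeSerializer<String>() {};

    final List<String> buffer = new ArrayList<>();

    @Override
    public void snapshotState(long checkpointId, OperatorStateStore stateStore) throws Exception {
        // Register the state under its name and write the current buffer into it.
        ListState<String> state = stateStore.getListState(STATE_NAME, SERIALIZER);
        for (String element : buffer) {
            state.add(element);
        }
    }

    @Override
    public void restoreState(OperatorStateStore stateStore) throws Exception {
        // Obtain the state under the same name and rebuild the buffer from it.
        buffer.clear();
        for (String element : stateStore.getListState(STATE_NAME, SERIALIZER).get()) {
            buffer.add(element);
        }
    }
}
```

The sketch hands a fresh store to `snapshotState` and then passes that same store to `restoreState` on a new operator instance, which roughly mirrors the "new backend per invocation" behaviour discussed in item 7.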
    
    ## 5) Rename: StateRepartitioner -> OperatorStateRepartitioner
    ```
    	/**
    	 * Interface for implementing different strategies for repartitioning operator state
    	 * as parallelism changes.
    	 */
    	public interface OperatorStateRepartitioner {
    
    		List<Collection<OperatorStateHandle>> repartitionOperatorState(
    				List<OperatorStateHandle> previousParallelSubtaskStates,
    				int parallelism);
    	}
    ```
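
As one possible strategy behind this interface, the sketch below deals whole state handles out to the new subtasks in round-robin order. The strategy itself is an assumption for illustration and is not taken from the pull request. `OperatorStateHandle` is reduced to a labelled stand-in, and `RoundRobinRepartitioner` is a hypothetical name; a real implementation would more likely split the partitions inside each handle.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in (assumption): the real handle would point to checkpointed state, not carry a label.
final class OperatorStateHandle {
    final String label;

    OperatorStateHandle(String label) {
        this.label = label;
    }
}

// The interface as proposed in item 5.
interface OperatorStateRepartitioner {
    List<Collection<OperatorStateHandle>> repartitionOperatorState(
            List<OperatorStateHandle> previousParallelSubtaskStates,
            int parallelism);
}

// Hypothetical strategy: assign handle i to new subtask (i mod parallelism).
final class RoundRobinRepartitioner implements OperatorStateRepartitioner {
    @Override
    public List<Collection<OperatorStateHandle>> repartitionOperatorState(
            List<OperatorStateHandle> previousParallelSubtaskStates,
            int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        // One (initially empty) collection per new parallel subtask.
        List<Collection<OperatorStateHandle>> result = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            result.add(new ArrayList<OperatorStateHandle>());
        }
        for (int i = 0; i < previousParallelSubtaskStates.size(); i++) {
            result.get(i % parallelism).add(previousParallelSubtaskStates.get(i));
        }
        return result;
    }
}
```

With five previous handles and a new parallelism of two, the first new subtask receives three handles and the second receives two.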
    
    ## 6) Add a new interface that allows user-friendly checkpointing code for simple cases that do not require a custom serializer
    ```
    	/**
    	 * Simplified interface as adapter to the more complex CheckpointedOperator
    	 */
    	public interface ListCheckpointed<T extends Serializable> {
    
    		List<T> snapshotState(long checkpointId) throws Exception;
    
    		void restoreState(List<T> state) throws Exception;
    	}
    ```
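
A user function built on this simplified interface could look roughly like the sketch below. `CountingFunction` and its `processElement` method are hypothetical and only serve the illustration. Summing all restored entries is an assumption about how a counter would want to merge state if a subtask receives more than one list element after rescaling.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

// The interface as proposed in item 6.
interface ListCheckpointed<T extends Serializable> {
    List<T> snapshotState(long checkpointId) throws Exception;

    void restoreState(List<T> state) throws Exception;
}

// Hypothetical user function that counts the elements it has seen.
final class CountingFunction implements ListCheckpointed<Long> {
    private long count;

    void processElement(Object ignored) {
        count++;
    }

    long getCount() {
        return count;
    }

    @Override
    public List<Long> snapshotState(long checkpointId) {
        // The whole state of this subtask is a single list entry.
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // After rescaling, a subtask may be handed several entries; merge them by summing.
        count = 0;
        for (Long partial : state) {
            count += partial;
        }
    }
}
```

The user code deals only with a plain `List` of serializable values and never touches a state store or serializer directly, which is the convenience this interface is meant to offer.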
    
    ## 7) OperatorStateBackend lifecycle
    
    Another point that we might want to discuss is the life cycle of `OperatorStateBackend`. Currently, a new backend is created (and restored) for each invocation of the methods in `CheckpointedOperator`. This always provides a clean backend that takes the operator state for a snapshot. I wonder if it could make sense to create the `OperatorStateBackend` just once for each `AbstractStreamOperator`, similar to the `KeyedStateBackend`. This would give users the option to actually keep operator state only in the `OperatorStateBackend`. However, we would need a way to signal that all state must be passed to the backend before a snapshot. For example, large operator states could be managed in RocksDB this way, and we could provide more proxy collections (currently we only support a list of substates) over time.
    
    What do you think @aljoscha @StephanEwen ?


