From StefanRRichter <...@git.apache.org>
Subject [GitHub] flink issue #2512: [FLINK-4379] Rescalable non-partitioned state
Date Wed, 21 Sep 2016 09:09:23 GMT
Github user StefanRRichter commented on the issue:

    I have some suggestions for renaming some of the interfaces and their methods in this pull request to arrive at clearer, more consistent naming schemes. I suggest the following:
    ## 1) Rename the state handle that points to operator state: PartitionableStateHandle -> OperatorStateHandle
    ## 2) Rename: PartitionableStateBackend -> OperatorStateStore
    	import org.apache.flink.api.common.state.ListState;
    	import org.apache.flink.api.common.typeutils.TypeSerializer;

    	/**
    	 * User-side interface for storing (partitionable) operator state.
    	 */
    	public interface OperatorStateStore {
    		/**
    		 * Creates (or restores) the partitionable state in this backend. Each state is registered under a unique name.
    		 * The provided serializer is used to de/serialize the state in case of checkpointing.
    		 */
    		<S> ListState<S> getListState(String name, TypeSerializer<S> partitionStateSerializer) throws Exception;
    	}
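    To make the registration semantics concrete, here is a minimal in-memory sketch of such a store. It is an illustration under simplifying assumptions, not the PR's implementation: `SimpleListState`, `SimpleOperatorStateStore` and `InMemoryOperatorStateStore` are hypothetical names, the `TypeSerializer` parameter and checked exceptions are dropped, and all state lives on the heap. Requesting the same name twice returns the same state, which is one way the "creates (or restores)" behavior could look.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a list state: an appendable list of sub-states.
interface SimpleListState<S> {
    void add(S value);
    Iterable<S> get();
    void clear();
}

// Hypothetical simplified OperatorStateStore: no TypeSerializer, no checked exceptions.
interface SimpleOperatorStateStore {
    <S> SimpleListState<S> getListState(String name);
}

// In-memory sketch: each list state is registered under a unique name, and asking
// for an existing name returns the already registered state instead of a new one.
class InMemoryOperatorStateStore implements SimpleOperatorStateStore {
    private final Map<String, List<Object>> states = new HashMap<>();

    @Override
    public <S> SimpleListState<S> getListState(String name) {
        List<Object> backing = states.computeIfAbsent(name, k -> new ArrayList<>());
        return new SimpleListState<S>() {
            @Override
            public void add(S value) {
                backing.add(value);
            }

            @Override
            @SuppressWarnings("unchecked")
            public Iterable<S> get() {
                // Unchecked: callers must use the same element type for a given name.
                return (List<S>) (List<?>) backing;
            }

            @Override
            public void clear() {
                backing.clear();
            }
        };
    }
}
```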
    ## 3) Rename: PartitionableSnapshotStateBackend -> OperatorStateBackend
    I propose that the term *backend* from now on refers to something that is (i) a store with (ii) the ability to snapshot.
    	/**
    	 * Interface that combines both the user-facing {@link OperatorStateStore} interface and the system interface
    	 * {@link SnapshotProvider}.
    	 */
    	public interface OperatorStateBackend
    			extends OperatorStateStore, SnapshotProvider<OperatorStateHandle> {
    	}
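    A rough sketch of the "store + snapshot" combination, assuming a `SnapshotProvider` with a single `snapshot(long checkpointId)` method (the PR's actual signature may differ). All names below are hypothetical, list states are modeled as plain `java.util.List`s, and the handle is simply a frozen copy of every registered list, so writes after the snapshot do not leak into it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical user-facing store (no serializer; list state modeled as java.util.List).
interface SimpleOperatorStateStore {
    <S> List<S> getListState(String name);
}

// Hypothetical system-side interface: produce a handle for a given checkpoint.
interface SnapshotProvider<H> {
    H snapshot(long checkpointId);
}

// Hypothetical handle: an immutable copy of all named list states.
final class SimpleOperatorStateHandle {
    final long checkpointId;
    final Map<String, List<Object>> states;

    SimpleOperatorStateHandle(long checkpointId, Map<String, List<Object>> states) {
        this.checkpointId = checkpointId;
        this.states = states;
    }
}

// "Backend" = (i) a store + (ii) the ability to snapshot.
interface SimpleOperatorStateBackend
        extends SimpleOperatorStateStore, SnapshotProvider<SimpleOperatorStateHandle> {
}

class InMemoryOperatorStateBackend implements SimpleOperatorStateBackend {
    private final Map<String, List<Object>> states = new HashMap<>();

    @Override
    @SuppressWarnings("unchecked")
    public <S> List<S> getListState(String name) {
        return (List<S>) (List<?>) states.computeIfAbsent(name, k -> new ArrayList<>());
    }

    @Override
    public SimpleOperatorStateHandle snapshot(long checkpointId) {
        // Copy every list so that later writes to the store do not change the snapshot.
        Map<String, List<Object>> copy = new HashMap<>();
        for (Map.Entry<String, List<Object>> e : states.entrySet()) {
            copy.put(e.getKey(), Collections.unmodifiableList(new ArrayList<>(e.getValue())));
        }
        return new SimpleOperatorStateHandle(checkpointId, Collections.unmodifiableMap(copy));
    }
}
```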
    ## 4) Rename: PartitionableCheckpointed -> CheckpointedOperator
    	- `storeOperatorState` -> `snapshotState`
    	- `restoreOperatorState` -> `restoreState`
    	public interface CheckpointedOperator {
    		/**
    		 * This method is called when state should be stored for a checkpoint. The state can be registered and written to
    		 * the provided state store.
    		 */
    		void snapshotState(long checkpointId, OperatorStateStore stateStore) throws Exception;

    		/**
    		 * This method is called when state should be restored from a checkpoint. The state can be obtained from the
    		 * provided state store.
    		 */
    		void restoreState(OperatorStateStore stateStore) throws Exception;
    	}
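    As an illustration of how an operator might implement this interface, here is a simplified, self-contained sketch. `SimpleOperatorStateStore`, `InMemoryStore` and `CountingOperator` are hypothetical, the serializer and checked exceptions are omitted, and the state name `"count"` is arbitrary. Because an instance may receive several sub-states after rescaling, `restoreState` sums all of them rather than assuming exactly one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplified store: list state modeled as java.util.List, no serializer.
interface SimpleOperatorStateStore {
    <S> List<S> getListState(String name);
}

class InMemoryStore implements SimpleOperatorStateStore {
    final Map<String, List<Object>> states = new HashMap<>();

    @Override
    @SuppressWarnings("unchecked")
    public <S> List<S> getListState(String name) {
        return (List<S>) (List<?>) states.computeIfAbsent(name, k -> new ArrayList<>());
    }
}

// Simplified form of the proposed CheckpointedOperator (checked exceptions omitted).
interface CheckpointedOperator {
    void snapshotState(long checkpointId, SimpleOperatorStateStore stateStore);
    void restoreState(SimpleOperatorStateStore stateStore);
}

// Example operator that counts records and checkpoints its count as one sub-state.
class CountingOperator implements CheckpointedOperator {
    long count;

    void processElement(Object record) {
        count++;
    }

    @Override
    public void snapshotState(long checkpointId, SimpleOperatorStateStore stateStore) {
        List<Long> counts = stateStore.getListState("count");
        counts.clear();
        counts.add(count);
    }

    @Override
    public void restoreState(SimpleOperatorStateStore stateStore) {
        // After rescaling, one instance may receive several sub-states: sum them.
        count = 0;
        for (long c : stateStore.<Long>getListState("count")) {
            count += c;
        }
    }
}
```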
    ## 5) Rename: StateRepartitioner -> OperatorStateRepartitioner
    	import java.util.Collection;
    	import java.util.List;

    	/**
    	 * Interface that allows implementing different strategies for repartitioning operator state as parallelism changes.
    	 */
    	public interface OperatorStateRepartitioner {
    		List<Collection<OperatorStateHandle>> repartitionOperatorState(
    				List<OperatorStateHandle> previousParallelSubtaskStates,
    				int parallelism);
    	}
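    One possible strategy is round-robin redistribution of sub-states. The sketch below assumes, for illustration only, that each previous subtask's state is a list of independent sub-state elements rather than opaque `OperatorStateHandle`s; `SubStateRepartitioner` and `RoundRobinRepartitioner` are hypothetical names. All sub-states are flattened and dealt out to the new subtasks in turn, which handles scaling both up and down.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical generic variant of OperatorStateRepartitioner that works on plain
// sub-state elements instead of OperatorStateHandles.
interface SubStateRepartitioner<T> {
    List<List<T>> repartition(List<List<T>> previousParallelSubtaskStates, int parallelism);
}

// Round-robin strategy: flatten all sub-states and deal them out to the new subtasks.
class RoundRobinRepartitioner<T> implements SubStateRepartitioner<T> {
    @Override
    public List<List<T>> repartition(List<List<T>> previousParallelSubtaskStates, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<List<T>> result = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> subtaskState : previousParallelSubtaskStates) {
            for (T subState : subtaskState) {
                result.get(next).add(subState);
                next = (next + 1) % parallelism;
            }
        }
        return result;
    }
}
```

    For example, the sub-states `[[a, b, c], [d, e]]` of two old subtasks are redistributed to three new subtasks as `[[a, d], [b, e], [c]]`.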
    ## 6) Add a new interface that allows user-friendly checkpointing code for simple cases that do not require a custom serializer
    	/**
    	 * Simplified interface that serves as an adapter to the more complex {@link CheckpointedOperator}.
    	 */
    	public interface ListCheckpointed<T extends Serializable> {
    		List<T> snapshotState(long checkpointId) throws Exception;
    		void restoreState(List<T> state) throws Exception;
    	}
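    The adapter idea could look roughly like this: a `CheckpointedOperator` that stores the user's list under one fixed list state, so the user never touches the store or a serializer. This is a sketch under assumptions: `ListCheckpointedAdapter`, `BufferingFunction`, the state name and the simplified store are all hypothetical, and checked exceptions are omitted.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplified store: list state modeled as java.util.List, no serializer.
interface SimpleOperatorStateStore {
    <S> List<S> getListState(String name);
}

class InMemoryStore implements SimpleOperatorStateStore {
    final Map<String, List<Object>> states = new HashMap<>();

    @Override
    @SuppressWarnings("unchecked")
    public <S> List<S> getListState(String name) {
        return (List<S>) (List<?>) states.computeIfAbsent(name, k -> new ArrayList<>());
    }
}

// Simplified forms of the two proposed interfaces (checked exceptions omitted).
interface CheckpointedOperator {
    void snapshotState(long checkpointId, SimpleOperatorStateStore stateStore);
    void restoreState(SimpleOperatorStateStore stateStore);
}

interface ListCheckpointed<T extends Serializable> {
    List<T> snapshotState(long checkpointId);
    void restoreState(List<T> state);
}

// Hypothetical adapter: maps the simple list-based API onto the store-based one.
class ListCheckpointedAdapter<T extends Serializable> implements CheckpointedOperator {
    // Hypothetical fixed state name used for the user's list.
    private static final String STATE_NAME = "list-checkpointed-state";
    private final ListCheckpointed<T> user;

    ListCheckpointedAdapter(ListCheckpointed<T> user) {
        this.user = user;
    }

    @Override
    public void snapshotState(long checkpointId, SimpleOperatorStateStore stateStore) {
        List<T> state = stateStore.getListState(STATE_NAME);
        state.clear();
        state.addAll(user.snapshotState(checkpointId));
    }

    @Override
    public void restoreState(SimpleOperatorStateStore stateStore) {
        List<T> state = stateStore.getListState(STATE_NAME);
        user.restoreState(new ArrayList<>(state));
    }
}

// Example user function: buffers integers and checkpoints the buffer as a list.
class BufferingFunction implements ListCheckpointed<Integer> {
    final List<Integer> buffer = new ArrayList<>();

    @Override
    public List<Integer> snapshotState(long checkpointId) {
        return new ArrayList<>(buffer);
    }

    @Override
    public void restoreState(List<Integer> state) {
        buffer.clear();
        buffer.addAll(state);
    }
}
```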
    ## 7) OperatorStateBackend lifecycle
    Another point we might want to discuss is the lifecycle of `OperatorStateBackend`. Currently, a new backend is created (and restored) for each invocation of the methods in `CheckpointedOperator`. This always provides a clean backend into which the operator state is written for a snapshot. I wonder whether it would make sense to create the `OperatorStateBackend` just once per `AbstractStreamOperator`, similar to the `KeyedStateBackend`. This would give users the option to keep operator state exclusively in the `OperatorStateBackend`. For example, large operator states could be managed in RocksDB this way, and over time we could provide more proxy collections (currently we only support a list of sub-states). However, we would then need a way to signal that all state must be handed to the backend before a snapshot is taken.
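    To make the alternative lifecycle more tangible, here is a hypothetical sketch of a backend created once per operator, with the operator appending directly into it. `LongLivedOperatorStateBackend`, `BufferingOperator` and `prepareSnapshot()` are invented names; `prepareSnapshot()` merely stands in for the "all state must be passed to the backend" signal whose actual form is still an open question, and the backend here is an in-heap map rather than RocksDB.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical long-lived backend, created once per operator instance.
class LongLivedOperatorStateBackend {
    private final Map<String, List<Object>> states = new HashMap<>();

    @SuppressWarnings("unchecked")
    <S> List<S> getListState(String name) {
        return (List<S>) (List<?>) states.computeIfAbsent(name, k -> new ArrayList<>());
    }

    // Copy of all registered lists for a checkpoint.
    Map<String, List<Object>> snapshot() {
        Map<String, List<Object>> copy = new HashMap<>();
        states.forEach((name, list) -> copy.put(name, new ArrayList<>(list)));
        return copy;
    }
}

// Hypothetical operator that keeps its buffer in the long-lived backend.
class BufferingOperator {
    // Created once per operator, analogous to the keyed state backend.
    final LongLivedOperatorStateBackend backend = new LongLivedOperatorStateBackend();
    // Obtained once; flushed records are appended directly to the backend's list.
    private final List<String> buffer = backend.getListState("buffer");
    // Small on-heap batch that must be handed to the backend before a snapshot.
    private final List<String> pending = new ArrayList<>();

    void processElement(String record) {
        pending.add(record);
        if (pending.size() >= 2) {
            flush();
        }
    }

    // Hypothetical "prepare snapshot" signal: move all remaining state into the backend.
    void prepareSnapshot() {
        flush();
    }

    private void flush() {
        buffer.addAll(pending);
        pending.clear();
    }

    Map<String, List<Object>> snapshot() {
        prepareSnapshot();
        return backend.snapshot();
    }
}
```

    Without the `prepareSnapshot()` step, the record still sitting in `pending` would be missing from the snapshot, which is exactly the gap the signal would have to close.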
    What do you think, @aljoscha @StephanEwen?

