flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9571) Switch to internal states in StateBinder
Date Sat, 16 Jun 2018 14:15:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514786#comment-16514786 ]

ASF GitHub Bot commented on FLINK-9571:
---------------------------------------

GitHub user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6173#discussion_r195903433
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -203,91 +216,16 @@ private boolean hasRegisteredState() {
     	}
     
     	@Override
    -	public <N, V> InternalValueState<K, N, V> createValueState(
    -			TypeSerializer<N> namespaceSerializer,
    -			ValueStateDescriptor<V> stateDesc) throws Exception {
    -
    -		StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
    -		return new HeapValueState<>(
    -				stateTable,
    -				keySerializer,
    -				stateTable.getStateSerializer(),
    -				stateTable.getNamespaceSerializer(),
    -				stateDesc.getDefaultValue());
    -	}
    -
    -	@Override
    -	public <N, T> InternalListState<K, N, T> createListState(
    -			TypeSerializer<N> namespaceSerializer,
    -			ListStateDescriptor<T> stateDesc) throws Exception {
    -
    -		StateTable<K, N, List<T>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
    -		return new HeapListState<>(
    -				stateTable,
    -				keySerializer,
    -				stateTable.getStateSerializer(),
    -				stateTable.getNamespaceSerializer(),
    -				stateDesc.getDefaultValue());
    -	}
    -
    -	@Override
    -	public <N, T> InternalReducingState<K, N, T> createReducingState(
    -			TypeSerializer<N> namespaceSerializer,
    -			ReducingStateDescriptor<T> stateDesc) throws Exception {
    -
    -		StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
    -		return new HeapReducingState<>(
    -				stateTable,
    -				keySerializer,
    -				stateTable.getStateSerializer(),
    -				stateTable.getNamespaceSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getReduceFunction());
    -	}
    -
    -	@Override
    -	public <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
    -			TypeSerializer<N> namespaceSerializer,
    -			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
    -
    -		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
    -		return new HeapAggregatingState<>(
    -				stateTable,
    -				keySerializer,
    -				stateTable.getStateSerializer(),
    -				stateTable.getNamespaceSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getAggregateFunction());
    -	}
    -
    -	@Override
    -	public <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
    -			TypeSerializer<N> namespaceSerializer,
    -			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
    -
    -		StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
    -		return new HeapFoldingState<>(
    -				stateTable,
    -				keySerializer,
    -				stateTable.getStateSerializer(),
    -				stateTable.getNamespaceSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getFoldFunction());
    -	}
    -
    -	@Override
    -	protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
    -			TypeSerializer<N> namespaceSerializer,
    -			MapStateDescriptor<UK, UV> stateDesc) throws Exception {
    -
    -		StateTable<K, N, Map<UK, UV>> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
    -
    -		return new HeapMapState<>(
    -				stateTable,
    -				keySerializer,
    -				stateTable.getStateSerializer(),
    -				stateTable.getNamespaceSerializer(),
    -				stateDesc.getDefaultValue());
    +	public <N, SV, S extends State, IS extends S> IS createState(
    +		TypeSerializer<N> namespaceSerializer,
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		if (!STATE_FACTORIES.containsKey(stateDesc.getClass())) {
    +			String message = String.format("State %s is not supported by %s",
    +				stateDesc.getClass(), this.getClass());
    +			throw new FlinkRuntimeException(message);
    +		}
    +		StateTable<K, N, SV> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
    +		return STATE_FACTORIES.get(stateDesc.getClass()).createState(stateDesc, stateTable, keySerializer);
    --- End diff --
    
    Same as above: maybe the `containsKey()` and `get()` calls could be merged into a single `get()` followed by a null check.
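
A minimal sketch of that suggestion, using a simplified stand-in for the map in the diff. The class name `SingleLookupSketch`, the `FACTORIES` map, and the string results are all hypothetical; only the lookup pattern mirrors the code under review. It assumes the map never stores `null` values, so a `null` result from `get()` can only mean the key is absent.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class SingleLookupSketch {

    // Hypothetical stand-in for STATE_FACTORIES: descriptor class -> factory.
    // Assumption: no key is ever mapped to null.
    private static final Map<Class<?>, Supplier<String>> FACTORIES = new HashMap<>();

    static {
        FACTORIES.put(String.class, () -> "value-state");
        FACTORIES.put(Integer.class, () -> "list-state");
    }

    static String createState(Class<?> descriptorClass) {
        // One lookup instead of containsKey() followed by get().
        Supplier<String> factory = FACTORIES.get(descriptorClass);
        if (factory == null) {
            String message = String.format("State %s is not supported by %s",
                descriptorClass, SingleLookupSketch.class);
            throw new IllegalStateException(message);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        System.out.println(createState(String.class));
    }
}
```

Besides saving one hash lookup, this keeps the check and the use on the same reference, so the two cannot drift apart if the code between them changes.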


> Switch to internal states in StateBinder
> ----------------------------------------
>
>                 Key: FLINK-9571
>                 URL: https://issues.apache.org/jira/browse/FLINK-9571
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>             Fix For: 1.6.0
>
>
> The StateBinder factory for state objects is not part of the public API and in fact produces only internal states.
> It can be changed to produce internal state interfaces instead of the public API ones.
> This can help expose the internal state API to internal components that use the factory, e.g. state TTL wrappers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
