flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9571) Switch to internal states in StateBinder
Date Sat, 16 Jun 2018 14:15:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514790#comment-16514790
] 

ASF GitHub Bot commented on FLINK-9571:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6173#discussion_r195903638
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
    @@ -1303,103 +1316,18 @@ private ColumnFamilyHandle createColumnFamily(String stateName)
throws IOExcepti
     	}
     
     	@Override
    -	protected <N, T> InternalValueState<K, N, T> createValueState(
    -		TypeSerializer<N> namespaceSerializer,
    -		ValueStateDescriptor<T> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult
=
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBValueState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				this);
    -	}
    -
    -	@Override
    -	protected <N, T> InternalListState<K, N, T> createListState(
    -		TypeSerializer<N> namespaceSerializer,
    -		ListStateDescriptor<T> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, List<T>>>
registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBListState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getElementSerializer(),
    -				this);
    -	}
    -
    -	@Override
    -	protected <N, T> InternalReducingState<K, N, T> createReducingState(
    -		TypeSerializer<N> namespaceSerializer,
    -		ReducingStateDescriptor<T> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult
=
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBReducingState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getReduceFunction(),
    -				this);
    -	}
    -
    -	@Override
    -	protected <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(
    -		TypeSerializer<N> namespaceSerializer,
    -		AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>>
registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBAggregatingState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getAggregateFunction(),
    -				this);
    -	}
    -
    -	@Override
    -	protected <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(
    +	public <N, SV, S extends State, IS extends S> IS createState(
     		TypeSerializer<N> namespaceSerializer,
    -		FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, ACC>>
registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBFoldingState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				stateDesc.getFoldFunction(),
    -				this);
    -	}
    -
    -	@Override
    -	protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
    -		TypeSerializer<N> namespaceSerializer,
    -		MapStateDescriptor<UK, UV> stateDesc) throws Exception {
    -
    -		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, Map<UK,
UV>>> registerResult =
    -				tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
    -
    -		return new RocksDBMapState<>(
    -				registerResult.f0,
    -				registerResult.f1.getNamespaceSerializer(),
    -				registerResult.f1.getStateSerializer(),
    -				stateDesc.getDefaultValue(),
    -				this);
    +		StateDescriptor<S, SV> stateDesc) throws Exception {
    +		if (!STATE_FACTORIES.containsKey(stateDesc.getClass())) {
    +			String message = String.format("State %s is not supported by %s",
    +				stateDesc.getClass(), this.getClass());
    +			throw new UnsupportedOperationException(message);
    --- End diff --
    
    The exception type is a bit inconsistent, in other place throw `FlinkRuntimeException`,
maybe it better to make this consistent.


> Switch to internal states in StateBinder
> ----------------------------------------
>
>                 Key: FLINK-9571
>                 URL: https://issues.apache.org/jira/browse/FLINK-9571
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>             Fix For: 1.6.0
>
>
> The StateBinder factory for state objects is not a part of public API and it produces
in fact only internal states.
> It can be changed it to produce internal state interfaces instead of public API.
> This can help to expose internal state API for internal components which use the factory,
e.g. for state TTL wrappers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message