flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.
Date Mon, 27 Nov 2017 13:32:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266805#comment-16266805 ]

ASF GitHub Bot commented on FLINK-8090:
---------------------------------------

Github user xccui commented on the issue:

    https://github.com/apache/flink/pull/5032
  
    Thanks for the suggestion @aljoscha. 
    
    The problem is that the state type is provided via a generic type parameter `S extends State`,
which is erased at runtime. It is therefore hard to do type checking in `AbstractKeyedStateBackend`
unless we explicitly store and check the type for each state name (which may affect
performance). The existing "leave alone" solution seems to be the most efficient, but
all we get from it is a `ClassCastException`. What do you think?
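
    To make the "store and check the type for each state name" option concrete, here is a
minimal sketch in plain Java. It is not Flink code: `StateNameRegistry`, its `Kind` enum and
the `register` method are hypothetical names used only for illustration, and the sketch assumes
the state kind can be read from the descriptor before the state object is created.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Flink): remembers the state kind registered under each name. */
final class StateNameRegistry {

    /** Hypothetical stand-in for the kind of state a descriptor describes. */
    enum Kind { VALUE, LIST, MAP }

    private final Map<String, Kind> kindByName = new HashMap<>();

    /**
     * Records the kind on first registration of a name and rejects a later
     * registration of the same name with a different kind.
     */
    void register(String name, Kind kind) {
        Kind previous = kindByName.putIfAbsent(name, kind);
        if (previous != null && previous != kind) {
            throw new IllegalStateException(
                "Duplicate state name '" + name + "': already registered as "
                    + previous + ", cannot register it again as " + kind);
        }
    }

    public static void main(String[] args) {
        StateNameRegistry registry = new StateNameRegistry();
        registry.register("timon-one", Kind.MAP);
        try {
            // Same name, different kind: fails with a descriptive message.
            registry.register("timon-one", Kind.LIST);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

    The cost is one extra map lookup per state registration, which is the overhead mentioned
above; whether that is acceptable in `AbstractKeyedStateBackend` is still the open question.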


> Improve error message when registering different states under the same name.
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-8090
>                 URL: https://issues.apache.org/jira/browse/FLINK-8090
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Kostas Kloudas
>            Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor = new MapStateDescriptor<>(
> 					"timon-one",
> 					BasicTypeInfo.INT_TYPE_INFO,
> 					source.getType());
> final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor<Integer>(
> 					"timon-one",
> 					BasicTypeInfo.INT_TYPE_INFO);
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
> 				private static final long serialVersionUID = -805125545438296619L;
> 				private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
> 				private transient ListState<Integer> secondListState;
> 				@Override
> 				public void open(Configuration parameters) throws Exception {
> 					super.open(parameters);
> 					firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
> 					secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
> 				}
> 				@Override
> 				public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
> 					Tuple2<Integer, Long> v = firstMapState.get(value.f0);
> 					if (v == null) {
> 						v = new Tuple2<>(value.f0, 0L);
> 					}
> 					firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
> 				}
> 			}
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
> 	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
> 	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
> 	at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> 	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to org.apache.flink.api.common.state.ListState
> 	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
> 	... 9 more
> {code}
> This is cryptic, as it does not explain the cause of the problem. The error message
> should be something along the lines of "Duplicate state name".



