flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Xingcan Cui (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.
Date Fri, 17 Nov 2017 10:51:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16256826#comment-16256826
] 

Xingcan Cui commented on FLINK-8090:
------------------------------------

Hi [~kkl0u], thanks for raising this. I found that a {{State}} is only decided by the name
in its {{StateDescriptor}}. In other words, if we create two descriptors with an identical
name and type, it will return the same state object (though the wrapper may be different).
On the other hand, for two descriptors with an identical name but different types, we can
detect them with a {{ClassCastException}}, which is caused by the type erasure in Java. I'll
try to refactor the {{DefaultKeyedStateStore}} to provide a better error message for the later
case.

Best, Xingcan

> Improve error message when registering different states under the same name.
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-8090
>                 URL: https://issues.apache.org/jira/browse/FLINK-8090
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Kostas Kloudas
>            Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor
= new MapStateDescriptor<>(
> 					"timon-one",
> 					BasicTypeInfo.INT_TYPE_INFO,
> 					source.getType());
> final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor<Integer>(
> 					"timon-one",
> 					BasicTypeInfo.INT_TYPE_INFO);
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
> 				private static final long serialVersionUID = -805125545438296619L;
> 				private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>                                 private transient ListState<Integer> secondListState;
> 				@Override
> 				public void open(Configuration parameters) throws Exception {
> 					super.open(parameters);
> 					firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
> 					secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
> 				}
> 				@Override
> 				public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object>
out) throws Exception {
> 					Tuple2<Integer, Long> v = firstMapState.get(value.f0);
> 					if (v == null) {
> 						v = new Tuple2<>(value.f0, 0L);
> 					}
> 					firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
> 				}
> 			}
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
> 	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
> 	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
> 	at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> 	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.heap.HeapMapState
cannot be cast to org.apache.flink.api.common.state.ListState
> 	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
> 	... 9 more
> {code}
> Which is cryptic, as it does not explain the reason for the problem. The error message
should be something along the line of "Duplicate state name".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message