flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: State key serializer has not been configured in the config.
Date Thu, 23 Jun 2016 09:31:34 GMT
We should adjust the error message so that it mentions the keyed-stream requirement.

On 23.06.2016 10:11, Till Rohrmann wrote:
> Hi Jacob,
>
> the `ListState` abstraction is a state which we call 
> partitioned/key-value state. As such, it is only possible to use it 
> with a keyed stream. This means that you have to call `keyBy` after 
> the `connect` API call.
>
> Cheers,
> Till
>
> On Wed, Jun 22, 2016 at 9:17 PM, Jacob Bay Larsen <mail@jacobbay.dk> wrote:
>
>     Hi,
>
>     I am trying to use a ListState in a RichCoFlatMapFunction, but when
>     calling getRuntimeContext().getListState(descriptor) in the open()
>     method I get a "State key serializer has not .." exception. I am not
>     sure what I can do to avoid this exception. Can any of you provide
>     some help?
>
>     Best regards
>     Jacob
>
>
>     private ListState<Tuple2<String, Integer>> deltaPositions;
>
>     @Override
>     public void open(org.apache.flink.configuration.Configuration parameters)
>             throws Exception {
>       // Create state variable
>       ListStateDescriptor<Tuple2<String, Integer>> descriptor =
>               new ListStateDescriptor<>(
>                       "deltaPositions", // the state name
>                       TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
>
>       deltaPositions = getRuntimeContext().getListState(descriptor);
>     }
>
>
>
>     2016-06-22 20:41:38,813 INFO org.apache.flink.runtime.taskmanager.Task - Stream of Items with collection of meadian times (1/1) switched to FAILED with exception.
>     java.lang.RuntimeException: Error while getting state
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:131)
>     at crisplant.bigdata.dataanalysis.baggagemonitor.streaming.liveitemeventsstoring.LiveItemEventsStoring$MergeMedianTimesFlatMapFunction.open(LiveItemEventsStoring.java:83)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>     at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
>     Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:129)
>     ... 8 more
>     2016-06-22 20:41:38,815 INFO org.apache.flink.runtime.taskmanager.Task - Freeing ta
>
>     -- 
>     Jacob Bay Larsen
>
>     Phone: +45 6133 1108
>     E-mail: mail@jacobbay.dk
>
>
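
Till's point, that `ListState` is scoped to a key and therefore needs `keyBy` after `connect`, can be illustrated with a toy model in plain Java. This is only a sketch and not Flink's implementation: the class `KeyedListStateSketch`, its `stateFor` method, and the `bagA:1`-style elements are hypothetical names made up for illustration. It shows list state being looked up per key, and a lookup failing when no key selector was configured, which is analogous to the exception in the thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy model only (hypothetical names, NOT Flink code): list state that is
// partitioned by the key of the element currently being processed.
final class KeyedListStateSketch<IN, K, V> {
    // A null selector models a stream on which keyBy was never called.
    private final Function<IN, K> keySelector;
    private final Map<K, List<V>> statePerKey = new HashMap<>();

    KeyedListStateSketch(Function<IN, K> keySelector) {
        this.keySelector = keySelector;
    }

    // Returns the list that belongs to the key extracted from `element`.
    List<V> stateFor(IN element) {
        if (keySelector == null) {
            // Without a key there is no partition to look the state up in.
            throw new IllegalStateException(
                    "No key selector configured: partitioned state needs a keyed stream");
        }
        K key = keySelector.apply(element);
        return statePerKey.computeIfAbsent(key, k -> new ArrayList<>());
    }

    public static void main(String[] args) {
        // Elements look like "key:value"; the key is the part before the colon.
        KeyedListStateSketch<String, String, Integer> keyed =
                new KeyedListStateSketch<>(s -> s.split(":")[0]);
        keyed.stateFor("bagA:1").add(1);
        keyed.stateFor("bagB:7").add(7);
        keyed.stateFor("bagA:2").add(2);
        System.out.println("bagA -> " + keyed.stateFor("bagA:0"));
        System.out.println("bagB -> " + keyed.stateFor("bagB:0"));

        // The non-keyed case fails as soon as state is requested.
        KeyedListStateSketch<String, String, Integer> notKeyed =
                new KeyedListStateSketch<>(null);
        try {
            notKeyed.stateFor("bagA:1");
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In the actual job, the corresponding change would be to call `keyBy` on the connected streams before `flatMap`, as Till describes. Which fields to key on depends on the job and is not given in the thread.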

