flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: flink - Working with State example
Date Thu, 11 Aug 2016 09:45:20 GMT
Hello Buvana,

Can you share a bit more details on your operator and how you are using it?
For example, are you using keyBy before using you custom operator?

Thanks a lot,
Kostas

> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US) <buvana.ramanan@nokia-bell-labs.com>
wrote:
> 
> Hello,
>  
> I am utilizing the code snippet in: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
<https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html> and
particularly ‘open’ function in my code:
> @Override
> 
>     public void open(Configuration config) {
> 
>         ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
> 
>                 new ValueStateDescriptor<>(
> 
>                         "average", // the state name
> 
>                         TypeInformation.of(new TypeHint<Tuple2<Long, Long>>()
{}), // type information
> 
>                         Tuple2.of(0L, 0L)); // default value of the state, if nothing
was set
> 
>         sum = getRuntimeContext().getState(descriptor);
> 
>     }
> 
>  
> When I run, I get the following error:
> Caused by: java.lang.RuntimeException: Error while getting state
>                at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>                at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>                at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>                at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>                at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>                at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>                at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>                at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: State key serializer has not been configured in the config.
This operation cannot use partitioned state.
>                at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>                at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>                at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>                ... 8 more
>  
> Where do I define the key & value serializer for state?
>  
> Thanks,
> Buvana


Mime
View raw message