flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yunfan123 <yunfanfight...@foxmail.com>
Subject Can ValueState use generics?
Date Sun, 07 May 2017 13:14:48 GMT
My process function is like :

    private static class MergeFunction extends
RichProcessFunction<Tuple2&lt;Integer, ObjectNode>, Tuple2<Integer,
ObjectNode>> {

        private ValueState<Tuple2&lt;Integer, ObjectNode>> state;

        @Override
        @SuppressWarnings("unchecked")
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new
ValueStateDescriptor<>("mystate",
                    (Class<Tuple2&lt;Integer,ObjectNode>>)
(Object)Tuple2.class));
        }
}


When I running the code: 
05/07/2017 21:17:47	Process -> (Sink: Unnamed, Sink: Unnamed)(1/1) switched
to FAILED
java.lang.RuntimeException: Cannot create full type information based on the
given class. If the type has generics, please
	at
org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:124)
	at
org.apache.flink.api.common.state.ValueStateDescriptor.<init>(ValueStateDescriptor.java:101)
	at
com.bytedance.flinkjob.activationSource.AppActivationSource$MergeFunction.open(AppActivationSource.java:134)
	at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
	at
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:55)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
	at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Tuple needs to be parameterized by using generics.
	at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:673)
	at
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:607)
	at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:561)
	at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:557)
	at
org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:122)
	... 9 more

Can I use generics with ValueState?



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-ValueState-use-generics-tp13038.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Mime
View raw message