flink-dev mailing list archives

From Timo Walther <twal...@apache.org>
Subject Re: Either not NotSerializableException and InvalidTypesException
Date Sun, 29 Nov 2015 15:05:57 GMT
Hi Vasia,

Regarding your TypeExtractor problem: the TypeExtractor itself works
correctly. The with() function of the JoinOperator calls the wrong
TypeExtractor method, one that does not allow missing type info. This is a
bug. Can you open an issue for that?
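
To illustrate why type info can be missing for a generic function class in the first place, here is a minimal sketch in plain Java reflection. It does not use Flink at all; Producer, GenericProducer, StringProducer and isResolvable are hypothetical names made up for this illustration, and the real TypeExtractor logic is considerably more involved. The sketch only shows that a class which leaves its output type as a type variable gives reflection nothing concrete to work with, which is why the type has to be supplied from outside, for example through returns().

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class ErasureSketch {

    // Hypothetical stand-in for a user function interface (not a Flink type).
    interface Producer<OUT> {
        OUT produce();
    }

    // Output type depends on the class's own type variable M, similar in
    // spirit to a function declared as AppendVertexState<K, VV, Message>.
    static class GenericProducer<M> implements Producer<M> {
        public M produce() {
            return null;
        }
    }

    // Output type is fixed to String in the class declaration itself.
    static class StringProducer implements Producer<String> {
        public String produce() {
            return "x";
        }
    }

    // Returns the type argument that the given class declares for Producer.
    static Type declaredOutputType(Class<?> clazz) {
        for (Type t : clazz.getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType pt = (ParameterizedType) t;
                if (pt.getRawType() == Producer.class) {
                    return pt.getActualTypeArguments()[0];
                }
            }
        }
        throw new IllegalArgumentException("not a Producer: " + clazz);
    }

    // True if reflection sees a concrete output type, false if it only sees
    // an unresolved type variable (the type argument was erased at runtime).
    static boolean isResolvable(Object function) {
        return !(declaredOutputType(function.getClass()) instanceof TypeVariable);
    }

    public static void main(String[] args) {
        System.out.println(isResolvable(new StringProducer()));          // prints "true"
        System.out.println(isResolvable(new GenericProducer<String>())); // prints "false"
    }
}
```

Note that the String passed to new GenericProducer<String>() is not visible at runtime, so the second call cannot recover it. An extraction method that insists on a fully resolved type at this point has to fail, while one that tolerates missing type info can defer to a later type hint.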

Regards,
Timo

On 28.11.2015 20:18, Vasiliki Kalavri wrote:
> Hi squirrels,
>
> I have 2 problems with the new Either type and I could use your help to
> understand them.
>
> 1. I have a piece of code that looks like this:
>
> TypeInformation<Tuple2<K, Either<NullValue, Message>>> workSetTypeInfo = ...
> DataSet<Tuple2<K, Either<NullValue, Message>>> initialWorkSet =
> initialVertices.map(...).returns(workSetTypeInfo);
>
> This gives me the following exception:
>
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: Object
> org.apache.flink.graph.spargelnew.MessagePassingIteration$InitializeWorkSet@75ba8574
> not serializable
>
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
> at org.apache.flink.api.java.DataSet.clean(DataSet.java:184)
> at org.apache.flink.api.java.DataSet.map(DataSet.java:214)
> at
> org.apache.flink.graph.spargelnew.MessagePassingIteration.createResult(MessagePassingIteration.java:160)
> at org.apache.flink.api.java.DataSet.runOperation(DataSet.java:1190)
> at org.apache.flink.graph.Graph.runMessagePassingIteration(Graph.java:1650)
> at org.apache.flink.graph.spargelnew.example.PregelCC.main(PregelCC.java:53)
>
> Caused by: java.io.NotSerializableException:
> org.apache.flink.api.java.typeutils.Either$Left
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>
> Making Either implement java.io.Serializable solves this, but I am
> wondering why this is needed. Since I'm registering the typeinfo with
> returns(), shouldn't the EitherTypeSerializer be registered too? Also, this
> seems to be the only operation where I get this error, even though I'm
> using the Either type in other places as well.
>
>
> 2. The second problem appeared after rebasing to the current master,
> containing a fix for FLINK-3046 (Integrate the Either Java type with the
> TypeExtractor).
>
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: Type of
> TypeVariable 'Message' in 'class
> org.apache.flink.graph.spargelnew.MessagePassingIteration$AppendVertexState'
> could not be determined. This is most likely a type erasure problem. The
> type extraction currently supports types with generic variables only in
> cases where all variables in the return type can be deduced from the input
> type(s).
>
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:706)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:458)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:713)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:425)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:380)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getBinaryOperatorReturnType(TypeExtractor.java:320)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getFlatJoinReturnTypes(TypeExtractor.java:176)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getFlatJoinReturnTypes(TypeExtractor.java:170)
> at
> org.apache.flink.api.java.operators.JoinOperator$DefaultJoin.with(JoinOperator.java:562)
> at
> org.apache.flink.graph.spargelnew.MessagePassingIteration.createResult(MessagePassingIteration.java:171)
> at org.apache.flink.api.java.DataSet.runOperation(DataSet.java:1190)
> at org.apache.flink.graph.Graph.runMessagePassingIteration(Graph.java:1650)
> at org.apache.flink.graph.spargelnew.example.PregelCC.main(PregelCC.java:53)
>
>
> The code giving this exception is the following:
>
> DataSet<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> verticesWithMsgs =
>     iteration.getSolutionSet().join(iteration.getWorkset())
>         .where(0).equalTo(0)
>         .with(new AppendVertexState<K, VV, Message>())
>         .returns(new TupleTypeInfo<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>>(
>             vertexType, nullableMsgTypeInfo));
>
> Do I need to register the Either typeinfo differently now that it's
> integrated with the TypeExtractor, or is this a bug?
>
> If you want to see the complete code, I've pushed it here:
> https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/MessagePassingIteration.java#L168
> .
>
> Thanks!
> -Vasia.
>

