flink-dev mailing list archives

From Vasiliki Kalavri <vasilikikala...@gmail.com>
Subject Either not NotSerializableException and InvalidTypesException
Date Sat, 28 Nov 2015 19:18:22 GMT
Hi squirrels,

I have 2 problems with the new Either type and I could use your help to
understand them.

1. I have a piece of code that looks like this:

TypeInformation<Tuple2<K, Either<NullValue, Message>>> workSetTypeInfo = ...
DataSet<Tuple2<K, Either<NullValue, Message>>> initialWorkSet =
    initialVertices.map(...).returns(workSetTypeInfo);

This gives me the following exception:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.graph.spargelnew.MessagePassingIteration$InitializeWorkSet@75ba8574 not serializable
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
    at org.apache.flink.api.java.DataSet.clean(DataSet.java:184)
    at org.apache.flink.api.java.DataSet.map(DataSet.java:214)
    at org.apache.flink.graph.spargelnew.MessagePassingIteration.createResult(MessagePassingIteration.java:160)
    at org.apache.flink.api.java.DataSet.runOperation(DataSet.java:1190)
    at org.apache.flink.graph.Graph.runMessagePassingIteration(Graph.java:1650)
    at org.apache.flink.graph.spargelnew.example.PregelCC.main(PregelCC.java:53)
Caused by: java.io.NotSerializableException: org.apache.flink.api.java.typeutils.Either$Left
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)

Making Either implement java.io.Serializable solves this, but I am wondering
why this is needed. Since I'm registering the type info with returns(),
shouldn't the EitherTypeSerializer be registered too? Also, this seems to be
the only operation where I get this error, even though I'm using the Either
type in other places as well.
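
For reference, the "Caused by" trace goes through ObjectOutputStream.defaultWriteFields, which suggests that the InitializeWorkSet function object itself holds an Either$Left instance in a non-transient field, and that plain Java serialization of the function object fails on it. Below is a minimal JDK-only sketch of that situation. The class names Left, FunctionWithField and FunctionWithTransientField are hypothetical stand-ins, not Flink classes, and the transient variant is only one possible workaround, not necessarily the right fix here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureSerializationSketch {

    // Hypothetical stand-in for Either.Left: deliberately NOT Serializable.
    static final class Left {
        final Object value;
        Left(Object value) { this.value = value; }
    }

    // Hypothetical user function that keeps a reusable output object in a field.
    static class FunctionWithField implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Left reuse = new Left("placeholder");
    }

    // Same idea, but the field is transient and re-created lazily after shipping.
    static class FunctionWithTransientField implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient Left reuse;

        Left get() {
            if (reuse == null) {
                reuse = new Left("placeholder");
            }
            return reuse;
        }
    }

    // Tries plain Java serialization, i.e. what happens to a function object
    // before it is shipped; returns false on NotSerializableException.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable(new FunctionWithField()));          // false
        System.out.println(isJavaSerializable(new FunctionWithTransientField())); // true
    }
}
```

If this reading is right, the type info passed to returns() would only affect how the records of the DataSet are serialized, not how the function object is, which would also explain why other uses of Either do not hit the error.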


2. The second problem appeared after rebasing to the current master,
containing a fix for FLINK-3046 (Integrate the Either Java type with the
TypeExtractor).

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'Message' in 'class org.apache.flink.graph.spargelnew.MessagePassingIteration$AppendVertexState' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
    at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:706)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:458)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:713)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:425)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:380)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getBinaryOperatorReturnType(TypeExtractor.java:320)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatJoinReturnTypes(TypeExtractor.java:176)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatJoinReturnTypes(TypeExtractor.java:170)
    at org.apache.flink.api.java.operators.JoinOperator$DefaultJoin.with(JoinOperator.java:562)
    at org.apache.flink.graph.spargelnew.MessagePassingIteration.createResult(MessagePassingIteration.java:171)
    at org.apache.flink.api.java.DataSet.runOperation(DataSet.java:1190)
    at org.apache.flink.graph.Graph.runMessagePassingIteration(Graph.java:1650)
    at org.apache.flink.graph.spargelnew.example.PregelCC.main(PregelCC.java:53)


The code giving this exception is the following:

DataSet<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> verticesWithMsgs =
    iteration.getSolutionSet().join(iteration.getWorkset())
        .where(0).equalTo(0)
        .with(new AppendVertexState<K, VV, Message>())
        .returns(new TupleTypeInfo<Tuple2<Vertex<K, VV>, Either<NullValue, Message>>>(
            vertexType, nullableMsgTypeInfo));

Do I need to register the Either typeinfo differently now that it's
integrated with the TypeExtractor or is this a bug?
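
To make the "type erasure problem" in the message concrete, here is a small JDK-only sketch of why a reflective extractor cannot see 'Message' when the function is created as new AppendVertexState<K, VV, Message>(). GenericFunction is a hypothetical stand-in for such a generic function class, and the sketch only illustrates Java reflection; it does not reproduce how Flink's TypeExtractor actually works.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class ErasureSketch {

    // Hypothetical stand-in for a generic user function with one type parameter.
    static class GenericFunction<M> {}

    // Returns what reflection can tell us about M for the given instance.
    static Type typeArgumentOf(GenericFunction<?> obj) {
        Type superType = obj.getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            // A subclass recorded the concrete type argument in its class file.
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        // Direct instantiation: only the declared type variable is visible.
        return obj.getClass().getTypeParameters()[0];
    }

    public static void main(String[] args) {
        // The <String> argument is erased at runtime; only the variable M is left.
        Type direct = typeArgumentOf(new GenericFunction<String>());
        System.out.println(direct instanceof TypeVariable); // true

        // An anonymous subclass keeps the argument in its generic superclass.
        Type viaSubclass = typeArgumentOf(new GenericFunction<String>() {});
        System.out.println(viaSubclass == String.class); // true
    }
}
```

What puzzles me is that the trace shows the exception is thrown inside with(), before returns() is reached, so the explicit type info is never consulted.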

If you want to see the complete code, I've pushed it here:
https://github.com/vasia/flink/blob/spargel-2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargelnew/MessagePassingIteration.java#L168

Thanks!
-Vasia.
