flink-user mailing list archives

From Nick Dimiduk <ndimi...@gmail.com>
Subject Using Kryo for serializing lambda closures
Date Mon, 07 Dec 2015 21:02:28 GMT
Hello,

I've implemented a (streaming) flow using the Java API and Java 8 lambdas
for various map functions. When I try to run the flow, job submission fails
because of an unserializable type. This is not a type of data used within
the flow, but rather a small collection of objects captured in the closure
context of one of my lambdas. I've implemented and registered a Kryo
Serializer for this type with the environment; however, it's apparently
not used when serializing the lambdas. It seems like the environment's
serialization configuration and tools should also be used when preparing
the job for submission. Am I missing something?
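
A minimal sketch of the failure, using only JDK classes. Judging from the
stack trace in this message, the closure is checked with plain Java
serialization (ObjectOutputStream), so a Kryo serializer registered on the
environment would not be consulted at that point. ColumnLike is a
hypothetical stand-in for org.apache.phoenix.util.ColumnInfo, and
SerializableFunction is a hypothetical stand-in for a serializable function
interface such as a map function; neither is part of the Flink API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ClosureSerializationSketch {

    // Hypothetical stand-in for a third-party type that does not
    // implement java.io.Serializable.
    static final class ColumnLike {
        final String name;
        ColumnLike(String name) { this.name = name; }
    }

    // Hypothetical function interface that is also Serializable, so a lambda
    // targeting it is written out together with its captured variables.
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

    // Returns true if plain Java serialization accepts the object.
    static boolean isJavaSerializable(Object o) throws IOException {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    // Captures a list of non-serializable objects in the closure.
    static SerializableFunction<String, Integer> capturingObjects() {
        List<ColumnLike> columns = new ArrayList<>();
        columns.add(new ColumnLike("id"));
        return s -> columns.size() + s.length();
    }

    // Possible workaround: capture only serializable data (here, the names)
    // and rebuild the richer objects inside the function if they are needed.
    static SerializableFunction<String, Integer> capturingNames() {
        List<String> names = new ArrayList<>();
        names.add("id");
        return s -> names.size() + s.length();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(isJavaSerializable(capturingObjects())); // false
        System.out.println(isJavaSerializable(capturingNames()));   // true
    }
}
```

The second variant only illustrates one way around the problem; whether
the captured objects can be rebuilt from plain strings depends on the type.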

Thanks,
Nick

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
Caused by: org.apache.flink.api.common.InvalidProgramException: Object ImportFlow$$Lambda$11/1615389290@44286963 not serializable
        at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
        at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
        at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
        at ImportFlow.assembleImportFlow(ImportFlow.java:111)
        at ImportFlow.main(ImportFlow.java:178)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
        ... 6 more
Caused by: java.io.NotSerializableException: org.apache.phoenix.util.ColumnInfo
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at java.util.ArrayList.writeObject(ArrayList.java:762)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
        at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
        ... 17 more
