flink-user mailing list archives

From: Nick Dimiduk <ndimi...@apache.org>
Subject: Re: Using Kryo for serializing lambda closures
Date: Tue, 08 Dec 2015 18:38:24 GMT

That's what I feared. IMO this is very limiting when mixing in other
projects whose APIs the user does not control. Falling back to an
extensible serialization mechanism (like Kryo) would at least let users
register serializers external to the types they're consuming.

I opened https://issues.apache.org/jira/browse/FLINK-3148 for this issue.

-n

On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann <trohrmann@apache.org> wrote:

> Hi Nick,
>
> At the moment, Flink uses Java serialization to ship the UDFs to the
> cluster. Therefore, the closures may only contain Serializable objects.
> The serializer registration only applies to the data that is processed by
> the Flink job. Thus, for the moment, I would try to get rid of the
> ColumnInfo object in your closure.
>
> Cheers,
> Till
>
> On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk <ndimiduk@gmail.com> wrote:
>
>> Hello,
>>
>> I've implemented a (streaming) flow using the Java API and Java 8 lambdas
>> for various map functions. When I try to run the flow, job submission fails
>> because of an unserializable type. This is not a type of data used within
>> the flow, but rather a small collection of objects captured in the closure
>> context of one of my lambdas. I've implemented and registered a Kryo
>> Serializer for this type with the environment; however, it's apparently
>> not used when serializing the lambdas. It seems the same serialization
>> configuration and tools of the environment should be used when preparing
>> the job for submission. Am I missing something?
>>
>> Thanks,
>> Nick
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>>         at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>>         at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>>         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
>> Caused by: org.apache.flink.api.common.InvalidProgramException: Object ImportFlow$$Lambda$11/1615389290@44286963 not serializable
>>         at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>>         at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
>>         at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
>>         at ImportFlow.assembleImportFlow(ImportFlow.java:111)
>>         at ImportFlow.main(ImportFlow.java:178)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>>         ... 6 more
>> Caused by: java.io.NotSerializableException: org.apache.phoenix.util.ColumnInfo
>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>         at java.util.ArrayList.writeObject(ArrayList.java:762)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>         at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
>>         at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>>         ... 17 more
>>
>
>
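
For readers of the archive, here is a minimal sketch of the failure and of Till's suggested workaround, using only the JDK so it runs without Flink. Everything in it is an assumption made for illustration: ColumnMeta is a hypothetical stand-in for a third-party type that does not implement Serializable (it is not Phoenix's ColumnInfo), SerFunction is a hypothetical serializable function interface, and javaSerializable only imitates the kind of check shown in the stack trace by attempting plain Java serialization. The first lambda captures a list of the non-serializable objects and fails; the second captures only Strings and rebuilds the object inside the function body.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class ClosureDemo {

    /** Hypothetical third-party type that is NOT Serializable. */
    static final class ColumnMeta {
        final String name;
        ColumnMeta(String name) { this.name = name; }
    }

    /** Hypothetical function type whose lambdas are Java-serializable. */
    interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    /** Attempts plain Java serialization and reports whether it succeeded. */
    static boolean javaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is an IOException
            return false;
        }
    }

    /** Captures a list of non-serializable objects: serialization fails. */
    static SerFunction<String, String> capturingObjects() {
        List<ColumnMeta> columns = new ArrayList<>();
        columns.add(new ColumnMeta("ID"));
        return row -> row + ":" + columns.get(0).name;
    }

    /** Captures only Strings and rebuilds the object inside the function. */
    static SerFunction<String, String> capturingNames() {
        List<String> names = new ArrayList<>();
        names.add("ID");
        return row -> {
            ColumnMeta first = new ColumnMeta(names.get(0)); // rebuilt where the function runs
            return row + ":" + first.name;
        };
    }

    public static void main(String[] args) {
        System.out.println(javaSerializable(capturingObjects())); // false
        System.out.println(javaSerializable(capturingNames()));   // true
    }
}
```

Whether this workaround is practical depends on how cheaply the third-party object can be rebuilt from serializable parts. For expensive objects, building them once per function instance rather than once per record would be the natural next step.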
