flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Using Kryo for serializing lambda closures
Date Tue, 08 Dec 2015 19:23:06 GMT
Hi Nick,

Thanks for pushing this and opening the JIRA issue.

The issue has come up a couple of times and is a known limitation (see
FLINK-1256). So far, the workaround of marking member variables as
transient and initializing them in the open() method of a RichFunction has
been good enough for all cases I am aware of. That's probably why the issue
hasn't been addressed yet.
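A minimal sketch of the transient/open() workaround, written in plain Java so it runs without Flink on the classpath. It assumes a hypothetical `ColumnMapper` standing in for a `RichMapFunction` subclass and a hypothetical `ColumnInfo` standing in for the non-serializable third-party type; in a real job the rebuild would live in `open(Configuration)` rather than the no-argument `open()` used here. Only the serializable configuration (the column names) is shipped, and the helper objects are recreated after deserialization.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TransientFieldDemo {
    // Hypothetical stand-in for a third-party type that is NOT Serializable
    // (it plays the role of org.apache.phoenix.util.ColumnInfo in Nick's trace).
    static final class ColumnInfo {
        final String name;
        ColumnInfo(String name) { this.name = name; }
    }

    // Same shape as a RichMapFunction: serializable config is shipped,
    // the non-serializable helper is transient and rebuilt in open().
    static class ColumnMapper implements Serializable {
        private final List<String> columnNames;      // serializable, shipped with the function
        private transient List<ColumnInfo> columns;  // skipped by Java serialization

        ColumnMapper(List<String> columnNames) {
            this.columnNames = new ArrayList<>(columnNames);
        }

        // In Flink this would be open(Configuration), called on the worker before map().
        void open() {
            columns = new ArrayList<>();
            for (String n : columnNames) {
                columns.add(new ColumnInfo(n));
            }
        }

        boolean isOpen() { return columns != null; }

        String map(String value) {
            return columns.get(0).name + "=" + value;
        }
    }

    // Java-serializes and deserializes an object, imitating shipping a UDF to the cluster.
    static Object roundTrip(Object o) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(o);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ColumnMapper mapper = new ColumnMapper(Arrays.asList("id", "name"));
        mapper.open(); // populated locally; the transient field is still not written
        ColumnMapper shipped = (ColumnMapper) roundTrip(mapper);
        System.out.println(shipped.isOpen());  // false: transient field was not shipped
        shipped.open();
        System.out.println(shipped.map("42")); // id=42
    }
}
```

After `roundTrip`, the transient field is `null` until `open()` runs again, which is exactly why the helper must be rebuilt there rather than in the constructor.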

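Of course, this is not a satisfying solution if you would like to use Java
8 lambda functions, since a lambda cannot be a RichFunction and has no
member variables of its own to mark as transient.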

Best, Fabian

2015-12-08 19:38 GMT+01:00 Nick Dimiduk <ndimiduk@apache.org>:

> That's what I feared. IMO this is very limiting when mixing in other
> projects where a user does not have control over those projects' APIs.
> Falling back to an extensible serialization mechanism (like Kryo) would at
> least allow users to register serializers external to the types they're
> consuming.
>
> I opened https://issues.apache.org/jira/browse/FLINK-3148 for this issue.
>
> -n
>
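To make the idea of serializers defined outside the type more concrete, here is a user-side sketch; it is not a Flink API and not what FLINK-3148 proposes in detail. A hypothetical `ExternalCodec` interface encodes a third-party object to bytes, and a hypothetical `Shipped` wrapper carries those bytes (plus the codec) through Java serialization and decodes the value lazily on first use. `ColumnInfo` and `ColumnInfoCodec` are illustrative stand-ins; a lambda that captures a `Shipped<ColumnInfo>` instead of the raw object would then pass Java serialization.

```java
import java.io.*;

public class ExternalCodecDemo {
    // Hypothetical: a serializer defined outside the type it handles,
    // in the spirit of registering a Kryo serializer for a third-party class.
    interface ExternalCodec<T> extends Serializable {
        byte[] encode(T value);
        T decode(byte[] bytes);
    }

    // Hypothetical third-party stand-in that we cannot make Serializable.
    static final class ColumnInfo {
        final String name;
        final int sqlType;
        ColumnInfo(String name, int sqlType) { this.name = name; this.sqlType = sqlType; }
    }

    // Codec that writes the two fields with Data{Output,Input}Stream.
    static final class ColumnInfoCodec implements ExternalCodec<ColumnInfo> {
        public byte[] encode(ColumnInfo c) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(c.name);
                out.writeInt(c.sqlType);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return bytes.toByteArray();
        }

        public ColumnInfo decode(byte[] b) {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(b))) {
                return new ColumnInfo(in.readUTF(), in.readInt());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Serializable wrapper: ships encoded bytes plus the codec, rebuilds the value lazily.
    static final class Shipped<T> implements Serializable {
        private final byte[] bytes;
        private final ExternalCodec<T> codec;
        private transient T value; // not written by Java serialization

        Shipped(T value, ExternalCodec<T> codec) {
            this.bytes = codec.encode(value);
            this.codec = codec;
            this.value = value;
        }

        T get() {
            if (value == null) {
                value = codec.decode(bytes); // first access after deserialization
            }
            return value;
        }
    }

    // Java-serializes and deserializes an object, imitating shipping it to the cluster.
    static Object roundTrip(Object o) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(o);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Shipped<ColumnInfo> col = new Shipped<>(new ColumnInfo("id", 4), new ColumnInfoCodec());
        @SuppressWarnings("unchecked")
        Shipped<ColumnInfo> copy = (Shipped<ColumnInfo>) roundTrip(col);
        System.out.println(copy.get().name + " " + copy.get().sqlType); // id 4
    }
}
```

The design keeps the codec separate from `ColumnInfo`, so it works for types whose source you do not control, which is the situation described above.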
> On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann <trohrmann@apache.org> wrote:
>
>> Hi Nick,
>>
>> At the moment, Flink uses Java serialization to ship the UDFs to the
>> cluster. Therefore, the closures must contain only Serializable objects.
>> The Kryo serializer registration applies only to the data that is
>> processed by the Flink job. So, for now, I would try to get rid of the
>> ColumnInfo object in your closure.
>>
>> Cheers,
>> Till
>>
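A plain-Java sketch of why a registered Kryo serializer does not help here: Flink's `Function` interfaces extend `java.io.Serializable`, so a lambda passed to `map()` is a serializable lambda, and Java serialization writes its captured variables. The hypothetical `SerializableFunction` below plays the role of `MapFunction`, and `ColumnInfo` is a hypothetical stand-in for the Phoenix class. Capturing a list of non-serializable objects fails with `NotSerializableException` (the same `ArrayList.writeObject` path that appears in the trace), while capturing only strings succeeds.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ClosureSerializationDemo {
    // Hypothetical stand-in for a serializable UDF interface such as MapFunction.
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

    // Hypothetical stand-in for a third-party class that is NOT Serializable.
    static final class ColumnInfo {
        final String name;
        ColumnInfo(String name) { this.name = name; }
    }

    // True if Java serialization can write the object, false on NotSerializableException.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The lambda captures a list of non-serializable objects.
    static SerializableFunction<String, String> capturingColumnInfo(List<ColumnInfo> cols) {
        return v -> cols.get(0).name + "=" + v;
    }

    // The lambda captures only serializable data (the column names).
    static SerializableFunction<String, String> capturingNames(List<String> names) {
        return v -> names.get(0) + "=" + v;
    }

    public static void main(String[] args) {
        List<ColumnInfo> cols = new ArrayList<>();
        cols.add(new ColumnInfo("id"));
        List<String> names = new ArrayList<>();
        names.add("id");
        System.out.println(isJavaSerializable(capturingColumnInfo(cols))); // false
        System.out.println(isJavaSerializable(capturingNames(names)));     // true
    }
}
```

Replacing the captured `ColumnInfo` objects with plain serializable data, as suggested above, is what makes the second lambda pass.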
>> On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk <ndimiduk@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I've implemented a (streaming) flow using the Java API and Java 8 lambdas
>>> for various map functions. When I try to run the flow, job submission fails
>>> because of a non-serializable type. This is not a type of data used within
>>> the flow, but rather a small collection of objects captured in the closure
>>> of one of my lambdas. I've implemented a Kryo Serializer for this type and
>>> registered it with the environment; however, it's apparently not used when
>>> serializing the lambdas. It seems like the environment's serialization
>>> configuration and tools should also be used when preparing the job for
>>> submission. Am I missing something?
>>>
>>> Thanks,
>>> Nick
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>>>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>>>         at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>>>         at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>>>         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>>>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
>>> Caused by: org.apache.flink.api.common.InvalidProgramException: Object ImportFlow$$Lambda$11/1615389290@44286963 not serializable
>>>         at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>>>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>>>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>>>         at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
>>>         at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
>>>         at ImportFlow.assembleImportFlow(ImportFlow.java:111)
>>>         at ImportFlow.main(ImportFlow.java:178)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>>>         ... 6 more
>>> Caused by: java.io.NotSerializableException: org.apache.phoenix.util.ColumnInfo
>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>         at java.util.ArrayList.writeObject(ArrayList.java:762)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>         at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
>>>         at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>>>         ... 17 more
>>>
>>
>>
>
