flink-user mailing list archives

From Nick Dimiduk <ndimi...@gmail.com>
Subject Re: Using Kryo for serializing lambda closures
Date Tue, 08 Dec 2015 19:44:25 GMT
The point is to provide a means for users to work around nonconforming APIs.
Kryo at least is extensible in that you can register additional serializers.
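
With Java serialization alone, a comparable workaround is a small Serializable
wrapper that writes the third-party object's state by hand. A minimal sketch,
assuming the type can be rebuilt from its getters. ThirdPartyColumn is a
hypothetical stand-in for a class you do not control (it is not Phoenix's
ColumnInfo), and every other name here is made up for illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical third-party type that does not implement Serializable.
final class ThirdPartyColumn {
    private final String name;
    private final int sqlType;
    ThirdPartyColumn(String name, int sqlType) { this.name = name; this.sqlType = sqlType; }
    String getName() { return name; }
    int getSqlType() { return sqlType; }
}

// Serializable holder: the wrapped object is transient, its state is written manually.
final class SerializableColumn implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient ThirdPartyColumn column;

    SerializableColumn(ThirdPartyColumn column) { this.column = column; }
    ThirdPartyColumn get() { return column; }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeUTF(column.getName());
        out.writeInt(column.getSqlType());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        // Rebuild the third-party object from the fields written above, in the same order.
        String name = in.readUTF();
        int sqlType = in.readInt();
        column = new ThirdPartyColumn(name, sqlType);
    }
}

// A Function that is also Serializable, so a lambda assigned to it can be serialized.
interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

final class WrapperSketch {
    // Round-trips an object through Java serialization, similar to shipping a closure.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    // The lambda captures the wrapper, not the raw third-party object.
    static SerializableFunction<String, String> buildMapper() {
        SerializableColumn col = new SerializableColumn(new ThirdPartyColumn("ID", 4)); // 4 is an arbitrary type code
        return value -> col.get().getName() + "=" + value;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(buildMapper()).apply("42"));
    }
}
```

This is more ceremony per type than registering a serializer would be, which is
the limitation I'm pointing at.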

On Tue, Dec 8, 2015 at 11:40 AM, Stephan Ewen <sewen@apache.org> wrote:

> Actually, this should be independent of Java 8 lambdas vs Java 7 anonymous
> classes.
> I have been using Java 8 lambdas quite a bit with Flink.
>
> The important thing is that no non-serializable objects are in the closure.
>
> As Fabian mentioned, lazy initialization helps. Serializability is also
> discussed here:
> http://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable
>
> Adding another serialization framework may help for cases where simply the
> java.io.Serializable interface is missing in an object. However, not
> everything is magically serializable with Kryo.
> There are classes that you can serialize with Java Serialization, but not
> out of the box with Kryo (especially when immutable collections are
> involved). Also classes that have no default constructors, but have checks
> on invariants, etc., can fail with Kryo arbitrarily.
>
>
>
> On Tue, Dec 8, 2015 at 8:28 PM, Nick Dimiduk <ndimiduk@gmail.com> wrote:
>
>> Ah, very good. I've closed my issue as a duplicate. Thanks for the
>> reference.
>>
>> On Tue, Dec 8, 2015 at 11:23 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> Hi Nick,
>>>
>>> thanks for pushing this and opening the JIRA issue.
>>>
>>> The issue came up a couple of times and is a known limitation (see
>>> FLINK-1256).
>>> So far the workaround of marking member variables as transient and
>>> initializing them in the open() method of a RichFunction has been good
>>> enough for all cases I am aware of. That's probably why the issue hasn't
>>> been addressed yet.
>>>
>>> Of course this is not a satisfying solution if you would like to use
>>> Java 8 lambda functions.
>>>
>>> Best, Fabian
>>>
>>> 2015-12-08 19:38 GMT+01:00 Nick Dimiduk <ndimiduk@apache.org>:
>>>
>>>> That's what I feared. IMO this is very limiting when mixing in other
>>>> projects where a user does not have control over those projects' APIs. At
>>>> least falling back to an extensible serialization mechanism (like Kryo)
>>>> allows users to register serializers external to the types they're
>>>> consuming.
>>>>
>>>> I opened https://issues.apache.org/jira/browse/FLINK-3148 for this
>>>> issue.
>>>>
>>>> -n
>>>>
>>>> On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann <trohrmann@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Nick,
>>>>>
>>>>> at the moment Flink uses Java serialization to ship the UDFs to the
>>>>> cluster. Therefore, the closures must only contain Serializable
>>>>> objects. The serializer registration only applies to the data which is
>>>>> processed by the Flink job. Thus, for the moment I would try to get rid of
>>>>> the ColumnInfo object in your closure.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>>
>>>>> On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk <ndimiduk@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I've implemented a (streaming) flow using the Java API and Java8
>>>>>> Lambdas for various map functions. When I try to run the flow, job
>>>>>> submission fails because of an unserializable type. This is not a type of
>>>>>> data used within the flow, but rather a small collection of objects
>>>>>> captured in the closure context over one of my Lambdas. I've implemented
>>>>>> and registered a Kryo Serializer for this type with this environment,
>>>>>> however, it's apparently not used when serializing the lambdas. Seems like
>>>>>> the same serialization configuration and tools of the environment should be
>>>>>> used when preparing the job for submission. Am I missing something?
>>>>>>
>>>>>> Thanks,
>>>>>> Nick
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>>>>>>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>>>>>>         at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>>>>>>         at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>>>>>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>>>>>>         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>>>>>>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
>>>>>> Caused by: org.apache.flink.api.common.InvalidProgramException: Object ImportFlow$$Lambda$11/1615389290@44286963 not serializable
>>>>>>         at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>>>>>>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>>>>>>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>>>>>>         at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
>>>>>>         at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
>>>>>>         at ImportFlow.assembleImportFlow(ImportFlow.java:111)
>>>>>>         at ImportFlow.main(ImportFlow.java:178)
>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>>>>>>         ... 6 more
>>>>>> Caused by: java.io.NotSerializableException: org.apache.phoenix.util.ColumnInfo
>>>>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>>         at java.util.ArrayList.writeObject(ArrayList.java:762)
>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>>>>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>>>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>>         at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>>>>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>>>>>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
>>>>>>         at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>>>>>>         ... 17 more
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
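
The workaround Fabian describes in the thread (mark the member transient and
initialize it later) can be sketched in plain Java with no Flink dependency.
In a Flink job the initialization would go in the open() method of a
RichFunction, as he says; here it happens on first use instead, which is an
assumption made to keep the sketch self-contained. ExpensiveFormatter,
LazyMapper and LazyInitSketch are hypothetical names:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical helper that does not implement Serializable.
final class ExpensiveFormatter {
    String format(String s) { return "[" + s + "]"; }
}

// Serializable function object whose non-serializable member is transient.
final class LazyMapper implements Function<String, String>, Serializable {
    private static final long serialVersionUID = 1L;

    // Skipped by Java serialization, so it is null after deserialization.
    private transient ExpensiveFormatter formatter;

    @Override
    public String apply(String value) {
        if (formatter == null) {
            // Lazy initialization on the receiving side, after the object was shipped.
            formatter = new ExpensiveFormatter();
        }
        return formatter.format(value);
    }
}

final class LazyInitSketch {
    // Serializes and deserializes the mapper, similar to what happens at job submission.
    static LazyMapper roundTrip(LazyMapper mapper) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(mapper);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyMapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new LazyMapper()).apply("a"));
    }
}
```

This needs a named class rather than a lambda, which is the drawback Fabian
notes for Java 8 lambda functions.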
