flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Using Kryo for serializing lambda closures
Date Tue, 08 Dec 2015 19:54:14 GMT
That is true.

If you want to look into this, there are probably two places that need
adjustment:

1) The UserCodeObjectWrapper would need to be adjusted to hold a serialized
object (a byte[]) for shipping and serialize that object differently (say
trying Java, then falling back to Kryo).

2) The ClosureCleaner, which also checks serializability, should then no
longer check eagerly for serializability.

After that, we could think about exposing a way to register custom
serializers with Kryo for the UserCodeObjectWrapper.
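A minimal sketch of suggestion 1), under stated assumptions: the wrapper stores the user object only as a byte[], tries Java serialization first, and falls back to a serializer registered for the object's class. `ShippedObject`, `FallbackSerializer` and `registerFallback` are hypothetical names, not Flink's UserCodeObjectWrapper API, and the fallback interface stands in for a Kryo serializer so the sketch runs on the JDK alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Kryo serializer registered for a type the user does not control.
interface FallbackSerializer<T> {
    byte[] write(T value) throws IOException;
    T read(byte[] bytes) throws IOException;
}

// Hypothetical wrapper: holds the user object only as bytes for shipping.
final class ShippedObject {
    private static final Map<Class<?>, FallbackSerializer<?>> FALLBACKS = new HashMap<>();

    static <T> void registerFallback(Class<T> type, FallbackSerializer<T> serializer) {
        FALLBACKS.put(type, serializer);
    }

    final byte[] bytes;
    final boolean javaSerialized; // records which path produced the bytes
    final Class<?> type;

    ShippedObject(Object value) throws IOException {
        this.type = value.getClass();
        byte[] b;
        boolean java;
        try {
            // 1) Try plain Java serialization first.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(value);
            }
            b = bos.toByteArray();
            java = true;
        } catch (NotSerializableException e) {
            // 2) Fall back to the serializer registered for this class, if any.
            b = fallbackFor(type).write(value);
            java = false;
        }
        this.bytes = b;
        this.javaSerialized = java;
    }

    // Reverses whichever path was used when the wrapper was created.
    Object restore() throws IOException, ClassNotFoundException {
        if (javaSerialized) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return in.readObject();
            }
        }
        return fallbackFor(type).read(bytes);
    }

    @SuppressWarnings("unchecked")
    private static FallbackSerializer<Object> fallbackFor(Class<?> type) throws NotSerializableException {
        FallbackSerializer<?> s = FALLBACKS.get(type);
        if (s == null) {
            throw new NotSerializableException(type.getName() + " (no fallback registered)");
        }
        return (FallbackSerializer<Object>) s;
    }
}
```

Because the lookup is keyed on the top-level class only, a closure that merely captures such an object would still fail here; Kryo would traverse the object graph, which this sketch does not attempt.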

Greetings,
Stephan


On Tue, Dec 8, 2015 at 8:44 PM, Nick Dimiduk <ndimiduk@gmail.com> wrote:

> The point is to provide a means for users to work around nonconforming
> APIs. Kryo at least is extensible in that you can register additional
> serializers.
>
> On Tue, Dec 8, 2015 at 11:40 AM, Stephan Ewen <sewen@apache.org> wrote:
>
>> Actually, this should be independent of Java 8 lambdas vs Java 7
>> anonymous classes.
>> I have been using Java 8 lambdas quite a bit with Flink.
>>
>> The important thing is that no non-serializable objects are in the
>> closure.
>>
>> As Fabian mentioned, lazy initialization helps. Serializability is also
>> discussed here:
>> http://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable
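The point about what ends up in the closure can be sketched with the JDK alone. `SerFunction`, `Formatter` and `LazyFormat` are hypothetical names; `Formatter` stands in for a third-party type that does not implement Serializable. The first factory captures the Formatter itself, so Java serialization fails; the second ships only the String needed to build it and creates it lazily, on first use, in a transient field.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical: a Function that is also Serializable, so lambdas targeting it are serializable.
interface SerFunction<A, B> extends Function<A, B>, Serializable {}

// Hypothetical third-party class that does not implement Serializable.
final class Formatter {
    private final String prefix;
    Formatter(String prefix) { this.prefix = prefix; }
    String format(String s) { return prefix + s; }
}

final class ClosureDemo {
    // Returns true if plain Java serialization of the object succeeds.
    static boolean javaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Captures the non-serializable Formatter itself: serialization fails.
    static SerFunction<String, String> capturingFormatter(String prefix) {
        Formatter f = new Formatter(prefix);
        return s -> f.format(s);
    }

    // Captures only serializable configuration; the Formatter is built lazily.
    static SerFunction<String, String> capturingConfigOnly(String prefix) {
        return new LazyFormat(prefix);
    }

    static final class LazyFormat implements SerFunction<String, String> {
        private final String prefix;           // shipped with the function
        private transient Formatter formatter; // never shipped; rebuilt on first use

        LazyFormat(String prefix) { this.prefix = prefix; }

        @Override
        public String apply(String s) {
            if (formatter == null) {
                formatter = new Formatter(prefix);
            }
            return formatter.format(s);
        }
    }
}
```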
>>
>> Adding another serialization framework may help in cases where an object
>> simply lacks the java.io.Serializable interface. However, not everything
>> is magically serializable with Kryo.
>> There are classes that you can serialize with Java Serialization, but not
>> out of the box with Kryo (especially when immutable collections are
>> involved). Also, classes that have no default constructors but check
>> invariants, etc., can fail with Kryo in unpredictable ways.
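The constructor issue can be illustrated with the JDK alone. `Range` is a hypothetical class with no no-arg constructor and an invariant check in its only constructor. Java serialization round-trips it without calling that constructor, while a serializer that instantiates objects through a no-arg constructor has nothing to call; the reflective lookup below only shows that gap and is not Kryo's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical value class: no no-arg constructor, and the constructor enforces an invariant.
final class Range implements Serializable {
    private static final long serialVersionUID = 1L;
    final int lo;
    final int hi;

    Range(int lo, int hi) {
        if (lo > hi) {
            throw new IllegalArgumentException("lo > hi");
        }
        this.lo = lo;
        this.hi = hi;
    }
}

final class ConstructorDemo {
    // Java serialization round trip: Range's own constructor is not needed (or called) on read.
    static Range javaRoundTrip(Range r) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(r);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (Range) in.readObject();
        }
    }

    // What an instantiator relying on a no-arg constructor would look for.
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```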
>>
>>
>>
>> On Tue, Dec 8, 2015 at 8:28 PM, Nick Dimiduk <ndimiduk@gmail.com> wrote:
>>
>>> Ah, very good. I've closed my issue as a duplicate. Thanks for the
>>> reference.
>>>
>>> On Tue, Dec 8, 2015 at 11:23 AM, Fabian Hueske <fhueske@gmail.com>
>>> wrote:
>>>
>>>> Hi Nick,
>>>>
>>>> thanks for pushing this and opening the JIRA issue.
>>>>
>>>> The issue has come up a couple of times and is a known limitation (see
>>>> FLINK-1256).
>>>> So far the workaround of marking member variables as transient and
>>>> initializing them in the open() method of a RichFunction has been good
>>>> enough for all cases I am aware of. That's probably why the issue hasn't
>>>> been addressed yet.
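The transient-field workaround can be simulated without a Flink dependency. `MapperWithSetup` and its `open()` method are hypothetical stand-ins for a RichMapFunction and its open() hook, `Connection` stands in for a non-serializable client, and `ShipDemo.ship` only approximates job submission with a Java serialization round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical non-serializable client (e.g. a database connection).
final class Connection {
    private final String url;
    Connection(String url) { this.url = url; }
    String lookup(String key) { return url + "/" + key; }
}

// Mimics a RichMapFunction: only serializable config is shipped; the client is transient.
final class MapperWithSetup implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String url;                // shipped with the function
    private transient Connection connection; // skipped by Java serialization

    MapperWithSetup(String url) { this.url = url; }

    // Analogous to the open() hook: runs on the worker after deserialization.
    void open() { connection = new Connection(url); }

    String map(String key) { return connection.lookup(key); }
}

final class ShipDemo {
    // Serialize and deserialize, roughly what shipping a function to the cluster involves.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T ship(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

In Flink itself the same shape would extend RichMapFunction and override open(Configuration); check the exact signature against the Flink version in use.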
>>>>
>>>> Of course, this is not a satisfying solution if you would like to use
>>>> Java 8 lambda functions.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2015-12-08 19:38 GMT+01:00 Nick Dimiduk <ndimiduk@apache.org>:
>>>>
>>>>> That's what I feared. IMO this is very limiting when mixing in other
>>>>> projects where a user does not have control over those projects' APIs.
>>>>> Falling back to an extensible serialization mechanism (like Kryo) would
>>>>> at least allow users to register serializers external to the types
>>>>> they're consuming.
>>>>>
>>>>> I opened https://issues.apache.org/jira/browse/FLINK-3148 for this
>>>>> issue.
>>>>>
>>>>> -n
>>>>>
>>>>> On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann <trohrmann@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Nick,
>>>>>>
>>>>>> at the moment Flink uses Java serialization to ship the UDFs to the
>>>>>> cluster. Therefore, the closures must only contain Serializable
>>>>>> objects. The serializer registration only applies to the data which is
>>>>>> processed by the Flink job. Thus, for the moment I would try to get rid
>>>>>> of the ColumnInfo object in your closure.
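The eager check described here uses plain Java serialization and never consults a Kryo registration. A JDK-only sketch of that behaviour: `EagerCheck.ensureSerializable` is a hypothetical, simplified analogue, not Flink's ClosureCleaner code, and `Column` is a made-up stand-in for a type like ColumnInfo. A lambda that captures a list of non-serializable elements fails even though the list itself is Serializable, and the NotSerializableException message names the element class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a third-party type such as ColumnInfo: not Serializable.
final class Column {
    final String name;
    Column(String name) { this.name = name; }
}

// Hypothetical serializable function type, so the lambda below is a serializable lambda.
interface SerFn<A, B> extends Function<A, B>, Serializable {}

final class EagerCheck {
    // Simplified eager check: plain Java serialization, no Kryo registry involved.
    static void ensureSerializable(Object fn) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(fn);
        } catch (IOException e) {
            throw new IllegalArgumentException("Object " + fn + " not serializable", e);
        }
    }

    // A lambda that captures a list whose elements are not Serializable.
    static SerFn<Integer, String> columnNameAt(List<Column> columns) {
        return i -> columns.get(i).name;
    }
}
```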
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk <ndimiduk@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I've implemented a (streaming) flow using the Java API and Java 8
>>>>>>> lambdas for various map functions. When I try to run the flow, job
>>>>>>> submission fails because of an unserializable type. This is not a type
>>>>>>> of data used within the flow, but rather a small collection of objects
>>>>>>> captured in the closure context of one of my lambdas. I've implemented
>>>>>>> and registered a Kryo Serializer for this type with this environment;
>>>>>>> however, it's apparently not used when serializing the lambdas. It seems
>>>>>>> like the same serialization configuration and tools of the environment
>>>>>>> should be used when preparing the job for submission. Am I missing
>>>>>>> something?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Nick
>>>>>>>
>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
>>>>>>>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
>>>>>>>         at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
>>>>>>>         at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>>>>>>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
>>>>>>>         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>>>>>>>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
>>>>>>> Caused by: org.apache.flink.api.common.InvalidProgramException: Object ImportFlow$$Lambda$11/1615389290@44286963 not serializable
>>>>>>>         at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>>>>>>>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>>>>>>>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>>>>>>>         at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
>>>>>>>         at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
>>>>>>>         at ImportFlow.assembleImportFlow(ImportFlow.java:111)
>>>>>>>         at ImportFlow.main(ImportFlow.java:178)
>>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
>>>>>>>         ... 6 more
>>>>>>> Caused by: java.io.NotSerializableException: org.apache.phoenix.util.ColumnInfo
>>>>>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>>>         at java.util.ArrayList.writeObject(ArrayList.java:762)
>>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>>>         at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>>>>>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>>>>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>>>         at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>>>>>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>>>>>>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>>>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>>>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>>>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>>>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>>>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
>>>>>>>         at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>>>>>>>         ... 17 more
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
