The point is to provide a means for users to work around nonconforming APIs. Kryo, at least, is extensible in that you can register additional serializers.

On Tue, Dec 8, 2015 at 11:40 AM, Stephan Ewen <sewen@apache.org> wrote:
Actually, this should be independent of Java 8 lambdas vs Java 7 anonymous classes.
I have been using Java 8 lambdas quite a bit with Flink.

The important thing is that no non-serializable objects are in the closure.

As Fabian mentioned, lazy initialization helps. Serializability is also discussed here: http://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable
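A minimal sketch of lazy initialization, using only the JDK so it runs without Flink. The names `Formatter`, `PrefixFunction`, and `roundTrip` are hypothetical; `roundTrip` serializes and deserializes the function with `ObjectOutputStream`/`ObjectInputStream` to imitate shipping it to a worker. Only the serializable configuration (a `String`) travels with the function; the non-serializable helper is a `transient` field that is rebuilt on first use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LazyInitExample {

    // Hypothetical non-serializable helper (e.g. a client built from config).
    static final class Formatter {
        private final String prefix;
        Formatter(String prefix) { this.prefix = prefix; }
        String format(String s) { return prefix + s; }
    }

    // Ships only the String; the helper is transient and created lazily.
    static final class PrefixFunction implements Function<String, String>, Serializable {
        private static final long serialVersionUID = 1L;
        private final String prefix;
        private transient Formatter formatter; // not serialized; null after deserialization

        PrefixFunction(String prefix) { this.prefix = prefix; }

        @Override
        public String apply(String value) {
            if (formatter == null) {
                formatter = new Formatter(prefix); // lazy initialization on first use
            }
            return formatter.format(value);
        }
    }

    // Serialize and deserialize, imitating shipping the function to a worker.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        PrefixFunction f = new PrefixFunction("row-");
        f.apply("warm-up"); // helper now exists on the client side
        PrefixFunction shipped = roundTrip(f);
        System.out.println(shipped.apply("42")); // row-42
    }
}
```

Because the `transient` field is skipped during serialization, an already-initialized helper does not block shipping the function; on the deserialized copy it is `null` again, and the null check in `apply` rebuilds it there.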

Adding another serialization framework may help in cases where an object simply lacks the java.io.Serializable interface. However, not everything is magically serializable with Kryo.
There are classes that you can serialize with Java serialization but not out of the box with Kryo (especially when immutable collections are involved). Also, classes that have no default constructor but check invariants, etc., can fail arbitrarily with Kryo.



On Tue, Dec 8, 2015 at 8:28 PM, Nick Dimiduk <ndimiduk@gmail.com> wrote:
Ah, very good. I've closed my issue as a duplicate. Thanks for the reference.

On Tue, Dec 8, 2015 at 11:23 AM, Fabian Hueske <fhueske@gmail.com> wrote:
Hi Nick,

thanks for pushing this and opening the JIRA issue.

The issue has come up a couple of times and is a known limitation (see FLINK-1256).
So far the workaround of marking member variables as transient and initializing them in the open() method of a RichFunction has been good enough for all cases I am aware of. That's probably why the issue hasn't been addressed yet.
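The transient-field-plus-open() workaround can be sketched in plain Java. This is not Flink code: `LifecycleFunction` is a hypothetical stand-in that only mirrors the shape of a rich function (with Flink one would extend a class such as `RichMapFunction` and override `open(Configuration)`), and `runOnWorker` imitates the runtime by deserializing the function, calling `open()` once, and then calling `map()` per record. `Resource` and `TaggingFunction` are likewise illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OpenHookExample {

    // Hypothetical non-serializable resource (e.g. a client or third-party descriptor).
    static final class Resource {
        private final String name;
        Resource(String name) { this.name = name; }
        String tag(String value) { return name + ":" + value; }
    }

    // Hypothetical lifecycle stand-in, NOT Flink's API:
    // open() runs once after deserialization, then map() runs per record.
    static abstract class LifecycleFunction<IN, OUT> implements Serializable {
        private static final long serialVersionUID = 1L;
        void open() {}
        abstract OUT map(IN value);
    }

    static final class TaggingFunction extends LifecycleFunction<String, String> {
        private static final long serialVersionUID = 1L;
        private final String resourceName;   // serializable configuration, shipped
        private transient Resource resource; // skipped by Java serialization

        TaggingFunction(String resourceName) { this.resourceName = resourceName; }

        @Override
        void open() {
            resource = new Resource(resourceName); // created on the "worker" side
        }

        @Override
        String map(String value) {
            return resource.tag(value);
        }
    }

    // Imitates the runtime: serialize, deserialize, open(), then map() each record.
    @SuppressWarnings("unchecked")
    static <IN, OUT> List<OUT> runOnWorker(LifecycleFunction<IN, OUT> fn, List<IN> input) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            LifecycleFunction<IN, OUT> copy;
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy = (LifecycleFunction<IN, OUT>) in.readObject();
            }
            copy.open();
            List<OUT> result = new ArrayList<>();
            for (IN value : input) {
                result.add(copy.map(value));
            }
            return result;
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        List<String> out = runOnWorker(new TaggingFunction("phoenix"), Arrays.asList("a", "b"));
        System.out.println(out); // [phoenix:a, phoenix:b]
    }
}
```

The difference from lazy initialization is only where the resource gets built: here an explicit hook runs before any record is processed, so `map()` needs no null check.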

Of course, this is not a satisfying solution if you would like to use Java 8 lambda functions.

Best, Fabian

2015-12-08 19:38 GMT+01:00 Nick Dimiduk <ndimiduk@apache.org>:
That's what I feared. IMO this is very limiting when mixing in other projects where a user does not have control over those projects' APIs. At least falling back to an extensible serialization mechanism (like Kryo) allows users to register serializers external to the types they're consuming.


-n

On Tue, Dec 8, 2015 at 1:37 AM, Till Rohrmann <trohrmann@apache.org> wrote:

Hi Nick,

at the moment Flink uses Java serialization to ship the UDFs to the cluster. Therefore, the closures must only contain Serializable objects. The serializer registration only applies to the data which is processed by the Flink job. Thus, for the moment I would try to get rid of the ColumnInfo object in your closure.
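To illustrate Till's point without Flink on the classpath: Flink's function interfaces (e.g. `MapFunction`) extend `java.io.Serializable`, so a lambda passed to `map()` is a serializable lambda, and its captured variables are written with Java serialization. The sketch below imitates that with an intersection cast and a plain `ObjectOutputStream`; `ColumnInfoStandIn` is a hypothetical class playing the role of `org.apache.phoenix.util.ColumnInfo`, and `isJavaSerializable` is an illustrative helper. Capturing an `ArrayList` of such objects fails with `NotSerializableException`, as in the stack trace below, while copying only the serializable fields (here the column names) before building the lambda succeeds.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ClosureCapture {

    // Hypothetical stand-in for a third-party type that is NOT Serializable.
    static final class ColumnInfoStandIn {
        final String columnName;
        final int sqlType;
        ColumnInfoStandIn(String columnName, int sqlType) {
            this.columnName = columnName;
            this.sqlType = sqlType;
        }
    }

    // Tries plain Java serialization, similar in spirit to a closure check.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Fails: the serializable lambda captures a list of non-serializable objects.
    static Function<String, String> capturingColumns(List<ColumnInfoStandIn> columns) {
        return (Function<String, String> & Serializable) v -> columns.get(0).columnName + "=" + v;
    }

    // Works: copy only the serializable fields before building the lambda.
    static Function<String, String> capturingNames(List<ColumnInfoStandIn> columns) {
        ArrayList<String> names = new ArrayList<>();
        for (ColumnInfoStandIn c : columns) {
            names.add(c.columnName);
        }
        return (Function<String, String> & Serializable) v -> names.get(0) + "=" + v;
    }

    public static void main(String[] args) {
        List<ColumnInfoStandIn> cols =
                new ArrayList<>(Arrays.asList(new ColumnInfoStandIn("ID", Types.BIGINT)));
        System.out.println(isJavaSerializable(capturingColumns(cols))); // false
        System.out.println(isJavaSerializable(capturingNames(cols)));   // true
    }
}
```

If the function needs the full column objects, the same idea applies with more fields copied (for instance names and SQL types), and the third-party objects rebuilt inside the function, for example in open().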

Cheers,
Till


On Mon, Dec 7, 2015 at 10:02 PM, Nick Dimiduk <ndimiduk@gmail.com> wrote:
Hello,

I've implemented a (streaming) flow using the Java API and Java 8 lambdas for various map functions. When I try to run the flow, job submission fails because of an unserializable type. This is not a type of data used within the flow, but rather a small collection of objects captured in the closure of one of my lambdas. I've implemented a Kryo Serializer for this type and registered it with this environment; however, it's apparently not used when serializing the lambdas. It seems like the environment's serialization configuration and tools should also be used when preparing the job for submission. Am I missing something?

Thanks,
Nick

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
Caused by: org.apache.flink.api.common.InvalidProgramException: Object ImportFlow$$Lambda$11/1615389290@44286963 not serializable
        at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
        at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:149)
        at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:550)
        at ImportFlow.assembleImportFlow(ImportFlow.java:111)
        at ImportFlow.main(ImportFlow.java:178)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
        ... 6 more
Caused by: java.io.NotSerializableException: org.apache.phoenix.util.ColumnInfo
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at java.util.ArrayList.writeObject(ArrayList.java:762)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
        at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
        ... 17 more