From Ido Bar Av <idba...@microsoft.com>
Subject Serialization problem: Using generic that extends a class on POJO.
Date Mon, 14 Aug 2017 14:35:17 GMT

We're using Flink 1.3.1, and we're trying to pass a POJO with a generic field through the
pipeline (see details in the complete example below).
We have the class Foo<SomeKey extends BarKey>, and when sending a subclass with a specific
SomeKey, we get the following exception:

java.lang.RuntimeException: Cannot instantiate class.
              at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:201)
              at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:395)
              at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:206)
              at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:48)
              at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
              at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
              at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
              at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
              at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Can not set ....BarKey field ...Foo.someKey to java.lang.Object
              at java.lang.reflect.Field.set(Field.java:764)
              at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:209)
              at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:197)
              ... 10 more

If I understand correctly, the deserializer used for the someKey field creates a plain Object
(before filling it), ignoring the fact that SomeKey extends BarKey, and then fails when
trying to assign that Object to a field whose declared type is the parent class BarKey.
What is the correct approach for this situation?
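
The root cause in the trace can be reproduced without Flink. The sketch below is only an illustration of the reflection behaviour, not Flink's actual code path: ErasureDemo and its nested BarKey and Foo classes are hypothetical stand-ins for the classes in the complete example, and the type parameter is called K to avoid confusion with a real class. It shows that the generic field's runtime type is the erasure of its bound (BarKey), and that assigning a plain Object to it through Field.set is rejected with an IllegalArgumentException.

```java
import java.lang.reflect.Field;

public class ErasureDemo {
    // Hypothetical stand-ins for BarKey and Foo from the example.
    public static class BarKey {}

    public static class Foo<K extends BarKey> {
        public K someKey;
    }

    /** Runtime type of the generic field: the erasure of K, i.e. its bound. */
    public static Class<?> erasedFieldType() throws NoSuchFieldException {
        return Foo.class.getField("someKey").getType();
    }

    /** Returns true if assigning a plain Object to the field is rejected. */
    public static boolean plainObjectRejected() throws Exception {
        Field field = Foo.class.getField("someKey");
        try {
            // Mirrors the failing step: a java.lang.Object is written into a BarKey-typed field.
            field.set(new Foo<BarKey>(), new Object());
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(erasedFieldType().getSimpleName());
        System.out.println(plainObjectRejected());
    }
}
```

If this matches what happens in the job, the serializer is building the initial value for someKey from a type it resolved as Object rather than as BarKey or a subclass of it.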


Complete code example:
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BarKey implements Serializable {
    public List<Long> valueList;

    public BarKey() {
    }

    public BarKey(long value) {
        valueList = new ArrayList<>();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        BarKey barKey = (BarKey) o;
        return Objects.equals(valueList, barKey.valueList);
    }

    @Override
    public int hashCode() {
        return Objects.hash(valueList);
    }
}

public class SomeKey extends BarKey implements Serializable {
    public Integer banana = 1;

    public SomeKey() {
    }

    public SomeKey(long value) {
        super(value); // assumed: delegates to BarKey(long)
    }
}

public class Foo<SomeKey extends BarKey> implements Serializable {

    public Foo() {}
    public SomeKey someKey;

    public Foo(SomeKey someKey) {
        this.someKey = someKey;
    }
}


public class FooFoo<SomeKey extends BarKey> extends Foo<SomeKey> implements Serializable {
    public FooFoo() {
    }

    public Integer grill = 12;
    public FooFoo(SomeKey someKey) {
        super(someKey); // assumed: delegates to Foo(SomeKey)
    }
}

class MakeFoo extends ProcessFunction<Integer, Foo<BarKey>> implements Serializable {
    @Override
    public void processElement(Integer value, Context ctx, Collector<Foo<BarKey>> out) throws Exception {
        out.collect(new FooFoo<>(new SomeKey((long) value)));
    }
}

class FooProcessor extends ProcessFunction<Foo<BarKey>, Foo<BarKey>> implements Serializable {
    @Override
    public void processElement(Foo<BarKey> value, Context ctx, Collector<Foo<BarKey>> out) throws Exception {
    }
}

class FooBarSelector<SomeKey extends BarKey> implements KeySelector<Foo<SomeKey>, BarKey>, Serializable {
    @Override
    public BarKey getKey(Foo<SomeKey> value) throws Exception {
        return value.someKey;
    }
}

class FooBarSink implements Serializable, SinkFunction<Foo<BarKey>> {
    private static final Logger logger = LoggerFactory.getLogger(FooBarSink.class);
    public long dosomething = 0;

    @Override
    public void invoke(Foo<BarKey> value) throws Exception {
        dosomething += value.someKey.valueList.size();
        logger.warn("Sink {}", dosomething);
    }
}

Test code:

        environment.registerType(FooFoo.class); // Not certain if this is needed

        List<Integer> intlist = new ArrayList<>();

        DataStreamSource<Integer> streamSource = environment.fromCollection(intlist);

        streamSource.process(new MakeFoo())
             .keyBy(new FooBarSelector<>())
             .process(new FooProcessor())
             .addSink(new FooBarSink());


