flink-user mailing list archives

From Timo Walther <twal...@apache.org>
Subject Re: Serialization problem: Using generic that extends a class on POJO.
Date Tue, 15 Aug 2017 11:02:15 GMT
Hi Ido,

thank you for your good example that reproduces the problem. I found a 
bug in Flink's type extraction logic and opened an issue for it [0]. The 
problem seems to be the bounded generics in both Foo and FooFoo: 
Foo.someKey gets the wrong type information, namely 
GenericType<java.lang.Object>.
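A minimal sketch in plain Java reflection (not Flink code) of why GenericType<java.lang.Object> breaks here. After erasure, the declared type of Foo.someKey is its bound, BarKey, and FooFoo only passes its own bounded type variable up to Foo<SomeKey>, so a type extractor has to follow that variable to its bound. If it settles on Object instead, the stack trace suggests the POJO serializer ends up calling Field.set with a plain Object, which throws the IllegalArgumentException seen there. The class name BoundedFieldSketch and the helper resolve are hypothetical, and resolve only follows the first upper bound.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class BoundedFieldSketch {
    // Stripped-down copies of the classes from the thread (fields only).
    public static class BarKey {}
    public static class Foo<SomeKey extends BarKey> { public SomeKey someKey; }
    public static class FooFoo<SomeKey extends BarKey> extends Foo<SomeKey> {}

    // Hypothetical helper: follows a type variable to its first upper bound
    // until a class is reached. This is NOT Flink's TypeExtractor.
    static Class<?> resolve(Type t) {
        if (t instanceof Class) {
            return (Class<?>) t;
        }
        if (t instanceof ParameterizedType) {
            return resolve(((ParameterizedType) t).getRawType());
        }
        if (t instanceof TypeVariable) {
            return resolve(((TypeVariable<?>) t).getBounds()[0]);
        }
        return Object.class; // wildcards etc. are out of scope for this sketch
    }

    // FooFoo passes its own variable SomeKey up to Foo<SomeKey>;
    // resolving it through the bound gives BarKey, not Object.
    public static Class<?> inheritedKeyType() {
        ParameterizedType sup = (ParameterizedType) FooFoo.class.getGenericSuperclass();
        return resolve(sup.getActualTypeArguments()[0]);
    }

    // The field's erased (runtime) type is the bound, BarKey.
    public static Class<?> erasedFieldType() {
        return someKeyField().getType();
    }

    // Mimics the failing step from the stack trace: Field.set with an
    // instance whose class is not assignable to the field type.
    public static String trySet(Object value) {
        try {
            someKeyField().set(new FooFoo<BarKey>(), value);
            return "ok";
        } catch (IllegalArgumentException | IllegalAccessException e) {
            return e.getClass().getSimpleName();
        }
    }

    private static Field someKeyField() {
        try {
            return Foo.class.getField("someKey");
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(inheritedKeyType().getSimpleName()); // BarKey
        System.out.println(erasedFieldType().getSimpleName());  // BarKey
        System.out.println(trySet(new BarKey()));               // ok
        System.out.println(trySet(new Object()));               // IllegalArgumentException
    }
}
```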

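As a workaround until the issue is fixed, you can do the following:

import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

// Nested in the job's enclosing class. The @TypeInfo annotation makes the
// type extractor use TypeFactory instead of analyzing Foo as a POJO.
@TypeInfo(Foo.TypeFactory.class)
public static class Foo<SomeKey extends BarKey> implements Serializable {

    public SomeKey someKey;

    public Foo() {}

    public Foo(SomeKey someKey) {
        this.someKey = someKey;
    }

    public static class TypeFactory extends TypeInfoFactory<Foo> {
        @Override
        public TypeInformation<Foo> createTypeInfo(Type t,
                Map<String, TypeInformation<?>> genericParameters) {
            // Treat Foo as a generic type, serialized by Kryo.
            return new GenericTypeInfo<>(Foo.class);
        }
    }
}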
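Why this helps, as far as I understand it: with GenericTypeInfo, Flink hands the whole Foo object to Kryo instead of building one serializer per field from the extracted (wrong) field types, and Kryo records the runtime class of a value stored in a non-final field. Kryo is not part of the JDK, so the sketch below uses plain Java serialization as a stand-in for that whole-object approach; it only shows that the runtime classes FooFoo and SomeKey survive a round trip and is not Flink's actual serializer. WholeObjectRoundTrip and roundTrip are hypothetical names, and the type parameter is renamed to K so it does not shadow the SomeKey class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class WholeObjectRoundTrip {
    // Stripped-down copies of the classes from the thread.
    public static class BarKey implements Serializable {}
    public static class SomeKey extends BarKey {
        public Integer banana = 1;
    }
    public static class Foo<K extends BarKey> implements Serializable {
        public K someKey;
    }
    public static class FooFoo<K extends BarKey> extends Foo<K> {
        public Integer grill = 12;
    }

    // Writes the object graph as a whole (class descriptors included)
    // and reads it back; a stand-in for Kryo, not Flink's serializer.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns "<runtime class of copy>/<runtime class of copy.someKey>".
    public static String roundTripClasses() {
        FooFoo<SomeKey> original = new FooFoo<>();
        original.someKey = new SomeKey();
        Foo<?> copy = roundTrip(original);
        return copy.getClass().getSimpleName() + "/" + copy.someKey.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(roundTripClasses()); // FooFoo/SomeKey
    }
}
```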


I hope that helps.

Regards,
Timo


[0] https://issues.apache.org/jira/browse/FLINK-7450

On 14.08.17 at 17:24, Timo Walther wrote:
> Hi Ido,
>
> at first glance, I could not find any problem in your code. So it 
> might be a bug. The "environment.registerType()" is not needed in your 
> case, because you have no generic types.
>
> I will have a closer look at it tomorrow.
>
> Regards,
> Timo
>
> On 14.08.17 at 16:35, Ido Bar Av wrote:
>>
>> Hi,
>>
>> We’re using Flink 1.3.1, and we’re trying to pass a POJO that has a 
>> generic field through the pipeline (see details in the complete 
>> example below):
>>
>> We have the class Foo<SomeKey extends BarKey>, and when sending a 
>> subclass with a specific SomeKey, we get the following exception:
>>
>> java.lang.RuntimeException: Cannot instantiate class.
>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:201)
>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:395)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:206)
>>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:48)
>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalArgumentException: Can not set ….BarKey field …Foo.someKey to java.lang.Object
>>     at java.lang.reflect.Field.set(Field.java:764)
>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:209)
>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:197)
>>     ... 10 more
>>
>> If I understand correctly, for some reason the deserializer used for 
>> SomeKey returns an Object (before filling it), ignoring the fact that 
>> SomeKey extends BarKey, and then fails when trying to assign it to 
>> the field declared in the parent class.
>>
>> What is the correct approach for this situation?
>>
>> Thanks,
>>
>> Ido
>>
>> Complete code example:
>>
>> import java.io.Serializable;
>> import java.util.ArrayList;
>> import java.util.List;
>> import java.util.Objects;
>>
>> import org.apache.flink.api.java.functions.KeySelector;
>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
>> import org.apache.flink.util.Collector;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> // Each public class below goes in its own .java file.
>> public class BarKey implements Serializable {
>>
>>     public List<Long> valueList;
>>
>>     public BarKey() {
>>     }
>>
>>     public BarKey(long value) {
>>         super();
>>         valueList = new ArrayList<>();
>>         valueList.add(value);
>>     }
>>
>>     @Override
>>     public boolean equals(Object o) {
>>         if (this == o) {
>>             return true;
>>         }
>>         if (o == null || getClass() != o.getClass()) {
>>             return false;
>>         }
>>         BarKey barKey = (BarKey) o;
>>         return Objects.equals(valueList, barKey.valueList);
>>     }
>>
>>     @Override
>>     public int hashCode() {
>>         return Objects.hash(valueList);
>>     }
>> }
>>
>> public class SomeKey extends BarKey implements Serializable {
>>
>>     public Integer banana = 1;
>>
>>     public SomeKey() {
>>     }
>>
>>     public SomeKey(long value) {
>>         super(value);
>>     }
>> }
>>
>> public class Foo<SomeKey extends BarKey> implements Serializable {
>>
>>     public Foo() {}
>>
>>     public SomeKey someKey;
>>
>>     public Foo(SomeKey someKey) {
>>         this.someKey = someKey;
>>     }
>> }
>>
>> public class FooFoo<SomeKey extends BarKey> extends Foo<SomeKey>
>>         implements Serializable {
>>
>>     public FooFoo() {
>>     }
>>
>>     public Integer grill = 12;
>>
>>     public FooFoo(SomeKey someKey) {
>>         super(someKey);
>>     }
>> }
>>
>> class MakeFoo extends ProcessFunction<Integer, Foo<BarKey>>
>>         implements Serializable {
>>
>>     @Override
>>     public void processElement(Integer value, Context ctx,
>>             Collector<Foo<BarKey>> out) throws Exception {
>>         out.collect(new FooFoo<>(new SomeKey((long) value)));
>>     }
>> }
>>
>> class FooProcessor extends ProcessFunction<Foo<BarKey>, Foo<BarKey>>
>>         implements Serializable {
>>
>>     @Override
>>     public void processElement(Foo<BarKey> value, Context ctx,
>>             Collector<Foo<BarKey>> out) throws Exception {
>>         value.someKey.valueList.add(1L);
>>         out.collect(value);
>>     }
>> }
>>
>> class FooBarSelector<SomeKey extends BarKey> implements
>>         KeySelector<Foo<SomeKey>, BarKey>, Serializable {
>>
>>     @Override
>>     public BarKey getKey(Foo<SomeKey> value) throws Exception {
>>         return value.someKey;
>>     }
>> }
>>
>> class FooBarSink implements Serializable, SinkFunction<Foo<BarKey>> {
>>
>>     private static final Logger logger =
>>             LoggerFactory.getLogger(FooBarSink.class);
>>
>>     public long dosomething = 0;
>>
>>     @Override
>>     public void invoke(Foo<BarKey> value) throws Exception {
>>         dosomething += value.someKey.valueList.size();
>>         logger.warn("Sink {}", dosomething);
>>     }
>> }
>>
>> Test code:
>>
>> StreamExecutionEnvironment environment =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>> environment.registerType(FooFoo.class); // Not certain if this is needed
>>
>> List<Integer> intlist = new ArrayList<>();
>> intlist.add(3);
>> intlist.add(5);
>>
>> DataStreamSource<Integer> streamSource =
>>         environment.fromCollection(intlist);
>>
>> streamSource.process(new MakeFoo())
>>         .keyBy(new FooBarSelector<>())
>>         .process(new FooProcessor())
>>         .addSink(new FooBarSink());
>>
>> environment.execute("Jobname-UT");
>>
>

