flink-user mailing list archives

From John Tipper <john_tip...@hotmail.com>
Subject Does Flink support raw generic types in a merged stream?
Date Wed, 17 Jul 2019 12:18:31 GMT
Hi All,


Can I union/join two streams containing generic classes, where each stream has a different parameterised
type? I'd like to process the combined stream of values as a single raw type, then cast each value to its
specific type for detailed processing, using information carried in each container (its Class) that lets
me cast safely.

I can't share my exact code, but the example below shows the sort of thing I want to do.

So, as an example, given the following generic type:

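class MyGenericContainer<IN> extends Tuple3<String, IN, SomeOtherClass> {
    ...
    private final String myString;
    private final IN value;
    private final Class<IN> clazz; // created by constructor
    private SomeOtherClass someOtherClass;
    ...
    public IN getValue() { return value; }
    public Class<IN> getClazz() { return clazz; } // used to cast the value safely
}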

and 2 streams, I'd like to be able to do something like:

DataStream<MyGenericContainer<String>> stream1 = ...
DataStream<MyGenericContainer<Integer>> stream2 = ...

DataStream<...> merged = stream1.union(stream2).process(new MyProcessFunction());

// within an operator, such as MyProcessFunction:
MyGenericContainer container = ...; // the raw generic container passed to the function
Object rawValue = container.getValue();
performProcessing(container.getClazz().cast(rawValue)); // safely cast rawValue via its Class
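To make the intended casting step concrete, here is a plain-Java sketch with no Flink involved. It assumes, as in the class above, that each container records its Class when it is constructed. `Container`, `describe` and the two branches are hypothetical illustrations, and a `List<Container<?>>` merely stands in for the merged stream:

```java
import java.util.ArrayList;
import java.util.List;

public class ClassTokenDemo {

    // Hypothetical, Flink-free stand-in for MyGenericContainer: the value plus
    // the Class token recorded when the container is constructed.
    static final class Container<IN> {
        private final IN value;
        private final Class<IN> clazz;

        Container(IN value, Class<IN> clazz) {
            this.value = value;
            this.clazz = clazz;
        }

        IN getValue() { return value; }

        Class<IN> getClazz() { return clazz; }
    }

    // Handles a container whose type argument is unknown at compile time:
    // branch on the recorded Class, then Class.cast checks the value at runtime.
    static String describe(Container<?> container) {
        Object rawValue = container.getValue();
        Class<?> clazz = container.getClazz();
        if (clazz == String.class) {
            String s = String.class.cast(rawValue);
            return "string of length " + s.length();
        }
        if (clazz == Integer.class) {
            Integer i = Integer.class.cast(rawValue);
            return "integer doubled to " + (i * 2);
        }
        return "unhandled type " + clazz.getName();
    }

    public static void main(String[] args) {
        // One list of wildcard containers plays the role of the merged stream.
        List<Container<?>> merged = new ArrayList<>();
        merged.add(new Container<>("hello", String.class));
        merged.add(new Container<>(21, Integer.class));
        for (Container<?> c : merged) {
            System.out.println(describe(c));
        }
    }
}
```

Class.cast throws ClassCastException if the recorded Class and the value ever disagree, so a mismatch fails loudly instead of slipping through as an unchecked cast.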

However, I get an error when I do this:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable
'IN' in 'class com.example.MyGenericTuple3' could not be determined. This is most likely a
type erasure problem. The type extraction currently supports types with generic variables
only in cases where all variables in the return type can be deduced from the input type(s).
Otherwise the type has to be specified explicitly using type information.
    at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:1133)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:853)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587)
    at org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:633)
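The message above blames type erasure. A small plain-Java illustration of what that means, independent of Flink's actual TypeExtractor logic: two parameterisations of the same generic class share one runtime Class, so code inspecting the class alone only sees the declared variable `IN`. `Holder` is a hypothetical class used just for this demonstration:

```java
public class ErasureDemo {

    // Hypothetical generic holder; only its type parameter matters here.
    static final class Holder<IN> {
        final IN value;

        Holder(IN value) { this.value = value; }
    }

    // Both parameterisations share one runtime Class: the compiler erases the
    // type argument, so nothing on the object says whether IN was String or Integer.
    static boolean sameRuntimeClass() {
        Holder<String> a = new Holder<>("x");
        Holder<Integer> b = new Holder<>(1);
        return a.getClass() == b.getClass();
    }

    // Reflection on the class itself reports the declared variable's name,
    // not a concrete type.
    static String declaredVariable() {
        return Holder.class.getTypeParameters()[0].getName();
    }

    public static void main(String[] args) {
        System.out.println("same runtime class: " + sameRuntimeClass());
        System.out.println("declared type variable: " + declaredVariable());
    }
}
```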

If I try to add a returns() to the code, like this:

DataStream<...> merged = stream1.union(stream2)
    .process(...)
    .returns(new TypeHint<MyGenericContainer>() {});

then I get a different exception:

Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: The TypeHint is using
a generic variable.This is not supported, generic types must be fully specified for the TypeHint.
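A plain-Java sketch of why a raw hint cannot resolve `IN`, assuming (as the `new TypeHint<...>() {}` syntax suggests) that the hint captures its type argument through an anonymous subclass. `Tuple3`, `Token` and `leavesInUnresolved` below are hypothetical stand-ins, not Flink classes: a raw `MyGenericContainer` argument still leaves `IN` as a type variable in the superclass, while `MyGenericContainer<Integer>` binds it:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class RawHintDemo {

    // Hypothetical stand-in for Flink's Tuple3 so the example needs no Flink jar.
    static class Tuple3<A, B, C> { }

    // Same shape as the class above: Tuple3's middle slot is the variable IN.
    static class MyGenericContainer<IN> extends Tuple3<String, IN, Long> { }

    // Hypothetical "super type token": an anonymous subclass records its type
    // argument, which can then be read back through reflection.
    static abstract class Token<T> {
        final Type captured = ((ParameterizedType) getClass().getGenericSuperclass())
                .getActualTypeArguments()[0];
    }

    // A raw hint captures only the Class, with no argument for IN.
    @SuppressWarnings("rawtypes")
    static Type rawHint() {
        return new Token<MyGenericContainer>() { }.captured;
    }

    // A fully specified hint records IN = Integer in the captured type.
    static Type fullHint() {
        return new Token<MyGenericContainer<Integer>>() { }.captured;
    }

    // True when the hint leaves IN as an unresolved type variable.
    static boolean leavesInUnresolved(Type hint) {
        if (hint instanceof ParameterizedType) {
            Type arg = ((ParameterizedType) hint).getActualTypeArguments()[0];
            return arg instanceof TypeVariable;
        }
        // Raw class: IN only appears in the generic superclass, still as a variable.
        ParameterizedType superType =
                (ParameterizedType) ((Class<?>) hint).getGenericSuperclass();
        return superType.getActualTypeArguments()[1] instanceof TypeVariable;
    }

    public static void main(String[] args) {
        System.out.println("raw hint " + rawHint() + ", IN unresolved: "
                + leavesInUnresolved(rawHint()));
        System.out.println("full hint " + fullHint() + ", IN unresolved: "
                + leavesInUnresolved(fullHint()));
    }
}
```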

Is this sort of thing supported, or is there another way to join multiple streams into a
single stream where each stream's elements are a different parameterisation of a common generic type?


Many thanks,

John

