flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Help with generics
Date Thu, 21 Apr 2016 12:27:10 GMT
Hi,
you're right there is not much (very little) in the documentation about
TypeInformation. There is only the description in the JavaDoc:
TypeInformation
<https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/TypeInformation.html>
Essentially,
how it works is that we always use a TypeSerializer<T> when we have to
serialize values (for sending across network, storing in state, etc.). A
TypeSerializer<T> is created from a TypeInformation<T> and TypeInformation
can be obtained in several ways:

 - the TypeExtractor tries to analyze user functions to determine a
TypeInformation for the input and output type
 - the TypeExtractor can try and analyze a given Class<T> to determine a
TypeInformation
 - the Scala API uses macros and implicit parameters to create
TypeInformation
 - TypeHint can be created to retrieve a TypeInformation
 - a TypeInformation can be manually constructed

tl;dr In your case you can try TypeInformation.of(initModel.class). If that
doesn't work you can try and pass in a function that gives you a
TypeInformation for your model type M.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 14:16 Martin Neumann <mneumann@sics.se> wrote:

> Hej,
>
> I pass an instance of M in the constructor of the class, can I use that
> instead? Maybe give the class a function that returns the right
> TypeInformation? I'm trying figure out how TypeInformation works to better
> understand the Issue is there any documentation about this? At the moment I
> don't really understand what TypeInformation does and how it works.
>
> cheers Martin
>
> On Thu, Apr 21, 2016 at 2:08 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> Hi,
>> I think it doesn't work because the concrete type of M is not available
>> to create a TypeInformation for M. What you can do is manually pass a
>> TypeInformation<M> or a TypeSerializer<M> to the AnomalyFlatMap and use
>> that when creating the state descriptor.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 13:45 Martin Neumann <mneumann@sics.se> wrote:
>>
>>> Hey,
>>>
>>> I have a FlatMap that uses some generics (appended at the end of the
>>> mail).
>>> I have some trouble with the type inference running into
>>> InvalidTypesException on the first line in the open function.
>>>
>>> How can I fix it?
>>>
>>> Cheers Martin
>>>
>>>
>>>
>>>
>>> public class AnomalyFlatMap<M extends Model,V extends ModelValue, T> extends
RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly,T>> {
>>>     private transient ValueState<M> microModel;
>>>     private final double threshold;
>>>     private boolean updateIfAnomaly;
>>>     private M initModel;
>>>
>>>     public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly)
{
>>>         this.threshold = threshold;
>>>         this.updateIfAnomaly = updateIfAnomaly;
>>>         this.initModel = model;
>>>
>>>     }
>>>
>>>     @Override
>>>     public void open(Configuration parameters) throws Exception {
>>>         ValueStateDescriptor<M> descriptor =
>>>                 new ValueStateDescriptor<>(
>>>                         "RollingMicroModel",
>>>                         TypeInformation.of(new TypeHint<M>() {
>>>                         }),initModel
>>>                         );
>>>         microModel = getRuntimeContext().getState(descriptor);
>>>     }
>>>
>>>     @Override
>>>     public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly,
T>> collector) throws Exception {
>>>         M model = microModel.value();
>>>         Anomaly res  = model.calculateAnomaly(sample.f0);
>>>
>>>         if ( res.getScore() <= threshold || updateIfAnomaly){
>>>             model.addWindow(sample.f0);
>>>             microModel.update(model);
>>>         }
>>>         collector.collect(new Tuple2<>(res,sample.f1));
>>>     }
>>> }
>>>
>>>
>>>
>

Mime
View raw message