flink-user mailing list archives

From Martin Neumann <mneum...@sics.se>
Subject Re: Help with generics
Date Thu, 21 Apr 2016 12:16:09 GMT
Hi,

I pass an instance of M in the constructor of the class; can I use that
instead? Maybe give the class a function that returns the right
TypeInformation? I'm trying to figure out how TypeInformation works to better
understand the issue. Is there any documentation about this? At the moment I
don't really understand what TypeInformation does and how it works.

cheers Martin
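
A minimal plain-Java sketch of the problem described in the quoted reply below, without any Flink classes. It assumes that a type hint works like the usual "anonymous subclass" trick, where reflection reads the generic superclass back at runtime. Hint, Holder and HolderWithType are hypothetical stand-ins made up for this illustration; they are not Flink API.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class TypeTokenDemo {

    // Hypothetical stand-in for a type hint: an anonymous subclass records
    // its generic superclass, and reflection can read the type argument back.
    abstract static class Hint<T> {
        Type captured() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // Mirrors the failing pattern: inside the generic class, M is only a
    // type variable, so the hint captures "M" and not the concrete class.
    static class Holder<M> {
        Type hintInside() {
            return new Hint<M>() { }.captured();
        }
    }

    // Mirrors the suggested fix: the caller, who knows the concrete type,
    // passes it in through the constructor.
    static class HolderWithType<M> {
        private final Class<M> type;

        HolderWithType(Class<M> type) {
            this.type = type;
        }

        Type explicitType() {
            return type;
        }
    }

    public static void main(String[] args) {
        Type concrete = new Hint<String>() { }.captured();
        Type insideGeneric = new Holder<String>().hintInside();
        Type passedIn = new HolderWithType<>(String.class).explicitType();

        System.out.println("hint with concrete type:  " + concrete);
        System.out.println("hint inside generic class: " + insideGeneric);
        System.out.println("is a type variable:        "
                + (insideGeneric instanceof TypeVariable));
        System.out.println("type passed by the caller: " + passedIn);
    }
}
```

The hint created inside Holder only ever sees the type variable M, because the concrete type argument is erased at that point. That appears to be the same situation as TypeHint<M> inside AnomalyFlatMap, and it is why handing the type information in from the outside, as suggested below, avoids the problem.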

On Thu, Apr 21, 2016 at 2:08 PM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
> I think it doesn't work because the concrete type of M is not available to
> create a TypeInformation for M. What you can do is manually pass a
> TypeInformation<M> or a TypeSerializer<M> to the AnomalyFlatMap and use
> that when creating the state descriptor.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 13:45 Martin Neumann <mneumann@sics.se> wrote:
>
>> Hey,
>>
>> I have a FlatMap that uses some generics (appended at the end of the
>> mail).
>> I am having trouble with type inference: I run into an
>> InvalidTypesException on the first line of the open function.
>>
>> How can I fix it?
>>
>> Cheers Martin
>>
>>
>>
>>
>> public class AnomalyFlatMap<M extends Model, V extends ModelValue, T>
>>         extends RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly, T>> {
>>     private transient ValueState<M> microModel;
>>     private final double threshold;
>>     private boolean updateIfAnomaly;
>>     private M initModel;
>>
>>     public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly) {
>>         this.threshold = threshold;
>>         this.updateIfAnomaly = updateIfAnomaly;
>>         this.initModel = model;
>>
>>     }
>>
>>     @Override
>>     public void open(Configuration parameters) throws Exception {
>>         ValueStateDescriptor<M> descriptor =
>>                 new ValueStateDescriptor<>(
>>                         "RollingMicroModel",
>>                         TypeInformation.of(new TypeHint<M>() {
>>                         }),initModel
>>                         );
>>         microModel = getRuntimeContext().getState(descriptor);
>>     }
>>
>>     @Override
>>     public void flatMap(Tuple2<V, T> sample,
>>             Collector<Tuple2<Anomaly, T>> collector) throws Exception {
>>         M model = microModel.value();
>>         Anomaly res  = model.calculateAnomaly(sample.f0);
>>
>>         if ( res.getScore() <= threshold || updateIfAnomaly){
>>             model.addWindow(sample.f0);
>>             microModel.update(model);
>>         }
>>         collector.collect(new Tuple2<>(res,sample.f1));
>>     }
>> }
>>
>>
>>
