flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: How to fill flink's datastream
Date Mon, 04 Sep 2017 08:16:50 GMT
Hi Andrea,

a MapFunction's map() method is called once for each stream element and
returns exactly one result value, so MapFunctions are used for 1-to-1
transformations.
The returns() method lets you specify the return type of an operator, in
your case the Map operator. It is only necessary if Flink cannot
automatically determine the return type of the operator.
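Why can Flink fail to determine the type on its own? Roughly, Flink's TypeExtractor derives an operator's output type from the generic signature of the user function via reflection; the real extractor handles many more cases. The plain-Java sketch below uses only the standard library, and the names MapFn, ScoreMapper, GenericMapper and ErasureDemo are hypothetical stand-ins, not Flink classes. It shows that a class declaring its output type (like Double) exposes it at runtime, while a generic wrapper such as InferenceSelectMapper<T, R> only exposes the type variable R.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.function.Function;

// Hypothetical stand-in for Flink's MapFunction (NOT the real Flink interface).
interface MapFn<IN, OUT> {
    OUT map(IN value) throws Exception;
}

// The output type is spelled out in the class declaration, so reflection can see it.
class ScoreMapper implements MapFn<String, Double> {
    @Override
    public Double map(String value) {
        return Double.parseDouble(value);
    }
}

// The output type R is a type variable (as in InferenceSelectMapper<T, R>).
// Java erases it, so even an instance created as GenericMapper<String, Double>
// does not reveal "Double" through its class at runtime.
class GenericMapper<T, R> implements MapFn<T, R> {
    private final Function<T, R> fn;

    GenericMapper(Function<T, R> fn) {
        this.fn = fn;
    }

    @Override
    public R map(T value) {
        return fn.apply(value);
    }
}

class ErasureDemo {
    // Returns the OUT type argument declared on the MapFn interface, or null.
    static Type declaredOutputType(Class<?> mapperClass) {
        for (Type t : mapperClass.getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == MapFn.class) {
                return ((ParameterizedType) t).getActualTypeArguments()[1];
            }
        }
        return null;
    }

    public static void main(String[] args) {
        MapFn<String, Double> generic = new GenericMapper<String, Double>(Double::parseDouble);
        System.out.println(declaredOutputType(ScoreMapper.class));   // class java.lang.Double
        System.out.println(declaredOutputType(generic.getClass()));  // R
        System.out.println(declaredOutputType(generic.getClass()) instanceof TypeVariable); // true
    }
}
```

This is presumably why the library ends its chain with .returns(returnType): the caller supplies the type information that reflection cannot recover from the generic mapper.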

It's not easy to identify what is going on from the code you posted.
Are you sure the program is executed, i.e., did you call env.execute()?
Are all parts of the program connected?
Are you sure that the input stream of the Map operator emits records?
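The first question points at a common cause: calls such as map() and print() on a DataStream only build the job graph, and user functions run only after env.execute() submits the job. The plain-Java sketch below models that lazy behaviour; SelectFn, SelectMapper, LazyStream and LazyDemo are hypothetical stand-ins, not Flink's or Flink-HTM's API. SelectMapper mirrors the role of InferenceSelectMapper by delegating each map-style call to a select function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for the library's InferenceSelectFunction (NOT the real API).
interface SelectFn<IN, OUT> {
    OUT select(IN value) throws Exception;
}

// Adapter in the spirit of InferenceSelectMapper: apply() just delegates to select().
class SelectMapper<IN, OUT> implements Function<IN, OUT> {
    private final SelectFn<IN, OUT> selectFn;

    SelectMapper(SelectFn<IN, OUT> selectFn) {
        this.selectFn = selectFn;
    }

    @Override
    public OUT apply(IN value) {
        try {
            return selectFn.select(value);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

// Simplified model of a lazily built dataflow (NOT Flink's DataStream API):
// map() only records a step; nothing is evaluated until execute() is called.
class LazyStream<T> {
    private final Supplier<List<T>> plan;

    private LazyStream(Supplier<List<T>> plan) {
        this.plan = plan;
    }

    static <T> LazyStream<T> fromList(List<T> data) {
        return new LazyStream<T>(() -> new ArrayList<T>(data));
    }

    // Records a 1-to-1 transformation without running it.
    <R> LazyStream<R> map(Function<T, R> fn) {
        return new LazyStream<R>(() -> {
            List<R> out = new ArrayList<>();
            for (T value : plan.get()) {
                out.add(fn.apply(value)); // exactly one output per input
            }
            return out;
        });
    }

    // Plays the role of env.execute(): only now does the recorded plan run.
    List<T> execute() {
        return plan.get();
    }
}

class LazyDemo {
    static int selectCalls = 0;

    public static void main(String[] args) {
        LazyStream<Double> result = LazyStream.fromList(Arrays.asList(0.25, 0.5, 0.75))
                .map(new SelectMapper<Double, Double>(score -> {
                    selectCalls++;
                    return score * 100;
                }));
        System.out.println("select() calls before execute(): " + selectCalls); // 0
        System.out.println(result.execute());
        System.out.println("select() calls after execute(): " + selectCalls);  // 3
    }
}
```

In the model, select() is never called while the pipeline is being built; in a real Flink program the analogue is that no map() or select() call happens before env.execute() is reached.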

Best, Fabian


2017-09-02 19:23 GMT+02:00 AndreaKinn <kinn6aer@hotmail.it>:

> Hi,
> Sorry for the unclear title, but I don't know how to summarise the
> question.
> I'm using Flink-HTM, an external library that integrates with Flink. It is
> still a prototype.
> Internally it does everything I want, but I have a problem returning the
> evaluated values in a printable DataStream.
> I'm posting the question here because I believe the problem is related to
> Flink and not to the library.
>
> Essentially I have the following code in my main:
>
> DataStream<Double> result = HTM.learn(kafkaStream, new Harness.AnomalyNetwork())
>         .select(new InferenceSelectFunction<Harness.KafkaRecord, Double>() {
>             @Override
>             public Double select(Tuple2<Harness.KafkaRecord, NetworkInference> inference)
>                     throws Exception {
>                 return inference.f1.getAnomalyScore();
>             }
>         });
>
> Then I want to print the DataStream "result".
> Inside learn(), the flink-htm library correctly performs many operations
> on the data.
> At the end of this computation, in another class, I have a
> DataStream<Tuple2<T, NetworkInference>>, and I need to call the overridden
> select() method on that DataStream<Tuple2<T, NetworkInference>>.
>
> The code that should do this is:
>
> final DataStream<Tuple2<T, NetworkInference>> inferenceStream =
>         inferenceStreamBuilder.build();
>
> return inferenceStream
>         .map(new InferenceSelectMapper<T, R>(clean(inferenceSelectFunction)))
>         .returns(returnType);
> where map() and returns() are Flink's own methods: map() is defined in
> DataStream and returns() in SingleOutputStreamOperator.
>
> public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
>
>     TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
>             Utils.getCallLocationName(), true);
>
>     return transform("Map", outType, new StreamMap<>(clean(mapper)));
> }
>
> public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo) {
>     requireNonNull(typeInfo, "TypeInformation must not be null");
>
>     transformation.setOutputType(typeInfo);
>     return this;
> }
>
> and InferenceSelectMapper<T, R> is the following class:
>
> private static class InferenceSelectMapper<T, R>
>         implements MapFunction<Tuple2<T, NetworkInference>, R> {
>
>     private final InferenceSelectFunction<T, R> inferenceSelectFunction;
>
>     public InferenceSelectMapper(InferenceSelectFunction<T, R> inferenceSelectFunction) {
>         this.inferenceSelectFunction = inferenceSelectFunction;
>     }
>
>     @Override
>     public R map(Tuple2<T, NetworkInference> value) throws Exception {
>         return inferenceSelectFunction.select(value);
>     }
> }
>
> It implements Flink's MapFunction. I need the program to call
> InferenceSelectMapper.map() so that my select() function is invoked, but
> unfortunately this doesn't happen. As a consequence, I suppose the
> DataStream "result" in my main method is never filled, and nothing is
> printed to the IDE console, which is my fundamental problem.
>
> Since I'm not a Flink expert, I don't know how to do many things at a
> lower level.
> Honestly, I don't understand exactly what the map() and returns() methods
> do. I thought about it a lot, and I also tried to find a way to call
> InferenceSelectMapper.map() myself, but I don't know how to extract the
> Tuple2<T, NetworkInference> elements from the
> DataStream<Tuple2<...>>.
>
> I'm absolutely sure that the map() method I need in
> InferenceSelectMapper is not called: it doesn't appear in the call
> hierarchy, and a print statement I added to it never shows any output.
>
> Could you please help me solve this? I've been stuck on it for a week,
> and the library's author hasn't replied to my emails.
> Sorry for the length.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
