flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: Window not emitting output after upgrade to Flink 1.1.1
Date Fri, 12 Aug 2016 08:34:28 GMT
Hi Yassine,

Are you reading from a file and use ingestion time?
If yes, then the problem can be related to this:

https://issues.apache.org/jira/browse/FLINK-4329 <https://issues.apache.org/jira/browse/FLINK-4329>

Is this the case?

Best,
Kostas

> On Aug 12, 2016, at 10:30 AM, Yassine Marzougui <yassmarzou@gmail.com> wrote:
> 
> Hi all,
> 
> The following code works under Flink 1.0.3, but under 1.1.1 it just switches to FINISHED
and doesn't output any result.
> 
> stream.map(new RichMapFunction<String, Request>() {
> 
>         private ObjectMapper objectMapper;
> 
>         @Override
>         public void open(Configuration parameters) {
>             objectMapper = new ObjectMapper();
>         }
> 
>         @Override
>         public Request map(String value) throws Exception {
>             return objectMapper.readValue(value, Request.class);
>         }
> 
>     })
>     .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
>         @Override
>         public long extractAscendingTimestamp(Request req) {
>             return req.ts;
>         }
>     })
>     .map((Request req) -> new Tuple3<String, String, Integer>(req.userId, req.location,
1))
>     .keyBy(0)
>     .timeWindow(Time.minutes(10))
>     .apply(
>             (Tuple3<String, String, Integer> x, Tuple3<String, String, Integer>
y) -> y,
>             (Tuple key, TimeWindow w, Iterable<Tuple3<String, String, Integer>>
itrbl, Collector<Tuple2<String, Integer>> clctr) -> {
>                 Tuple3<String, String, Integer> res = itrbl.iterator().next();
>                 clctr.collect(new Tuple2<>(res.f1, res.f2));
>             })
>     .print();
> 
> The problem is with the window operator because I could print results before it.
> 
> Best,
> Yassine


Mime
View raw message