flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Yassine Marzougui <yassmar...@gmail.com>
Subject Window not emitting output after upgrade to Flink 1.1.1
Date Fri, 12 Aug 2016 08:30:35 GMT
Hi all,

The following code works under Flink 1.0.3, but under 1.1.1 it just
switches to FINISHED and doesn't output any result.

stream.map(new RichMapFunction<String, Request>() {

        private ObjectMapper objectMapper;

        @Override
        public void open(Configuration parameters) {
            objectMapper = new ObjectMapper();
        }

        @Override
        public Request map(String value) throws Exception {
            return objectMapper.readValue(value, Request.class);
        }

    })
    .assignTimestampsAndWatermarks(new
AscendingTimestampExtractor<Request>() {
        @Override
        public long extractAscendingTimestamp(Request req) {
            return req.ts;
        }
    })
    .map((Request req) -> new Tuple3<String, String, Integer>(req.userId,
req.location, 1))
    .keyBy(0)
    .timeWindow(Time.minutes(10))
    .apply(
            (Tuple3<String, String, Integer> x, Tuple3<String, String,
Integer> y) -> y,
            (Tuple key, TimeWindow w, Iterable<Tuple3<String, String,
Integer>> itrbl, Collector<Tuple2<String, Integer>> clctr) -> {
                Tuple3<String, String, Integer> res =
itrbl.iterator().next();
                clctr.collect(new Tuple2<>(res.f1, res.f2));
            })
    .print();

The problem is with the window operator because I could print results
before it.

Best,
Yassine

Mime
View raw message