flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: missing data in window.reduce() while apply() is ok
Date Tue, 25 Oct 2016 13:53:35 GMT
Hi,
could you please go into more detail about the input and what the expected
output is. And then also what the output is with both apply() and reduce()?

With this we might be able to figure it out together.

Cheers,
Aljoscha

On Mon, 24 Oct 2016 at 18:11 Sendoh <unicorn.banachi@gmail.com> wrote:

> Hi Flink users,
>
> I saw a strange behavior that data are missing in reduce() but apply()
> doesn't, and when using 1.0.3 we don't see this behavior, and we see this
> in
> 1.1.3. Losing data means we don't see any event in the keys assigned, not
> the count of events.
>
> The code is as follows.
>
> DataStream<Map&lt;String, Object>> streams = env.addSource(new
> FlinkKafkaConsumer09<>(topicList, new SimpleStringSchema(), properties))
>                 .name("kafka_topics")
>                 .rebalance()
>                 .flatMap(new Eventsmap(events))
>                 .assignTimestampsAndWatermarks(new EventWatermark());
>
>         DataStream<Map&lt;String, Object>> count = streams
>                 .keyBy(new
> CompoundJsonKeySelector()).timeWindow(Time.minutes(1))
>                 .allowedLateness(Time.minutes(3))
> //               apply is ok
> //                .apply(new WindowFunction<Map&lt;String, Object>,
> Map<String, Object>, String, TimeWindow>() {
> //                           @Override
> //                           public void apply(String s, TimeWindow
> timeWindow, Iterable<Map&lt;String, Object>> iterable,
> Collector<Map&lt;String, Object>> collector) throws Exception {
> //                               Iterator<Map&lt;String, Object>> it =
> iterable.iterator();
> //                               collector.collect(it.next());
> //                           }
> //                       }
> //                );
> //               reduce() loses data
>                 .reduce(new ReduceFunction<Map&lt;String, Object>>() {
>                     @Override
>                     public Map<String, Object> reduce(Map<String, Object>
> v1, Map<String, Object> v2) throws Exception {
>                         int newCount =
> Integer.parseInt(v1.get("count").toString()) +
> Integer.parseInt(v2.get("count").toString());
>                         v2.put("count",newCount);
>                         return v2;
>                     }
>                 });
>
> Best,
>
> Is there any suggestion that we can try to
> figure out the root cause?
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/missing-data-in-window-reduce-while-apply-is-ok-tp9689.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Mime
View raw message