flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: TimeWindowAll doeesn't assign properly
Date Fri, 29 Jul 2016 16:49:35 GMT
Hi,
the single-element windows suggest to me that these results come from
elements that arrived at the window operator after the watermark had
already passed the end of their window. In the current version of Flink,
each such late element is emitted as its own single-element window. You can
avoid this by writing a custom EventTimeTrigger that does not fire on late
elements. In Flink 1.1 we are also introducing a setting that lets you
specify an allowed lateness, after which elements are dropped.
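
A minimal plain-Java sketch of this mechanism, with no Flink dependency,
might look as follows. It rests on several assumptions: the watermark is
recomputed after every element (Flink's periodic assigner emits it on a
timer instead), a window fires and is purged once the watermark reaches its
last timestamp, and a late element either fires on its own or is ignored,
depending on a hypothetical fireOnLate flag. The class and method names are
illustrative only and are not part of any Flink API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class LateElementSketch {

    /** Start of the tumbling window that contains a non-negative timestamp. */
    public static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    /**
     * Replays timestamps in arrival order and returns one "windowStart=count"
     * entry per firing. With fireOnLate == true, a late element is emitted as
     * its own single-element result; with false it is silently ignored.
     */
    public static List<String> run(long[] arrivals, long windowSize,
                                   long maxOutOfOrderness, boolean fireOnLate) {
        List<String> fired = new ArrayList<>();
        Map<Long, Integer> open = new TreeMap<>(); // window start -> buffered count
        long maxTimestamp = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;

        for (long ts : arrivals) {
            long start = windowStart(ts, windowSize);
            long lastTimestampInWindow = start + windowSize - 1;

            if (lastTimestampInWindow <= watermark) {
                // Late: this window was already fired and purged.
                if (fireOnLate) {
                    fired.add(start + "=1");
                }
            } else {
                open.merge(start, 1, Integer::sum);
            }

            // Assumption: the watermark trails the largest timestamp seen so far.
            maxTimestamp = Math.max(maxTimestamp, ts);
            watermark = maxTimestamp - maxOutOfOrderness;

            // Fire and purge every open window that the watermark has passed.
            Iterator<Map.Entry<Long, Integer>> it = open.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Integer> e = it.next();
                if (e.getKey() + windowSize - 1 <= watermark) {
                    fired.add(e.getKey() + "=" + e.getValue());
                    it.remove();
                }
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // One-minute windows, 10 s of tolerated out-of-orderness.
        // The element at 20_000 arrives after the watermark passed window 0.
        long[] arrivals = {1_000, 30_000, 59_000, 75_000, 20_000, 140_000};
        System.out.println(run(arrivals, 60_000, 10_000, true));
        System.out.println(run(arrivals, 60_000, 10_000, false));
    }
}
```

With fireOnLate set to true, the sample run yields [0=3, 0=1, 60000=1]: the
late element shows up as a second result for window 0 with a count of 1,
which is the same pattern as in the output quoted below. With fireOnLate set
to false, the result is [0=3, 60000=1].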

Cheers,
Aljoscha

On Fri, 29 Jul 2016 at 17:30 Sendoh <unicorn.banachi@gmail.com> wrote:

> Hi Flink users,
>
> We have an issue where timeWindowAll() doesn't assign elements properly.
> The sum should be produced in a single window, but it is emitted in
> separate windows.
>
> For example, in the output below, the window with hashCode -832348384 and
> start time 2016-07-20T05:57:00.000 has a count of 36, and another window
> with the same hashCode and start time has a count of 1. They should be
> aggregated into a single window with a count of 37.
>
> ...// hashCode of the window, sum of events in the window, window start time
> {"hashCode":-832348384,"count":36,"startDate":"2016-07-20T05:57:00.000"}
> {"hashCode":-832348384,"count":1,"startDate":"2016-07-20T05:57:00.000"}
> {"hashCode":-830444128,"count":452,"startDate":"2016-07-20T05:58:00.000"}
> {"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
> {"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
> {"hashCode":-830444128,"count":1,"startDate":"2016-07-20T05:58:00.000"}
> ...
>
> Example code is as follows:
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", Config.bootstrapServers);
> properties.setProperty("group.id", parameter.getRequired("groupId"));
> properties.setProperty("auto.offset.reset", "earliest");
>
> FlinkKafkaConsumer09<JSONObject> kafkaConsumer =
>         new FlinkKafkaConsumer09<>(Config.topic, new JSONSchema(), properties);
>
> DataStream<JSONObject> streams = env.addSource(kafkaConsumer)
>         .assignTimestampsAndWatermarks(new CorrelationWatermark())
>         .rebalance();
>
> DataStream<JSONObject> afterWindow = streams
>         .timeWindowAll(Time.minutes(1))
>         .apply(new SumAllWindow());
>
>
> public static class SumAllWindow implements AllWindowFunction<JSONObject, JSONObject, TimeWindow> {
>
>         @Override
>         public void apply(TimeWindow timeWindow, Iterable<JSONObject> values,
>                           Collector<JSONObject> collector) throws Exception {
>
>             DateTime startTs = new DateTime(timeWindow.getStart());
>             JSONObject jsonObject = new JSONObject();
>
>             int sum = 0;
>             for (JSONObject value : values){
>                 sum += 1;
>             }
>
>             jsonObject.put("startDate", startTs.toString());
>             jsonObject.put("count", sum);
>             jsonObject.put("hashCode", timeWindow.hashCode());
>             collector.collect(jsonObject);
>         }
>     }
>
>
> public class CorrelationWatermark implements AssignerWithPeriodicWatermarks<JSONObject> {
>     private final long maxOutOfOrderness = 10000 * 1;
>     private long currentMaxTimestamp;
>
>     @Override
>     public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
>         long timestamp = DateTime.parse(element.get("occurredAt").toString(),
>                 Config.timeFormatter).getMillis();
>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>         return timestamp;
>     }
>
>     @Override
>     public Watermark getCurrentWatermark() {
>         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>     }
> }
>
> We have no problem with a smaller Kafka topic with Flink 1.0.3. Are we
> making a mistake somewhere?
> Please let me know if any further information is required to resolve this
> issue.
>
> Best,
>
> Sendoh
