From: "Tzu-Li (Gordon) Tai"
To: user@flink.apache.org
Date: Fri, 13 Jan 2017 14:57:30 +0100
Subject: Re: Kafka topic partition skewness causes watermark not being emitted

Hi,

This is expected behaviour due to how the per-partition watermarks are designed in the Kafka consumer, but I think
it’s probably a good idea to handle idle partitions also when the Kafka consumer itself emits watermarks. I’ve filed a JIRA issue for this: https://issues.apache.org/jira/browse/FLINK-5479.

For the time being, I don’t think there will be an easy way to avoid this with the existing APIs, unfortunately. Is the skewed partition data intentional, or only for experimental purposes?

Best,
Gordon

On January 12, 2017 at 5:28:40 PM, tao xiao (xiaotao183@gmail.com) wrote:

Hi team,

I have a topic with 2 partitions in Kafka. I produced all data to partition 0 and no data to partition 1. I created a Flink job with parallelism 1 that consumes that topic and counts the events with an event-time session window (5-second gap). It turned out that the session window was never closed, even when I sent a message after a 10-minute gap. After digging into the source code, I found that AbstractFetcher [1], which is responsible for sending watermarks downstream, calculates the minimum watermark across all partitions. Because there is no data in partition 1, the watermark returned from partition 1 is always Long.MIN_VALUE, so AbstractFetcher never emits a watermark downstream.

I want to know if this is expected behavior or a bug.
If this is expected behavior, how do I avoid the delay in watermark firing when data is not evenly distributed across all partitions?

This is the timestamp extractor I used:

    public class ExactTimestampExtractor implements AssignerWithPeriodicWatermarks<SessionEvent> {

        private long currentMaxTimestamp = Long.MIN_VALUE;

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentMaxTimestamp - 1);
        }

        @Override
        public long extractTimestamp(SessionEvent element, long previousElementTimestamp) {
            long eventStartTime = (long) element.get(SessionEvent.SESSION_START_DT);
            if (eventStartTime > currentMaxTimestamp) {
                currentMaxTimestamp = eventStartTime;
            }
            return eventStartTime;
        }
    }

and this is the Flink topology:

    // get input data
    FlinkKafkaConsumer010<SessionEvent> consumer = new FlinkKafkaConsumer010<>("topic4", new MyOwnSchema()
    consumer.assignTimestampsAndWatermarks(new ExactTimestampExtractor());
    DataStream<SessionEvent> input = env.addSource(consumer);

    input.
        keyBy("id").
        window(EventTimeSessionWindows.withGap(Time.seconds(5))).
        reduce(new Reducer(), new WindowFunction()).
        print();

    // execute program
    env.execute("a job");

I used the latest code on GitHub.

[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L539
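
To make the behaviour described in this thread concrete, here is a minimal sketch in plain Java with no Flink dependency. It is not the actual AbstractFetcher code: the class MinWatermarkSketch and its methods combinedWatermark and wouldEmit are hypothetical names, and the rule that a watermark is only emitted when it advances past the last emitted one is an assumption made for illustration. It only shows why a partition that never receives data, and so stays at Long.MIN_VALUE, holds back the minimum across all partitions.

```java
import java.util.Arrays;

// Hypothetical stand-in for the per-partition watermark logic discussed above.
// This is NOT Flink's AbstractFetcher; all names here are illustrative only.
final class MinWatermarkSketch {

    // The combined watermark is the minimum over all per-partition watermarks.
    static long combinedWatermark(long[] partitionWatermarks) {
        return Arrays.stream(partitionWatermarks).min().orElse(Long.MIN_VALUE);
    }

    // Assumption: a watermark is emitted only if it is strictly greater
    // than the last one that was emitted.
    static boolean wouldEmit(long combined, long lastEmitted) {
        return combined > lastEmitted;
    }

    public static void main(String[] args) {
        long lastEmitted = Long.MIN_VALUE;

        // Partition 0 has seen events; partition 1 is idle and still at Long.MIN_VALUE.
        long[] skewed = {1_484_315_850_000L, Long.MIN_VALUE};
        System.out.println(wouldEmit(combinedWatermark(skewed), lastEmitted)); // prints false

        // Both partitions have seen events, so the minimum can advance.
        long[] balanced = {1_484_315_850_000L, 1_484_315_840_000L};
        System.out.println(wouldEmit(combinedWatermark(balanced), lastEmitted)); // prints true
    }
}
```

In the skewed case the combined watermark never leaves Long.MIN_VALUE, which matches the observation that the session window is never closed.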