Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7A74A200B8B for ; Tue, 4 Oct 2016 19:20:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 75F93160ACC; Tue, 4 Oct 2016 17:20:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E385E160AC7 for ; Tue, 4 Oct 2016 19:20:18 +0200 (CEST) Received: (qmail 41747 invoked by uid 500); 4 Oct 2016 17:20:18 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@flink.apache.org Delivered-To: mailing list user@flink.apache.org Received: (qmail 41737 invoked by uid 99); 4 Oct 2016 17:20:17 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Oct 2016 17:20:17 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 8C239C1D9F for ; Tue, 4 Oct 2016 17:20:17 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.88 X-Spam-Level: * X-Spam-Status: No, score=1.88 tagged_above=-999 required=6.31 tests=[AC_DIV_BONANZA=0.001, DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, HTML_MESSAGE=2, RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, SPF_PASS=-0.001] autolearn=disabled Authentication-Results: spamd1-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=gmail.com Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id lgwftKniuBex for ; Tue, 4 Oct 2016 17:20:15 +0000 (UTC) Received: from mail-lf0-f51.google.com (mail-lf0-f51.google.com [209.85.215.51]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id B2B535FBC7 for ; Tue, 4 Oct 2016 17:20:14 +0000 (UTC) Received: by mail-lf0-f51.google.com with SMTP id b81so71629946lfe.1 for ; Tue, 04 Oct 2016 10:20:14 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=mime-version:in-reply-to:references:from:date:message-id:subject:to; bh=46uUMGD7c9B0eVPyx6gbrxuz8YqK0sba5iA8OZaLfj8=; b=0XkGUVGJJysqJq1SL65h5qVpY9s12l4lYqo/qjTXjZGY4vkkyY2FPWzyVF67Sgin7T 6WvXA8sMlTiITFTvE7LIIamBl1pBDS3Am8W0n+8ClL3XiY4fNbHyWmOWwrieZ0BMTDAA HhBnF4eCs7PuUx4GK7NsqLueuf5n6HDhjvDjNnwzZqHWn0RiyJihkkbdFapPJQD6e+rK kGLoC9OCw/wKvvKhkqVQCfByqqUH8DmL7Am0t6fFSKu5Ii+FEid9kJhwF2lMl8rJ2bNW 2hTVTOemMFJdCMSA9ZPy5O0KZBU0sqdI2FSwY9fbrrZXOiyGMUGeuEkA/F7WCJdjFaFI RuGA== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:mime-version:in-reply-to:references:from:date :message-id:subject:to; bh=46uUMGD7c9B0eVPyx6gbrxuz8YqK0sba5iA8OZaLfj8=; b=NbSVQD5Or3c59S6xDamXZPd0uAHqCc6rnOGH+OVG01aPHTxX7BQiA84ON/jv2/8N+s 9z5reIKGRiCQw61jz3LDaRjz5Eeb1sCfPKs6pIn/LOyqf+MoUTybGWNeogITVpyn17qS uHcs5ePoPy6gPiiL02tOHW1lHc97e7ji4CfzYy+yFR5Xj5P48DJkDXMkuxu/cOJ+JonZ ZNJTXaUVTednKHGfP/ZY0gjSxTMFto4xwpRP8pCzKstbBLIM+cGqCjO/7IsxCJ2BxIjp GUvW56upgX0qUvlJ9jfD0tPV29C21YTYMAc+G6SBLUGKPIdJzufcX51xahvNYkggxDW4 8aTA== X-Gm-Message-State: AA6/9RmvC3gFaBhqAlKSqb71vYS9I1105SlUoTq111IG6nbumiVaNhUDFc2DQfHcS3PmyeNvv8Hz/2CXreZcpw== X-Received: by 10.195.3.4 with SMTP id bs4mr3863836wjd.193.1475601613162; Tue, 04 Oct 2016 10:20:13 -0700 (PDT) MIME-Version: 1.0 Received: by 10.80.170.214 with HTTP; Tue, 4 Oct 2016 10:20:12 -0700 (PDT) In-Reply-To: References: From: Chakravarthy varaga Date: Tue, 4 Oct 2016 18:20:12 +0100 Message-ID: Subject: Re: Flink Checkpoint runs slow for low load stream To: user@flink.apache.org Content-Type: multipart/alternative; boundary=001a11331636fb8f8c053e0d4548 archived-at: Tue, 04 Oct 2016 17:20:20 -0000 --001a11331636fb8f8c053e0d4548 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: quoted-printable Thanks for your prompt response Stephan. I'd wait for Flink 1.1.3 !!! Best Regards Varaga On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen wrote: > The plan to release 1.1.3 is asap ;-) > > Waiting for last backported patched to get in, then release testing and > release. > > If you want to test it today, you would need to manually build the > release-1.1 branch. > > Best, > Stephan > > > On Tue, Oct 4, 2016 at 5:46 PM, Chakravarthy varaga < > chakravarthyvp@gmail.com> wrote: > >> Hi Gordon, >> >> Do I need to clone and build release-1.1 branch to test this? >> I currently use flinlk 1.1.2 runtime. When is the plan to release i= t >> in 1.1.3? >> >> Best Regards >> Varaga >> >> On Tue, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai >> wrote: >> >>> Hi, >>> >>> Helping out here: this is the PR for async Kafka offset committing - >>> https://github.com/apache/flink/pull/2574. >>> It has already been merged into the master and release-1.1 branches, so >>> you can try out the changes now if you=E2=80=99d like. >>> The change should also be included in the 1.1.3 release, which the Flin= k >>> community is discussing to release soon. >>> >>> Will definitely be helpful if you can provide feedback afterwards! >>> >>> Best Regards, >>> Gordon >>> >>> >>> On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga ( >>> chakravarthyvp@gmail.com) wrote: >>> >>> Hi Stephan, >>> >>> Is the Async kafka offset commit released in 1.3.1? >>> >>> Varaga >>> >>> On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga < >>> chakravarthyvp@gmail.com> wrote: >>> >>>> Hi Stephan, >>>> >>>> That should be great. Let me know once the fix is done and the >>>> snapshot version to use, I'll check and revert then. >>>> Can you also share the JIRA that tracks the issue? >>>> >>>> With regards to offset commit issue, I'm not sure as to how to >>>> proceed here. Probably I'll use your fix first and see if the problem >>>> reoccurs. >>>> >>>> Thanks much >>>> Varaga >>>> >>>> On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen wrote= : >>>> >>>>> @CVP >>>>> >>>>> Flink stores in checkpoints in your case only the Kafka offsets (few >>>>> bytes) and the custom state (e). >>>>> >>>>> Here is an illustration of the checkpoint and what is stored (from th= e >>>>> Flink docs). >>>>> https://ci.apache.org/projects/flink/flink-docs-master/inter >>>>> nals/stream_checkpointing.html >>>>> >>>>> >>>>> I am quite puzzled why the offset committing problem occurs only for >>>>> one input, and not for the other. >>>>> I am preparing a fix for 1.2, possibly going into 1.1.3 as well. >>>>> Could you try out a snapshot version to see if that fixes your proble= m? >>>>> >>>>> Greetings, >>>>> Stephan >>>>> >>>>> >>>>> >>>>> On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga < >>>>> chakravarthyvp@gmail.com> wrote: >>>>> >>>>>> Hi Stefan, >>>>>> >>>>>> Thanks a million for your detailed explanation. I appreciate it= . >>>>>> >>>>>> - The *zookeeper bundled with kafka 0.9.0.1* was used to start >>>>>> zookeeper. There is only 1 instance (standalone) of zookeeper runnin= g on my >>>>>> localhost (ubuntu 14.04) >>>>>> - There is only 1 Kafka broker (*version: 0.9.0.1* ) >>>>>> >>>>>> With regards to Flink cluster there's only 1 JM & 2 TMs started >>>>>> with no HA. I presume this does not use zookeeper anyways as it runs= as >>>>>> standalone cluster. >>>>>> >>>>>> >>>>>> BTW., The kafka connector version that I use is as suggested in >>>>>> the flink connectors page >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> *. >>>>>> org.apache.flink >>>>>> flink-connector-kafka-0.9_2.10 >>>>>> 1.1.1 * >>>>>> >>>>>> Do you see any issues with versions? >>>>>> >>>>>> 1) Do you have benchmarks wrt., to checkpointing in flink? >>>>>> >>>>>> 2) There isn't detailed explanation on what states are stored a= s >>>>>> part of the checkpointing process. For ex., If I have pipeline like >>>>>> *source -> map -> keyBy -> map -> sink, my assumption on what's >>>>>> stored is:* >>>>>> >>>>>> * a) The source stream's custom watermarked records* >>>>>> >>>>>> * b) Intermediate states of each of the transformations in >>>>>> the pipeline* >>>>>> >>>>>> * c) Delta of Records stored from the previous sink* >>>>>> >>>>>> * d) Custom States (SayValueState as in my case) - >>>>>> Essentially this is what I bother about storing.* >>>>>> * e) All of my operators* >>>>>> >>>>>> Is my understanding right? >>>>>> >>>>>> 3) Is there a way in Flink to checkpoint only d) as stated abov= e >>>>>> >>>>>> 4) Can you apply checkpointing to only streams and certain >>>>>> operators (say I wish to store aggregated values part of the transfo= rmation) >>>>>> >>>>>> Best Regards >>>>>> CVP >>>>>> >>>>>> >>>>>> On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen >>>>>> wrote: >>>>>> >>>>>>> Thanks, the logs were very helpful! >>>>>>> >>>>>>> TL:DR - The offset committing to ZooKeeper is very slow and prevent= s >>>>>>> proper starting of checkpoints. >>>>>>> >>>>>>> Here is what is happening in detail: >>>>>>> >>>>>>> - Between the point when the TaskManager receives the "trigger >>>>>>> checkpoint" message and when the point when the KafkaSource actuall= y starts >>>>>>> the checkpoint is a long time (many seconds) - for one of the Kafka= Inputs >>>>>>> (the other is fine). >>>>>>> - The only way this delayed can be introduced is if another >>>>>>> checkpoint related operation (such as trigger() or notifyComplete()= ) is >>>>>>> still in progress when the checkpoint is started. Flink does not pe= rform >>>>>>> concurrent checkpoint operations on a single operator, to ease the >>>>>>> concurrency model for users. >>>>>>> - The operation that is still in progress must be the committing >>>>>>> of the offsets (to ZooKeeper or Kafka). That also explains why this= only >>>>>>> happens once one side receives the first record. Before that, there= is >>>>>>> nothing to commit. >>>>>>> >>>>>>> >>>>>>> What Flink should fix: >>>>>>> - The KafkaConsumer should run the commit operations >>>>>>> asynchronously, to not block the "notifyCheckpointComplete()" metho= d. >>>>>>> >>>>>>> What you can fix: >>>>>>> - Have a look at your Kafka/ZooKeeper setup. One Kafka Input work= s >>>>>>> well, the other does not. Do they go against different sets of brok= ers, or >>>>>>> different ZooKeepers? Is the metadata for one input bad? >>>>>>> - In the next Flink version, you may opt-out of committing offset= s >>>>>>> to Kafka/ZooKeeper all together. It is not important for Flink's >>>>>>> checkpoints anyways. >>>>>>> >>>>>>> Greetings, >>>>>>> Stephan >>>>>>> >>>>>>> >>>>>>> On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga < >>>>>>> chakravarthyvp@gmail.com> wrote: >>>>>>> >>>>>>>> Hi Stefan, >>>>>>>> >>>>>>>> Please find my responses below. >>>>>>>> >>>>>>>> - What source are you using for the slow input? >>>>>>>> * [CVP] - Both stream as pointed out in my first mail, are >>>>>>>> Kafka Streams* >>>>>>>> - How large is the state that you are checkpointing? >>>>>>>> >>>>>>>> *[CVP] - I have enabled checkpointing on the StreamEnvironment as >>>>>>>> below.* >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> * final StreamExecutionEnvironment streamEnv =3D >>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>>>>> streamEnv.setStateBackend(new >>>>>>>> FsStateBackend("file:///tmp/flink/checkpoints")); >>>>>>>> streamEnv.enableCheckpointing(10000); * >>>>>>>> >>>>>>>> >>>>>>>> * In terms of the state stored, the KS1 stream has payload of >>>>>>>> 100K events/second, while KS2 have about 1 event / 10 minutes... b= asically >>>>>>>> the operators perform flatmaps on 8 fields of tuple (all fields ar= e >>>>>>>> primitives). If you look at the states' sizes in dashboard they ar= e in >>>>>>>> Kb... * >>>>>>>> - Can you try to see in the log if actually the state snapshot >>>>>>>> takes that long, or if it simply takes long for the checkpoint >>>>>>>> barriers to travel through the stream due to a lot of backpressure= ? >>>>>>>> [CVP] -There are no back pressure atleast from the sample >>>>>>>> computation in the flink dashboard. 100K/second is low load for fl= ink's >>>>>>>> benchmarks. I could not quite get the barriers vs snapshot state. = I have >>>>>>>> attached the Task Manager log (DEBUG) info if that will interest y= ou. >>>>>>>> >>>>>>>> I have attached the checkpoints times' as .png from the >>>>>>>> dashboard. Basically if you look at checkpoint IDs 28 & 29 &30- yo= u'd >>>>>>>> see that the checkpoints take more than a minute in each case. Bef= ore these >>>>>>>> checkpoints, the KS2 stream did not have any events. As soon as an >>>>>>>> event(should be in bytes) was generated, the checkpoints went slow= and >>>>>>>> subsequently a minute more for every checkpoint thereafter. >>>>>>>> >>>>>>>> This log was collected from the standalone flink cluster with 1 >>>>>>>> job manager & 2 TMs. 1 TM was running this application with checkp= ointing >>>>>>>> (parallelism=3D1) >>>>>>>> >>>>>>>> Please let me know if you need further info., >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi! >>>>>>>>> >>>>>>>>> Let's try to figure that one out. Can you give us a bit more >>>>>>>>> information? >>>>>>>>> >>>>>>>>> - What source are you using for the slow input? >>>>>>>>> - How large is the state that you are checkpointing? >>>>>>>>> - Can you try to see in the log if actually the state snapshot >>>>>>>>> takes that long, or if it simply takes long for the checkpoint ba= rriers to >>>>>>>>> travel through the stream due to a lot of backpressure? >>>>>>>>> >>>>>>>>> Greetings, >>>>>>>>> Stephan >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi CVP, >>>>>>>>>> >>>>>>>>>> I'm not so much familiar with the internals of the checkpointing >>>>>>>>>> system, but maybe Stephan (in CC) has an idea what's going on he= re. >>>>>>>>>> >>>>>>>>>> Best, Fabian >>>>>>>>>> >>>>>>>>>> 2016-09-23 11:33 GMT+02:00 Chakravarthy varaga < >>>>>>>>>> chakravarthyvp@gmail.com>: >>>>>>>>>> >>>>>>>>>>> Hi Aljoscha & Fabian, >>>>>>>>>>> >>>>>>>>>>> I have a stream application that has 2 stream source as >>>>>>>>>>> below. >>>>>>>>>>> >>>>>>>>>>> KeyedStream *ks1* =3D ds1.keyBy("*") ; >>>>>>>>>>> KeyedStream, String> *ks2* =3D >>>>>>>>>>> ds2.flatMap(split T into k-v pairs).keyBy(0); >>>>>>>>>>> >>>>>>>>>>> ks1.connect(ks2).flatMap(X); >>>>>>>>>>> //X is a CoFlatMapFunction that inserts and removes >>>>>>>>>>> elements from ks2 into a key-value state member. Elements from = ks1 are >>>>>>>>>>> matched against that state. the CoFlatMapFunction operator main= tains >>>>>>>>>>> ValueState>; >>>>>>>>>>> >>>>>>>>>>> //ks1 is streaming about 100K events/sec from kafka topic >>>>>>>>>>> //ks2 is streaming about 1 event every 10 minutes... >>>>>>>>>>> Precisely when the 1st event is consumed from this stream, chec= kpoint takes >>>>>>>>>>> 2 minutes straight away. >>>>>>>>>>> >>>>>>>>>>> The version of flink is 1.1.2. >>>>>>>>>>> >>>>>>>>>>> I tried to use checkpoint every 10 Secs using a >>>>>>>>>>> FsStateBackend... What I notice is that the checkpoint duration= is almost 2 >>>>>>>>>>> minutes for many cases, while for the other cases it varies fro= m 100 ms to >>>>>>>>>>> 1.5 minutes frequently. I'm attaching the snapshot of the dashb= oard for >>>>>>>>>>> your reference. >>>>>>>>>>> >>>>>>>>>>> Is this an issue with flink checkpointing? >>>>>>>>>>> >>>>>>>>>>> Best Regards >>>>>>>>>>> CVP >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> > --001a11331636fb8f8c053e0d4548 Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable
Thanks for your prompt response Stephan.
=C2=A0=C2=A0=C2=A0 I'd wait for Flink 1.1.3 !!!

Best Regards
Varaga

On Tue, Oct 4, 2016 at 5:36 PM, Stephan Ewen <sewen@= apache.org> wrote:
The plan to release 1.1.3 is asap ;-)

Waiting f= or last backported patched to get in, then release testing and release.

If you want to test it today, you would need to manua= lly build the release-1.1 branch.

Best,
= Stephan

<= div class=3D"gmail_extra">
On Tue, Oct 4, 201= 6 at 5:46 PM, Chakravarthy varaga <chakravarthyvp@gmail.com>= wrote:
Hi Gordon,

=C2=A0=C2=A0=C2=A0=C2=A0 Do I need to cl= one and build release-1.1 branch to test this?
=C2=A0=C2=A0=C2=A0= =C2=A0 I currently use flinlk 1.1.2 runtime. When is the plan to release it= in 1.1.3?

Best Regards
Varaga

On Tu= e, Oct 4, 2016 at 9:25 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org&= gt; wrote:
Hi,

Helping out here: this is the PR for async Kafka offset= committing - https://github.com/apache/flink/pull/2574.
It has already been merged into the= master and release-1.1 branches, so you can try out the changes now if you= =E2=80=99d like.
The cha= nge should also be included in the 1.1.3 release, which the Flink community= is discussing to release soon.

Will defini= tely be helpful if you can provide feedback afterwards!

Best Regards,
Gordon


On October 3, 2016 at 9:40:14 PM, Chakravarthy varaga (chakravarthyvp@gmail.com= ) wrote:

Hi Stephan,

=C2=A0=C2=A0=C2=A0 Is the Async kafka offset commit released in 1.3.1?

Varaga

On Wed, Sep 28, 2016 at 9:49 AM, Chakravarthy varaga <chakravarthyvp@gmail.com> wrote:=
Hi Stephan,

=C2=A0=C2=A0=C2=A0=C2=A0 That should be great. Let me know once the fix is done and the snapshot version to use, I'll check and revert then.
=C2=A0=C2=A0=C2=A0=C2=A0 Can you also share the JIRA that tracks the issue?
=C2=A0
=C2=A0=C2=A0=C2=A0=C2=A0 With regards to offset commit issue, I'm not sure as to how to proceed here. Probably I'll use your fix first and see if the problem reoccurs.

Thanks much
Varaga

On Tue, Sep 27, 2016 at 7:46 PM, Stephan Ewen <sewen@apache.org> wrote:
@CVP

Flink stores in checkpoints in your case only the Kafka offsets (few bytes) and the custom state (e).

Here is an illustration of the checkpoint and what is stored (from the Flink docs).


I am quite puzzled why the offset committing problem occurs only for one input, and not for the other.
I am preparing a fix for 1.2, possibly going into 1.1.3 as well.
Could you try out a snapshot version to see if that fixes your problem?

Greetings,
Stephan



On Tue, Sep 27, 2016 at 2:24 PM, Chakravarthy varaga <chakravarthyvp@gmail.com> wrote:=
Hi Stefan,

=C2=A0=C2=A0=C2=A0=C2=A0 Thanks a million for your detailed explanation. I appreciate it.

=C2=A0=C2=A0=C2=A0=C2=A0 -=C2=A0 The zookeeper bundled with kafka 0.9.0.1 was used to start zookeeper. There is only 1 instance (standalone) of zookeeper running on my localhost (ubuntu 14.04)
=C2=A0=C2=A0=C2=A0=C2=A0 -=C2=A0 There is only 1 Kafka broker (version: 0.9.0.1 )

=C2=A0=C2=A0=C2=A0=C2=A0 With regards to Flink cluster there's only 1 JM & 2 TMs started with no HA. I presume this does not use zookeeper anyways as it runs as standalone cluster.

=C2=A0
=C2=A0=C2=A0=C2=A0=C2=A0 BTW., The kafka connector version that I use is as suggested in the flink connectors page.
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 <dependency>
=C2=A0 =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 <groupId>org.apache.flink</groupId>
=C2=A0 =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 <artifactId>flink-connector-kafka-0.9_2.10</artifactId= >
=C2=A0 =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 <version>1.1.1</version>
=C2=A0=C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 </dependency>

=C2=A0
=C2=A0=C2=A0=C2=A0=C2=A0 Do you see any issues with versions?
=C2=A0=C2=A0=C2=A0
=C2=A0=C2=A0=C2=A0=C2=A0 1) Do you have benchmarks wrt., to checkpointing in flink?

=C2=A0 =C2=A0=C2=A0 2) There isn't detailed explanation on what states are stored as part of the checkpointing process. For ex.,=C2=A0 If I have pipeline like source -> map -> keyBy -> map -> sink, my assumption on what's stored is:
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 a) The source stream's custom watermarked records
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 b) Intermediate states of each of the transformations in the pipeline
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 c) Delta of Records stored from the previous sink
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 d) Custom States (SayValueState as in my case) - Essentially this is what I bother about storing.
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 e) All of my operators

=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 Is my understanding right?

=C2=A0=C2=A0=C2=A0=C2=A0 3) Is there a way in Flink to checkpoint only d) as stated above

=C2=A0=C2=A0=C2=A0=C2=A0 4) Can you apply checkpointing to only streams and certain operators (say I wish to store aggregated values part of the transformation)

Best Regards
CVP


On Mon, Sep 26, 2016 at 6:18 PM, Stephan Ewen <sewen@apache.org> wrote:
Thanks, the logs were very helpful!

TL:DR - The offset committing to ZooKeeper is very slow and prevents proper starting of checkpoints.

Here is what is happening in detail:

=C2=A0 - Between the point when the TaskManager receives the "trigger checkpoint" message and when the point when the KafkaSource actually starts the checkpoint is a long time (many seconds) - for one of the Kafka Inputs (the other is fine).
=C2=A0 - The only way this delayed can be introduced is if another checkpoint related operation (such as trigger() or notifyComplete() ) is still in progress when the checkpoint is started. Flink does not perform concurrent checkpoint operations on a single operator, to ease the concurrency model for users.
=C2=A0 - The operation that is still in progress must be the committing of the offsets (to ZooKeeper or Kafka). That also explains why this only happens once one side receives the first record. Before that, there is nothing to commit.


What Flink should fix:
=C2=A0 - The KafkaConsumer should run the commit operations asynchronously, to not block the "notifyCheckpointComplete()" method.

What you can fix:
=C2=A0 - Have a look at your Kafka/ZooKeeper setup. One Kafka Input works well, the other does not. Do they go against different sets of brokers, or different ZooKeepers? Is the metadata for one input bad?
=C2=A0 - In the next Flink version, you may opt-out of committing offsets to Kafka/ZooKeeper all together. It is not important for Flink's checkpoints anyways.

Greetings,
Stephan


On Mon, Sep 26, 2016 at 5:13 PM, Chakravarthy varaga <chakravarthyvp@gmail.com> wrote:=
Hi Stefan,

=C2=A0=C2=A0=C2=A0 Please find my responses below.

=C2=A0=C2=A0=C2=A0 - What source are you using for the slow input?
=C2=A0=C2=A0=C2=A0=C2=A0 [CVP] - Both stream as pointed out in my first mail, are Kafka Streams
=C2=A0 - How large is the state that you are checkpointing?
=C2=A0=C2=A0=C2=A0=C2=A0 [CVP] - I have enabled checkpointing on the StreamEnvironment as below.
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 final StreamExecutionEnvironment streamEnv =3D StreamExecutionEnvironment.getExecutionEnvironment();
=C2=A0 =C2=A0=C2=A0 =C2=A0=C2=A0=C2=A0 streamEnv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0=C2=A0 streamEnv.enableCheckpointing(10000);

=C2=A0=C2=A0=C2=A0= =C2=A0=C2=A0 In terms of the state stored, the KS1 stream has payload of 100K events/second, while KS2 have about 1 event / 10 minutes... basically the operators perform flatmaps on 8 fields of tuple (all fields are primitives). If you look at the states' sizes in dashboard they are in Kb...

=C2=A0 - Can you try to see in the log if actually the state snapshot takes that long, or if it simply takes long for the checkpoint barriers to travel through the stream due to a lot of backpressure?
=C2=A0=C2=A0=C2=A0 [CVP] -There are no back pressure atleast from the sample computation in the flink dashboard. 100K/second is low load for flink's benchmarks. I could not quite get the barriers vs snapshot state. I have attached the Task Manager log (DEBUG) info if that will interest you.
=C2=A0
=C2=A0=C2=A0=C2=A0=C2= =A0 I have attached the checkpoints times' as .png from the dashboard. Basically if you look at checkpoint IDs 28 & 29 &30- you'd see that the checkpoints take more than a minute in each case. Before these checkpoints, the KS2 stream did not have any events. As soon as an event(should be in bytes) was generated, the checkpoints went slow and subsequently a minute more for every checkpoint thereafter.

=C2=A0=C2=A0 This log was collected from the standalone flink cluster with 1 job manager & 2 TMs. 1 TM was running this application with checkpointing (parallelism=3D1)
=C2=A0
=C2=A0=C2=A0=C2=A0 Ple= ase let me know if you need further info.,
=C2=A0=C2=A0=C2=A0=C2=A0

On Fri, Sep 23, 2016 at 6:26 PM, Stephan Ewen <sewen@apache.org> wrote:
Hi!

Let's try to figure that one out. Can you give us a bit more information?

=C2=A0 - What source are you using for the slow input?
=C2=A0 - How large is the state that you are checkpointing?
=C2=A0 - Can you try to see in the log if actually the state snapshot takes that long, or if it simply takes long for the checkpoint barriers to travel through the stream due to a lot of backpressure?

Greetings,
Stephan



On Fri, Sep 23, 2016 at 3:35 PM, Fabian Hueske <fhueske@gmail.com> wrote:
Hi CVP,

I'm not so much familiar with the internals of the checkpointing system, but maybe Stephan (in CC) has an idea what's going on here.

Best, Fabian

2016-09-23 11:33 GMT+02:00 Chakravarthy varaga <chakravarthyvp@gmail.com>:
Hi Aljoscha & Fabian,

=C2=A0=C2=A0=C2=A0 I have a stream application that has 2 stream source as below.

=C2=A0=C2=A0=C2=A0=C2=A0 KeyedStream<String, String> ks1 =3D ds1.keyBy("*") ;
=C2=A0=C2=A0=C2=A0=C2=A0 KeyedStream<Tuple2<String, V>, String> ks2 =3D ds2.flatMap(split T into k-v pairs).keyBy(0);

=C2=A0=C2=A0=C2=A0=C2=A0 ks1.connect(ks2).flatMap(X);
=C2=A0=C2=A0=C2=A0=C2=A0 //X is a CoFlatMapFunction that inserts and removes elements from ks2 into a key-value state member. Elements from ks1 are matched against that state. the CoFlatMapFunction operator maintains ValueState<Tuple2<Long, Long>>;

=C2=A0=C2=A0=C2=A0=C2=A0 //ks1 is streaming about 100K events/sec from kafka topic
=C2=A0=C2=A0=C2=A0=C2=A0 //ks2 is streaming about 1 event every 10 minutes... Precisely when the 1st event is consumed from this stream, checkpoint takes 2 minutes straight away.

=C2=A0=C2=A0=C2=A0 The version of flink is 1.1.2.

I tried to use checkpoint every 10 Secs using a FsStateBackend... What I notice is that the checkpoint duration is almost 2 minutes for many cases, while for the other cases it varies from 100 ms to 1.5 minutes frequently. I'm attaching the snapshot of the dashboard for your reference.

=C2=A0=C2=A0=C2=A0=C2=A0 Is this an issue with flink checkpointing?

=C2=A0Best Regards
CVP











--001a11331636fb8f8c053e0d4548--