From: Eno Thereska
Subject: Re: [DISCUSS] KIP-116 - Add State Store Checkpoint Interval Configuration
Date: Thu, 9 Feb 2017 21:03:28 +0000
To: dev@kafka.apache.org

Hi Guozhang,

It seems to me we have the same semantics today. Are you saying there is a new failure scenario?

Thanks,
Eno

> On 9 Feb 2017, at 19:42, Guozhang Wang wrote:
>
> More specifically, here is my reasoning about the failure cases, and I would like to get your feedback:
>
> *StreamTask*
>
> For a stream task, the committing order is 1) flush state (may send more records to the changelog via the producer), 2) flush producer, 3) commit upstream offsets. My understanding is that the checkpoint file will be written between 2) and 3), so that the new order will be 1) flush state, 2) flush producer, 3) write checkpoint file (when necessary), 4) commit upstream offsets.
>
> And we have a bunch of "changelog offsets" regarding the state: a) the offset corresponding to the image of the persistent file, call it offset A; b) the log end offset, call it offset B; c) the offset recorded in the checkpoint file, call it offset C; d) the offset corresponding to the currently committed upstream offset, call it offset D.
>
> Now let's talk about the failure cases:
>
> If there is a crash between 1) and 2), then A > B = C = D.
> In this case, if we restore, we will replay no log records at all since B = C, while the persistent state file is actually "ahead of time". We will then start reprocessing from the input offset corresponding to D = B < A and hence have some duplicates, *which will be incorrect* if the update logic also reads the state store values (i.e. it is not a blind write), e.g. aggregations.
>
> If there is a crash between 2) and 3), then A = B > C = D. When we restore, we will replay from C -> B = A, and then start reprocessing from the input offset corresponding to D < A, and the same issue applies as above.
>
> If there is a crash between 3) and 4), then A = B = C > D. When we restore, we will not replay anything, and then start reprocessing from the input offset corresponding to D < A, and the same issue applies as above.
>
>
> *StandbyTask*
>
> We only do one operation today, which is 1) flush state. I think we will add the writing of the checkpoint file after it as step 2).
>
> Failure cases again: offset A -> corresponds to the image of the file, offset B -> changelog end offset, offset C -> the offset written in the checkpoint file.
>
> If there is a crash between 1) and 2), then B >= A > C (B >= A because we are reading from the changelog topic, so A will never be greater than B):
>
> 1) if this task resumes as a standby task, we will resume restoration from offset C, and a few duplicates from C -> A will be applied again to the local state files, then continue from A -> B. *This is OK*, since they do not incur any computations, hence have no side effects, and are all idempotent.
>
> 2) if this task resumes as a stream task, we will replay changelogs from C -> A, with duplicated updates, and then from A -> B. This is also OK for the same reason as above.
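The StreamTask failure cases above can be replayed with a small Java model. It follows the analysis's own simplification: records sent to the producer during step 1) are not yet in the changelog, so B stays at the previous commit until step 2). The class and method names are hypothetical and are not Kafka Streams APIs. Offsets are plain numbers: the last commit is at 100 and processing has reached 110. For a crash after each step, the model reports whether reprocessing from D would reapply updates that the restored store already reflects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the analysis above, not Kafka Streams code.
//   A = offset reflected in the local state file image
//   B = changelog log-end offset
//   C = offset recorded in the checkpoint file
//   D = changelog offset matching the committed upstream (input) offset
final class CommitCrashSimulation {

    static final class Offsets {
        final long a, b, c, d;

        Offsets(long a, long b, long c, long d) {
            this.a = a; this.b = b; this.c = c; this.d = d;
        }

        // Restoration replays C -> B on top of the file image A, so the store ends up at max(A, B).
        long restoredPosition() {
            return Math.max(a, b);
        }

        // Reprocessing resumes from D; anything the store already reflects beyond D is applied twice,
        // which is only harmful for read-modify-write updates such as aggregations.
        boolean duplicatesOnReprocess() {
            return restoredPosition() > d;
        }
    }

    // Runs the proposed commit order and stops after `completedSteps` steps:
    // 1) flush state, 2) flush producer, 3) write checkpoint, 4) commit upstream offsets.
    static Offsets crashAfter(int completedSteps, long lastCommitted, long processedUpTo) {
        long a = lastCommitted, b = lastCommitted, c = lastCommitted, d = lastCommitted;
        if (completedSteps >= 1) a = processedUpTo; // state store flushed to the local file
        if (completedSteps >= 2) b = processedUpTo; // pending changelog records acknowledged
        if (completedSteps >= 3) c = processedUpTo; // checkpoint file written
        if (completedSteps >= 4) d = processedUpTo; // upstream offsets committed
        return new Offsets(a, b, c, d);
    }

    static List<String> report(long lastCommitted, long processedUpTo) {
        List<String> lines = new ArrayList<>();
        for (int steps = 1; steps <= 4; steps++) {
            Offsets o = crashAfter(steps, lastCommitted, processedUpTo);
            lines.add(String.format("after step %d: A=%d B=%d C=%d D=%d duplicates=%b",
                    steps, o.a, o.b, o.c, o.d, o.duplicatesOnReprocess()));
        }
        return lines;
    }

    public static void main(String[] args) {
        report(100, 110).forEach(System.out::println);
    }
}
```

In this model, only the fully completed commit (after step 4) avoids reapplying updates. The StandbyTask case differs because its changelog replay is idempotent.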
>
> So it seems to me that this is not safe for a StreamTask, or maybe the writing of the checkpoint file in your mind is different?
>
> Guozhang
>
> On Thu, Feb 9, 2017 at 11:02 AM, Guozhang Wang wrote:
>
>> A quick question re: `We will add the above config parameter to *StreamsConfig*. During *StreamTask#commit()*, *StandbyTask#commit()*, and *GlobalUpdateStateTask#flushState()* we will check if the checkpoint interval has elapsed and write the checkpoint file.`
>>
>> Will the writing of the checkpoint file happen before the flushing of the state manager?
>>
>> Guozhang
>>
>> On Thu, Feb 9, 2017 at 10:48 AM, Matthias J. Sax wrote:
>>
>>> But 5 minutes means that we (in the worst case) need to replay data from the last 5 minutes to get the store ready.
>>>
>>> So why not go with the minimum possible value of 30 seconds to speed up this process, if the impact is negligible anyway?
>>>
>>> What do you gain by being conservative?
>>>
>>> -Matthias
>>>
>>> On 2/9/17 2:54 AM, Damian Guy wrote:
>>>> Why shouldn't it be 5 minutes? ;-)
>>>> It is a finger-in-the-air number. The testing I did shows that there isn't much, if any, overhead when checkpointing a single store on the commit interval. The default commit interval is 30 seconds, so it could possibly be set to that. However, I'd prefer to be a little conservative, so 5 minutes seemed reasonable.
>>>>
>>>> On Thu, 9 Feb 2017 at 10:25 Michael Noll wrote:
>>>>
>>>>> Damian,
>>>>>
>>>>> could you briefly elaborate on why the default value should be 5 minutes? What are the considerations, assumptions, etc. that go into picking this value?
>>>>>
>>>>> Right now, in the KIP and in this discussion, "5 mins" looks like a magic number to me.
>>>>> :-)
>>>>>
>>>>> -Michael
>>>>>
>>>>> On Thu, Feb 9, 2017 at 11:03 AM, Damian Guy wrote:
>>>>>
>>>>>> I've run the SimpleBenchmark with checkpointing on and off to see what the impact is. It appears that there is very little impact, if any. The numbers with checkpointing on actually look better, but that is likely largely due to external influences.
>>>>>>
>>>>>> In any case, I'm going to suggest we go with a default checkpoint interval of 5 minutes. I've updated the KIP with this.
>>>>>>
>>>>>> commit every 10 seconds (no checkpoint)
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/34798/287372.83751939767/29.570664980746017
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/35942/278226.0308274442/28.62945857214401
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/34677/288375.58035585546/29.673847218617528
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/34677/288375.58035585546/29.673847218617528
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/31192/320595.02436522185/32.98922800718133
>>>>>>
>>>>>> checkpoint every 10 seconds (same as commit interval)
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/36997/270292.185852907/27.81306592426413
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/32087/311652.69423754164/32.069062237043035
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/32895/303997.5680194558/31.281349749202004
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/33476/298721.4720994145/30.738439479029754
>>>>>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>>>>>> 10000000/33196/301241.1133871551/30.99771056753826
>>>>>>
>>>>>> On Wed, 8 Feb 2017 at 09:02 Damian Guy wrote:
>>>>>>
>>>>>>> Matthias,
>>>>>>>
>>>>>>> Fair point. I'll update the KIP.
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Wed, 8 Feb 2017 at 05:49 Matthias J. Sax wrote:
>>>>>>>
>>>>>>> Damian,
>>>>>>>
>>>>>>> I am not strict about it either. However, if there is no advantage in disabling it, we might not want to allow it. This would have the advantage of guarding users against accidentally switching it off.
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 2/3/17 2:03 AM, Damian Guy wrote:
>>>>>>>> Hi Matthias,
>>>>>>>>
>>>>>>>> It possibly doesn't make sense to disable it, but then I'm sure someone will come up with a reason they don't want it!
>>>>>>>> I'm happy to change it such that the checkpoint interval must be > 0.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Damian
>>>>>>>>
>>>>>>>> On Fri, 3 Feb 2017 at 01:29 Matthias J. Sax wrote:
>>>>>>>>
>>>>>>>>> Thanks Damian.
>>>>>>>>>
>>>>>>>>> One more question: "Checkpointing is disabled if the checkpoint interval is set to a value <=0."
>>>>>>>>>
>>>>>>>>> Does it make sense to disable checkpointing? What's the tradeoff here?
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 2/2/17 1:51 AM, Damian Guy wrote:
>>>>>>>>>> Hi Matthias,
>>>>>>>>>>
>>>>>>>>>> Thanks for the comments.
>>>>>>>>>>
>>>>>>>>>> 1. TBD - I need to do some performance tests and try to work out a sensible default.
>>>>>>>>>> 2. Yes, you are correct. It could be a multiple of the commit.interval.ms.
>>>>>>>>>> But that would also mean that if you change the commit interval, say you lower it, then you might also need to change the checkpoint setting (i.e., you still only want to checkpoint every n minutes).
>>>>>>>>>>
>>>>>>>>>> On Wed, 1 Feb 2017 at 23:46 Matthias J. Sax <matthias@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for the KIP Damian.
>>>>>>>>>>>
>>>>>>>>>>> I am wondering about two things:
>>>>>>>>>>>
>>>>>>>>>>> 1. What should be the default value for the new parameter?
>>>>>>>>>>> 2. Why is the new parameter provided in ms?
>>>>>>>>>>>
>>>>>>>>>>> About (2): because
>>>>>>>>>>>
>>>>>>>>>>> "the minimum checkpoint interval will be the value of commit.interval.ms. In effect the actual checkpoint interval will be a multiple of the commit interval"
>>>>>>>>>>>
>>>>>>>>>>> it might be easier to just use a parameter that is "number-of-commit-intervals".
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 2/1/17 7:29 AM, Damian Guy wrote:
>>>>>>>>>>>> Thanks for the comments Eno.
>>>>>>>>>>>> As for exactly-once, I don't believe this matters, as we are just restoring the changelog, i.e., the result of the aggregations that previously ran, etc. So once initialized, the state store will be in the same state as it was before.
>>>>>>>>>>>> Having the checkpoint in a Kafka topic is not ideal, as the state is per Kafka Streams instance. So each instance would need to start with a unique id that is persistent.
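The KIP text quoted in this thread says the checkpoint interval is checked during each commit. As a result, the effective interval is rounded up to a multiple of commit.interval.ms. The Java sketch below illustrates that behavior under stated assumptions:

- `CheckpointTimer` and `checkpointTimes` are hypothetical names, not Kafka Streams classes.
- Time is passed in explicitly instead of read from a clock.
- An interval <= 0 is rejected, as suggested in the thread, rather than meaning "disabled".

```java
import java.util.ArrayList;
import java.util.List;

final class CheckpointIntervalSketch {

    // Hypothetical helper, not part of Kafka Streams: decides on each commit whether a checkpoint is due.
    static final class CheckpointTimer {
        private final long intervalMs;
        private long lastCheckpointMs;

        CheckpointTimer(long intervalMs, long startMs) {
            // Reject non-positive intervals instead of treating them as "checkpointing disabled".
            if (intervalMs <= 0) {
                throw new IllegalArgumentException("checkpoint interval must be > 0, got " + intervalMs);
            }
            this.intervalMs = intervalMs;
            this.lastCheckpointMs = startMs;
        }

        // Called from commit(); returns true (and restarts the interval) once the interval has elapsed.
        boolean maybeCheckpoint(long nowMs) {
            if (nowMs - lastCheckpointMs >= intervalMs) {
                lastCheckpointMs = nowMs;
                return true;
            }
            return false;
        }
    }

    // Simulates a commit every commitIntervalMs up to runMs and returns the times a checkpoint is written.
    static List<Long> checkpointTimes(long commitIntervalMs, long checkpointIntervalMs, long runMs) {
        CheckpointTimer timer = new CheckpointTimer(checkpointIntervalMs, 0L);
        List<Long> times = new ArrayList<>();
        for (long now = commitIntervalMs; now <= runMs; now += commitIntervalMs) {
            if (timer.maybeCheckpoint(now)) {
                times.add(now);
            }
        }
        return times;
    }

    public static void main(String[] args) {
        // 30 s commits, 100 s interval: a checkpoint on every 4th commit -> [120000, 240000, 360000]
        System.out.println(checkpointTimes(30_000, 100_000, 400_000));
        // 10 s commits, same 100 s interval -> [100000, 200000, 300000, 400000]
        System.out.println(checkpointTimes(10_000, 100_000, 400_000));
        // 30 s commits, 5 minute interval: every 10th commit -> [300000, 600000, 900000]
        System.out.println(checkpointTimes(30_000, 300_000, 900_000));
    }
}
```

Lowering the commit interval from 30 s to 10 s keeps checkpoints roughly 100 s apart, which is Damian's argument for a millisecond setting. A "number-of-commit-intervals" value of 4 would instead checkpoint every 40 s after the same change.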
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Damian
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 1 Feb 2017 at 13:20 Eno Thereska <eno.thereska@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> As a follow-up to my previous comment, have you thought about writing the checkpoint to a topic instead of a local file? That would have the advantage that all metadata continues to be managed by Kafka, as well as fitting with EoS. The potential disadvantage would be higher latency; however, if it is periodic as you mention, I'm not sure that would be a showstopper.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>> On 1 Feb 2017, at 12:58, Eno Thereska wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Damian, this is a good idea and will reduce the restore time. Looking forward, with exactly-once and support for transactions in Kafka, I believe we'll have to add some support for rolling back checkpoints, e.g., when a transaction is aborted. We need to be aware of that and ideally anticipate those needs a bit in the KIP.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 1 Feb 2017, at 10:18, Damian Guy wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I would like to start the discussion on KIP-116:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-116+-+Add+State+Store+Checkpoint+Interval+Configuration
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Damian
>>
>> --
>> -- Guozhang
>
> --
> -- Guozhang
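To put Damian's SimpleBenchmark results from earlier in the thread in perspective, the Java snippet below summarizes the second field of each result line. It assumes that field is the elapsed time in milliseconds for the 10,000,000 records. That reading is consistent with the rec-sec column: 10,000,000 / 34.798 s is about 287,373 records per second. The class and field names are made up for illustration. The data is copied as posted, including the one no-checkpoint result that appears twice.

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;

final class BenchmarkSummary {

    // "records/latency/rec-sec/MB-sec" lines as posted in the thread (source+store).
    static final String[] NO_CHECKPOINT = {
        "10000000/34798/287372.83751939767/29.570664980746017",
        "10000000/35942/278226.0308274442/28.62945857214401",
        "10000000/34677/288375.58035585546/29.673847218617528",
        "10000000/34677/288375.58035585546/29.673847218617528", // duplicated in the post, kept as-is
        "10000000/31192/320595.02436522185/32.98922800718133",
    };

    static final String[] CHECKPOINT_EVERY_COMMIT = {
        "10000000/36997/270292.185852907/27.81306592426413",
        "10000000/32087/311652.69423754164/32.069062237043035",
        "10000000/32895/303997.5680194558/31.281349749202004",
        "10000000/33476/298721.4720994145/30.738439479029754",
        "10000000/33196/301241.1133871551/30.99771056753826",
    };

    // Min/mean/max of the latency field (index 1), assumed to be milliseconds.
    static DoubleSummaryStatistics latencyStats(String[] results) {
        return Arrays.stream(results)
                .mapToDouble(r -> Double.parseDouble(r.split("/")[1]))
                .summaryStatistics();
    }

    public static void main(String[] args) {
        DoubleSummaryStatistics off = latencyStats(NO_CHECKPOINT);
        DoubleSummaryStatistics on = latencyStats(CHECKPOINT_EVERY_COMMIT);
        System.out.printf("no checkpoint:      mean %.1f ms (min %.0f, max %.0f)%n",
                off.getAverage(), off.getMin(), off.getMax());
        System.out.printf("checkpoint on 10 s: mean %.1f ms (min %.0f, max %.0f)%n",
                on.getAverage(), on.getMin(), on.getMax());
    }
}
```

The means come out at 34257.2 ms without checkpointing and 33730.2 ms with it, a difference of about 1.5%. The no-checkpoint runs alone vary by 4750 ms (31192 to 35942). That supports Damian's reading that the apparent improvement is noise, not a real effect of checkpointing.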