From: Guozhang Wang
Date: Thu, 9 Feb 2017 11:42:44 -0800
Subject: Re: [DISCUSS] KIP-116 - Add State Store Checkpoint Interval Configuration
To: "dev@kafka.apache.org"

More specifically, here is my reasoning about the failure cases; I would like to get your feedback:

*StreamTask*

For a stream task, the committing order is 1) flush state (which may send more records to the changelog via the producer), 2) flush producer, 3) commit upstream offsets. My understanding is that the writing of the checkpoint file will happen between 2) and 3), so that the new order will be 1) flush state, 2) flush producer, 3) write checkpoint file (when necessary), 4) commit upstream offsets.

And we have a set of "changelog offsets" regarding the state:
a) the offset corresponding to the image of the persistent file, call it offset A,
b) the log end offset, call it offset B,
c) the offset recorded in the checkpoint file, call it offset C,
d) the offset corresponding to the currently committed upstream offset, call it offset D.

Now let's talk about the failure cases:

If there is a crash between 1) and 2), then A > B = C = D. In this case, when we restore, we will replay no logs at all since B = C, while the persistent state file is actually "ahead of time"; we will then start reprocessing from the input offset corresponding to D = B < A and hence produce some duplicates, *which will be incorrect* if the update logic involves reading the state store values as well (i.e. not a blind write), e.g. aggregations.

If there is a crash between 2) and 3), then A = B > C = D. When we restore, we will replay from C -> B = A, and then start reprocessing from the input offset corresponding to D < A, and the same issue applies as above.

If there is a crash between 3) and 4), then A = B = C > D.
When we restore, we will not replay anything, and then start reprocessing from the input offset corresponding to D < A, and the same issue applies as above.

*StandbyTask*

We only do one operation today, which is 1) flush state; I think we will add the writing of the checkpoint file after it as step 2).

Failure cases again: offset A -> corresponds to the image of the file, offset B -> changelog end offset, offset C -> offset written in the checkpoint file.

If there is a crash between 1) and 2), then B >= A > C (B >= A because we are reading from the changelog topic, so A will never be greater than B):

1) If this task resumes as a standby task, we will resume restoration from offset C, and a few duplicates from C -> A will be applied again to the local state files, then continue from A -> B. *This is OK* since these updates do not incur any computation, hence have no side effects, and are all idempotent.

2) If this task resumes as a stream task, we will replay the changelog from C -> A, with duplicated updates, and then from A -> B. This is also OK for the same reason as above.

So it seems to me that this is not safe for a StreamTask, or maybe the writing of the checkpoint file in your mind is different?

Guozhang

On Thu, Feb 9, 2017 at 11:02 AM, Guozhang Wang wrote:

> A quick question re: `We will add the above config parameter to
> *StreamsConfig*. During *StreamTask#commit()*, *StandbyTask#commit()*,
> and *GlobalUpdateStateTask#flushState()* we will check if the checkpoint
> interval has elapsed and write the checkpoint file.`
>
> Will the writing of the checkpoint file happen before the flushing of the
> state manager?
>
> Guozhang
>
>
> On Thu, Feb 9, 2017 at 10:48 AM, Matthias J. Sax wrote:
>
>> But 5 min means that we (in the worst case) need to replay data from the
>> last 5 minutes to get the store ready.
>>
>> So why not go with the min possible value of 30 seconds to speed up this
>> process if the impact is negligible anyway?
>>
>> What do you gain by being conservative?
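The KIP text quoted above says the interval check runs inside commit, and Matthias's point is that the checkpoint interval bounds how much changelog has to be replayed. A minimal Java sketch of that scheduling rule, assuming the check happens only when a commit runs; `CheckpointScheduler`, `onCommit` and `effectiveIntervalMs` are hypothetical names for illustration, not Kafka Streams APIs.

```java
// Hypothetical sketch: none of these names exist in Kafka Streams.
final class CheckpointScheduler {
    private final long checkpointIntervalMs;
    private long lastCheckpointMs;

    CheckpointScheduler(long checkpointIntervalMs, long startMs) {
        this.checkpointIntervalMs = checkpointIntervalMs;
        this.lastCheckpointMs = startMs;
    }

    /** Called once per commit; returns true if the checkpoint file should be written now. */
    boolean onCommit(long nowMs) {
        if (nowMs - lastCheckpointMs >= checkpointIntervalMs) {
            lastCheckpointMs = nowMs;
            return true;
        }
        return false;
    }

    /**
     * Because the interval is only checked at commit time, the effective interval
     * rounds up to a whole number of commit intervals (at least one).
     */
    static long effectiveIntervalMs(long checkpointIntervalMs, long commitIntervalMs) {
        long commits = (checkpointIntervalMs + commitIntervalMs - 1) / commitIntervalMs;
        return Math.max(1, commits) * commitIntervalMs;
    }

    public static void main(String[] args) {
        long commitMs = 30_000L;
        System.out.println(effectiveIntervalMs(300_000L, commitMs));
        System.out.println(effectiveIntervalMs(45_000L, commitMs));

        // Simulate ten minutes of commits with a 5-minute checkpoint interval.
        CheckpointScheduler scheduler = new CheckpointScheduler(300_000L, 0L);
        int written = 0;
        for (long t = commitMs; t <= 600_000L; t += commitMs) {
            if (scheduler.onCommit(t)) {
                written++;
            }
        }
        System.out.println(written);
    }
}
```

With a 30-second commit interval, `main` prints 300000, 60000 and 2: a 45-second setting is only honoured at the next commit boundary, and over ten minutes the file is written twice. Under this model the checkpoint can lag the store by up to one effective interval, which is roughly the worst-case replay window being debated.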
>>
>>
>> -Matthias
>>
>> On 2/9/17 2:54 AM, Damian Guy wrote:
>> > Why shouldn't it be 5 minutes? ;-)
>> > It is a finger-in-the-air number. Based on the testing I did, there
>> > isn't much, if any, overhead when checkpointing a single store on the
>> > commit interval. The default commit interval is 30 seconds, so it could
>> > possibly be set to that. However, I'd prefer to be a little conservative,
>> > so 5 minutes seemed reasonable.
>> >
>> >
>> > On Thu, 9 Feb 2017 at 10:25 Michael Noll wrote:
>> >
>> >> Damian,
>> >>
>> >> could you elaborate briefly why the default value should be 5 minutes?
>> >> What are the considerations, assumptions, etc. that go into picking
>> >> this value?
>> >>
>> >> Right now, in the KIP and in this discussion, "5 mins" looks like a
>> >> magic number to me. :-)
>> >>
>> >> -Michael
>> >>
>> >>
>> >>
>> >> On Thu, Feb 9, 2017 at 11:03 AM, Damian Guy wrote:
>> >>
>> >>> I've run the SimpleBenchmark with checkpointing on and off to see what
>> >>> the impact is. It appears that there is very little impact, if any. The
>> >>> numbers with checkpointing on actually look better, but that is likely
>> >>> largely due to external influences.
>> >>>
>> >>> In any case, I'm going to suggest we go with a default checkpoint
>> >>> interval of 5 minutes. I've updated the KIP with this.
>> >>>
>> >>> commit every 10 seconds (no checkpoint)
>> >>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>> >>> 10000000/34798/287372.83751939767/29.570664980746017
>> >>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>> >>> 10000000/35942/278226.0308274442/28.62945857214401
>> >>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>> >>> 10000000/34677/288375.58035585546/29.673847218617528
>> >>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>> >>> 10000000/34677/288375.58035585546/29.673847218617528
>> >>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>> >>> 10000000/31192/320595.02436522185/32.98922800718133
>> >>>
>> >>>
>> >>> checkpoint every 10 seconds (same as commit interval)
>> >>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>> >>> 10000000/36997/270292.185852907/27.81306592426413
>> >>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>> >>> 10000000/32087/311652.69423754164/32.069062237043035
>> >>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>> >>> 10000000/32895/303997.5680194558/31.281349749202004
>> >>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>> >>> 10000000/33476/298721.4720994145/30.738439479029754
>> >>> Streams Performance [records/latency/rec-sec/MB-sec source+store]:
>> >>> 10000000/33196/301241.1133871551/30.99771056753826
>> >>>
>> >>> On Wed, 8 Feb 2017 at 09:02 Damian Guy wrote:
>> >>>
>> >>>> Matthias,
>> >>>>
>> >>>> Fair point. I'll update the KIP.
>> >>>> Thanks
>> >>>>
>> >>>> On Wed, 8 Feb 2017 at 05:49 Matthias J. Sax wrote:
>> >>>>
>> >>>> Damian,
>> >>>>
>> >>>> I am not strict about it either. However, if there is no advantage in
>> >>>> disabling it, we might not want to allow it. This would have the
>> >>>> advantage of guarding users against accidentally switching it off.
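The thread converges on rejecting a non-positive checkpoint interval instead of treating `<= 0` as "checkpointing disabled". A minimal Java sketch of that validation, assuming the KIP rule that the effective minimum is the commit interval; `CheckpointIntervalValidator` and `validate` are hypothetical names, not part of `StreamsConfig`.

```java
// Hypothetical helper, not a Kafka Streams API.
final class CheckpointIntervalValidator {
    /**
     * Rejects non-positive intervals (so checkpointing cannot be switched off by accident)
     * and clamps the result to the commit interval, the stated minimum in the KIP.
     */
    static long validate(long checkpointIntervalMs, long commitIntervalMs) {
        if (checkpointIntervalMs <= 0) {
            throw new IllegalArgumentException(
                "checkpoint interval must be > 0, got " + checkpointIntervalMs);
        }
        return Math.max(checkpointIntervalMs, commitIntervalMs);
    }

    public static void main(String[] args) {
        System.out.println(validate(300_000L, 30_000L)); // a 5-minute interval is kept
        System.out.println(validate(10_000L, 30_000L));  // raised to the commit interval
        try {
            validate(0L, 30_000L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In Kafka's own config classes a lower bound like this is usually declared through a `ConfigDef` validator such as `ConfigDef.Range.atLeast(...)`; the standalone helper only keeps the sketch dependency-free.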
>> >>>>
>> >>>> -Matthias
>> >>>>
>> >>>>
>> >>>> On 2/3/17 2:03 AM, Damian Guy wrote:
>> >>>>> Hi Matthias,
>> >>>>>
>> >>>>> It possibly doesn't make sense to disable it, but then I'm sure
>> >>>>> someone will come up with a reason they don't want it!
>> >>>>> I'm happy to change it such that the checkpoint interval must be > 0.
>> >>>>>
>> >>>>> Cheers,
>> >>>>> Damian
>> >>>>>
>> >>>>> On Fri, 3 Feb 2017 at 01:29 Matthias J. Sax wrote:
>> >>>>>
>> >>>>>> Thanks Damian.
>> >>>>>>
>> >>>>>> One more question: "Checkpointing is disabled if the checkpoint
>> >>>>>> interval is set to a value <=0."
>> >>>>>>
>> >>>>>>
>> >>>>>> Does it make sense to disable checkpointing? What's the tradeoff here?
>> >>>>>>
>> >>>>>>
>> >>>>>> -Matthias
>> >>>>>>
>> >>>>>>
>> >>>>>> On 2/2/17 1:51 AM, Damian Guy wrote:
>> >>>>>>> Hi Matthias,
>> >>>>>>>
>> >>>>>>> Thanks for the comments.
>> >>>>>>>
>> >>>>>>> 1. TBD - I need to do some performance tests and try to work out a
>> >>>>>>> sensible default.
>> >>>>>>> 2. Yes, you are correct. It could be a multiple of the
>> >>>>>>> commit.interval.ms. But that would also mean that if you change the
>> >>>>>>> commit interval, say you lower it, then you might also need to change
>> >>>>>>> the checkpoint setting (i.e., you still only want to checkpoint every
>> >>>>>>> n minutes).
>> >>>>>>>
>> >>>>>>> On Wed, 1 Feb 2017 at 23:46 Matthias J. Sax <matthias@confluent.io> wrote:
>> >>>>>>>
>> >>>>>>>> Thanks for the KIP Damian.
>> >>>>>>>>
>> >>>>>>>> I am wondering about two things:
>> >>>>>>>>
>> >>>>>>>> 1. what should be the default value for the new parameter?
>> >>>>>>>> 2. why is the new parameter provided in ms?
>> >>>>>>>>
>> >>>>>>>> About (2): because
>> >>>>>>>>
>> >>>>>>>> "the minimum checkpoint interval will be the value of
>> >>>>>>>> commit.interval.ms.
>> >>>>>>>> In effect the actual checkpoint interval will be a
>> >>>>>>>> multiple of the commit interval"
>> >>>>>>>>
>> >>>>>>>> it might be easier to just use a parameter that is
>> >>>>>>>> "number-of-commit-intervals".
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> -Matthias
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On 2/1/17 7:29 AM, Damian Guy wrote:
>> >>>>>>>>> Thanks for the comments Eno.
>> >>>>>>>>> As for exactly once, I don't believe this matters as we are just
>> >>>>>>>>> restoring the changelog, i.e., the result of the aggregations that
>> >>>>>>>>> previously ran, etc. So once initialized, the state store will be in
>> >>>>>>>>> the same state as it was before.
>> >>>>>>>>> Having the checkpoint in a Kafka topic is not ideal as the state is
>> >>>>>>>>> per Kafka Streams instance. So each instance would need to start
>> >>>>>>>>> with a unique id that is persistent.
>> >>>>>>>>>
>> >>>>>>>>> Cheers,
>> >>>>>>>>> Damian
>> >>>>>>>>>
>> >>>>>>>>> On Wed, 1 Feb 2017 at 13:20 Eno Thereska <eno.thereska@gmail.com> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> As a follow up to my previous comment, have you thought about
>> >>>>>>>>>> writing the checkpoint to a topic instead of a local file? That
>> >>>>>>>>>> would have the advantage that all metadata continues to be managed
>> >>>>>>>>>> by Kafka, as well as fit with EoS. The potential disadvantage would
>> >>>>>>>>>> be higher latency; however, if it is periodic as you mention, I'm
>> >>>>>>>>>> not sure that would be a show stopper.
>> >>>>>>>>>>
>> >>>>>>>>>> Thanks
>> >>>>>>>>>> Eno
>> >>>>>>>>>>> On 1 Feb 2017, at 12:58, Eno Thereska wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks Damian, this is a good idea and will reduce the restore time.
>> >>>>>>>>>>> Looking forward, with exactly once and support for transactions
>> >>>>>>>>>>> in Kafka, I believe we'll have to add some support for rolling
>> >>>>>>>>>>> back checkpoints, e.g., when a transaction is aborted. We need to
>> >>>>>>>>>>> be aware of that and ideally anticipate those needs a bit in the KIP.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks
>> >>>>>>>>>>> Eno
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>> On 1 Feb 2017, at 10:18, Damian Guy wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Hi all,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I would like to start the discussion on KIP-116:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-116+-+Add+State+Store+Checkpoint+Interval+Configuration
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thanks,
>> >>>>>>>>>>>> Damian
>
> --
> -- Guozhang
>

--
-- Guozhang
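Guozhang's StreamTask failure analysis at the top of this message can be checked with a toy model. The Java sketch below is an illustration under stated assumptions, not Kafka Streams code: `CommitCrashModel` and its methods are hypothetical, each input offset is assumed to produce exactly one changelog record at the same offset, and the store holds a count that every input increments (a read-modify-write update, like an aggregation). It applies the four commit steps in the proposed order, crashes after a given number of them, then restores from the checkpoint and reprocesses from the committed input offset.

```java
// Toy model of the proposed StreamTask commit order; all names are hypothetical.
final class CommitCrashModel {
    /** Changelog offsets named as in the thread: A = file image, B = log end, C = checkpoint, D = committed input. */
    record Offsets(long a, long b, long c, long d) {}

    /** Offsets after a crash that happens once `stepsDone` of the four commit steps have completed. */
    static Offsets crashAfter(int stepsDone, long before, long target) {
        return new Offsets(
            stepsDone >= 1 ? target : before,  // 1) flush state: file image advances
            stepsDone >= 2 ? target : before,  // 2) flush producer: changelog end advances
            stepsDone >= 3 ? target : before,  // 3) write checkpoint file
            stepsDone >= 4 ? target : before); // 4) commit upstream offsets
    }

    /** Count after restore plus reprocessing, assuming input offset i maps to changelog offset i. */
    static long recoveredCount(Offsets o, long target) {
        // Restore: if the changelog holds records past the checkpoint (B > C), replaying
        // them leaves the running count at B; otherwise the local file image A is kept.
        long restored = o.b() > o.c() ? o.b() : o.a();
        // Reprocess: every input from D up to target increments the count again.
        return restored + (target - o.d());
    }

    public static void main(String[] args) {
        long before = 0;
        long target = 10;
        for (int steps = 0; steps <= 4; steps++) {
            Offsets o = crashAfter(steps, before, target);
            long count = recoveredCount(o, target);
            System.out.printf("crash after step %d: %s -> count %d (%s)%n",
                steps, o, count, count == target ? "correct" : "double-counted");
        }
    }
}
```

Only a crash before step 1 or after step 4 ends with the correct count of 10; crashing after step 1, 2 or 3 yields 20, matching the three StreamTask cases where D < A. The standby-task cases are not modelled: there the replayed changelog records are plain overwrites, so repeating C -> A is harmless, as the analysis notes.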