From: Eno Thereska
Subject: Re: [DISCUSS]: KIP-161: streams record processing exception handlers
Date: Tue, 4 Jul 2017 10:32:19 +0100
To: dev@kafka.apache.org

As part of the PR review we decided to add a metric to keep track of the number of records skipped due to deserialization errors. I have updated the KIP to reflect that.

Thanks
Eno

> On Jun 23, 2017, at 10:59 AM, Eno Thereska wrote:
>
> Done, thanks. I'll open a vote thread now.
>
> Eno
>
>> On 23 Jun 2017, at 02:15, Matthias J. Sax wrote:
>>
>> I also think that one config is better, with two default implementations: failing and log-and-continue.
>>
>> However, I think we should fail by default. As with the timestamp extractor, "silent" data loss is not a good default behavior IMHO.
>>
>> -Matthias
>>
>> On 6/22/17 11:00 AM, Eno Thereska wrote:
>>> Answers inline:
>>>
>>>> On 22 Jun 2017, at 03:26, Guozhang Wang wrote:
>>>>
>>>> Thanks for the updated KIP, some more comments:
>>>>
>>>> 1. The config name is "default.deserialization.exception.handler", while the interface class name is "RecordExceptionHandler", which is more general than the intended purpose. Could we rename the class accordingly?
>>>
>>> Sure.
>>>
>>>> 2. Could you describe the full implementation of "DefaultExceptionHandler"? Currently it is not clear to me how it is implemented with the configured value.
>>>>
>>>> In addition, I think we do not need to include an additional "DEFAULT_DESERIALIZATION_EXCEPTION_RESPONSE_CONFIG", as the configure() function is mainly meant for users to pass customized parameters that are outside the Streams library; plus, adding such an additional config sounds over-complicated for a default exception handler. Instead I'd suggest we just provide two handlers (or three, if people feel strongly about the LogAndThresholdExceptionHandler): one FailOnExceptionHandler and one LogAndContinueOnExceptionHandler. And we can set LogAndContinueOnExceptionHandler by default.
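>>>>
>>>> Roughly, the shape I have in mind is something like the sketch below. It is purely illustrative -- the class and method names are placeholders for this discussion, not a concrete proposal:
>>>>
>>>> import java.util.Map;
>>>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>>>>
>>>> // Placeholder interface for the deserialization exception handler.
>>>> public interface DeserializationExceptionHandler {
>>>>     enum Response { CONTINUE, FAIL }
>>>>
>>>>     // Called when a record's key or value cannot be deserialized.
>>>>     Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception);
>>>>
>>>>     // Lets handler-specific settings be passed through the Streams config.
>>>>     default void configure(Map<String, ?> configs) {}
>>>> }
>>>>
>>>> // Built-in option 1: log the bad record and keep going.
>>>> class LogAndContinueOnExceptionHandler implements DeserializationExceptionHandler {
>>>>     @Override
>>>>     public Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception) {
>>>>         System.err.printf("Skipping record at %s-%d offset %d: %s%n",
>>>>                 record.topic(), record.partition(), record.offset(), exception);
>>>>         return Response.CONTINUE;
>>>>     }
>>>> }
>>>>
>>>> // Built-in option 2: fail fast.
>>>> class FailOnExceptionHandler implements DeserializationExceptionHandler {
>>>>     @Override
>>>>     public Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception) {
>>>>         return Response.FAIL;
>>>>     }
>>>> }
>>>>
>>>> Anything fancier, such as a threshold-based handler, could then be built on top of this interface outside the library.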
>>>
>>> That's what I had originally. Jay mentioned he preferred one default class with config options. With that approach you'd have two config options, one for failing and one for continuing, and the single exception handler would take those options during its configure() call.
>>>
>>> After checking the other exception handlers in the code, I might revert to what I originally had (two default handlers), as Guozhang also re-suggests, but still have the interface extend Configurable. Guozhang, are you ok with that? In that case there is no need for the response config option.
>>>
>>> Thanks
>>> Eno
>>>
>>>> Guozhang
>>>>
>>>> On Wed, Jun 21, 2017 at 1:39 AM, Eno Thereska wrote:
>>>>
>>>>> Thanks Guozhang,
>>>>>
>>>>> I've updated the KIP and hopefully addressed all the comments so far. In the process I also changed the name of the KIP to reflect its scope better:
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
>>>>>
>>>>> Any other feedback appreciated, otherwise I'll start the vote soon.
>>>>>
>>>>> Thanks
>>>>> Eno
>>>>>
>>>>>> On Jun 12, 2017, at 6:28 AM, Guozhang Wang wrote:
>>>>>>
>>>>>> Eno, thanks for bringing this proposal up, and sorry for getting to it late. Here are my two cents:
>>>>>>
>>>>>> 1. First, some meta comments regarding "fail fast" vs. "making progress". I agree that in general we should "enforce the user to do the right thing" in system design, but we also need to keep in mind that Kafka is a multi-tenant system, i.e. from a Streams app's point of view you probably do not control the whole stream processing pipeline end-to-end. E.g. your input data may not be controlled by you; it could be written by another app, another team in your company, or even a different organization, and if an error happens you may not be able to fix it "the right way" yourself in time. In such an environment I think it is important to leave the door open to let users be more resilient. So I find the current proposal, which does leave the door open for either failing fast or making progress, quite reasonable.
>>>>>>
>>>>>> 2. On the other hand, if the question is whether we should provide a built-in "send to bad queue" handler in the library, I think that might be overkill: with some tweaks on the API (see my detailed comments below) we can let users implement such handlers pretty easily. In fact, I feel even "LogAndThresholdExceptionHandler" is not necessary as a built-in handler, as it would then require users to specify the threshold via configs, etc. I think letting people provide such "eco-libraries" may be better.
>>>>>>
>>>>>> 3. Regarding the CRC error: today we validate the CRC both on the broker end upon receiving produce requests and on the consumer end upon receiving fetch responses, and if the CRC validation fails in the former case the batch is not appended to the broker logs.
>>>>>> So if we do see a CRC failure on the consumer side, it has to be that we have a flipped bit either on the broker disks or over the wire. The first case is fatal while the second is retriable; unfortunately we cannot tell which case it is when we see a CRC validation failure. But in either case, just skipping and making progress does not seem like a good choice, and hence I would personally exclude these errors from the general serde errors and NOT leave the door open for making progress.
>>>>>>
>>>>>> Currently such errors are thrown as a KafkaException that wraps an InvalidRecordException, which may be too general; we could consider just throwing the InvalidRecordException directly. But that can be an orthogonal discussion if we agree that CRC failures should not be considered in this KIP.
>>>>>>
>>>>>> ----------------
>>>>>>
>>>>>> Now some detailed comments:
>>>>>>
>>>>>> 4. Could we consider adding the processor context to the handle() function as well? This context would wrap the source node that is about to process the record. It could expose more info, such as which task / source node saw the error and the timestamp of the message, and it would also allow users to implement their handlers by exposing some metrics or by calling context.forward() to implement the "send to bad queue" behavior, etc.
>>>>>>
>>>>>> 5. Could you add the string name of StreamsConfig.DEFAULT_RECORD_EXCEPTION_HANDLER to the KIP as well? Personally I find the "default" prefix a bit misleading, since we do not allow users to override it per node yet. But I'm okay either way, as I can see we may extend it in the future and would probably prefer not to rename the config again. Also, from the experience with `default partitioner` and `default timestamp extractor`, should we also make sure that the passed-in object can be either a string "class name" or a class object?
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Wed, Jun 7, 2017 at 2:16 PM, Jan Filipiak wrote:
>>>>>>
>>>>>>> Hi Eno,
>>>>>>>
>>>>>>> On 07.06.2017 22:49, Eno Thereska wrote:
>>>>>>>
>>>>>>>> Comments inline:
>>>>>>>>
>>>>>>>> On 5 Jun 2017, at 18:19, Jan Filipiak wrote:
>>>>>>>>>
>>>>>>>>> Hi
>>>>>>>>>
>>>>>>>>> just my few thoughts
>>>>>>>>>
>>>>>>>>> On 05.06.2017 11:44, Eno Thereska wrote:
>>>>>>>>>
>>>>>>>>>> Hi there,
>>>>>>>>>>
>>>>>>>>>> Sorry for the late reply, I was out this past week. It looks like good progress was made in the discussions either way. Let me recap a couple of points I saw into one big reply:
>>>>>>>>>>
>>>>>>>>>> 1. Jan mentioned CRC errors. I think this is a good point. As these happen in Kafka, before Kafka Streams gets a chance to inspect anything, I'd like to hear the opinion of more Kafka folks like Ismael or Jason on this one. Currently the documentation is not great about what to do once a CRC check has failed. From looking at the code, it looks like the client gets a KafkaException (bubbled up from the fetcher), and currently in Streams we catch this as part of poll() and fail.
>>>>>>>>>> It might be advantageous to treat CRC handling in a similar way to serialisation handling (e.g., have the option to fail or skip). Let's see what the other folks say. Worst case, we can do a separate KIP for that if it proves too hard to do in one go.
>>>>>>>>>
>>>>>>>>> There is no reasonable way to "skip" a CRC error. How can you know that the length you read was anything reasonable? You might be completely lost inside your response.
>>>>>>>>
>>>>>>>> On the client side, every record received is checked for validity. As it happens, if the CRC check fails the exception is wrapped in a KafkaException that is thrown all the way up to poll(). Assuming we change that and poll() throws a CRC exception, I was thinking we could treat it similarly to a deserialization exception and pass it to the exception handler to decide what to do. The default would be to fail. This might need a Kafka KIP, by the way, and can be done separately from this KIP, but Jan, would you find this useful?
>>>>>>>
>>>>>>> I don't think so. IMO you cannot reasonably continue parsing when the checksum of a message is not correct. If you are not sure you got the correct length, how can you be sure to find the next record? I would always fail outright in all cases. It is too hard for me to understand why one would try to continue. I mentioned CRCs because they are the only bad pills I have seen so far, and I am happy that the app just stopped so I could check what was going on. This would also be invasive in the client code.
>>>>>>>
>>>>>>> If you ask me, I am always going to vote for "grind to a halt": let the developers see what happened and let them fix it. It helps build good Kafka experiences and better software and architectures. For me this is "force the user to do the right thing" (https://youtu.be/aAb7hSCtvGw?t=374), e.g. not letting unexpected input slip by. Letting unexpected input slip by is what bought us 15+ years of all sorts of ingestion attacks. I don't even dare to estimate how many missing-records search teams are going to be formed, maybe some HackerOne for stream apps :D
>>>>>>>
>>>>>>> Best Jan
>>>>>>>
>>>>>>>>>> At a minimum, handling this type of exception will need to involve the exactly-once (EoS) logic. We'd still allow the option of failing or skipping, but EoS would need to clean up by rolling back all the side effects from the processing so far. Matthias, how does this sound?
>>>>>>>>>
>>>>>>>>> EoS will not help; the record might be 5 or 6 repartitions down into the topology. I haven't followed closely, but I pray you made EoS optional! We don't need it, we don't want it, and we will turn it off if it comes, so I wouldn't recommend relying on it. The option to turn it off is better than forcing it and still being unable to roll back bad pills (as explained before).
>>>>>>>>
>>>>>>>> Yeah, as Matthias mentioned, EoS is optional.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Eno
>>>>>>>>
>>>>>>>> 6. Will add an end-to-end example as Michael suggested.
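>>>>>>>>
>>>>>>>> For illustration, a minimal end-to-end sketch could look roughly like the code below. The config key and handler class are the placeholders from this discussion, not a final API, and the topic names are made up:
>>>>>>>>
>>>>>>>> import java.util.Properties;
>>>>>>>> import org.apache.kafka.streams.KafkaStreams;
>>>>>>>> import org.apache.kafka.streams.StreamsConfig;
>>>>>>>> import org.apache.kafka.streams.kstream.KStreamBuilder;
>>>>>>>>
>>>>>>>> public class PassThroughWithSkipExample {
>>>>>>>>     public static void main(String[] args) {
>>>>>>>>         Properties props = new Properties();
>>>>>>>>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kip-161-example");
>>>>>>>>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>>>>>>>         // Placeholder config key and handler class from the KIP discussion;
>>>>>>>>         // the final names may differ.
>>>>>>>>         props.put("default.deserialization.exception.handler",
>>>>>>>>                   LogAndContinueOnExceptionHandler.class.getName());
>>>>>>>>
>>>>>>>>         KStreamBuilder builder = new KStreamBuilder();
>>>>>>>>         // The happy path stays unchanged: records that fail deserialization
>>>>>>>>         // never reach this topology because the handler skips them.
>>>>>>>>         builder.stream("input-topic").to("output-topic");
>>>>>>>>
>>>>>>>>         new KafkaStreams(builder, props).start();
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> The point being that the DSL code stays on the happy path, and the choice between skipping and failing lives entirely in configuration.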
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Eno
>>>>>>>>>>
>>>>>>>>>> On 4 Jun 2017, at 02:35, Matthias J. Sax wrote:
>>>>>>>>>>>
>>>>>>>>>>> What I don't understand is this:
>>>>>>>>>>>
>>>>>>>>>>>> From there on it's the easiest way forward: fix, redeploy, start => done
>>>>>>>>>>>
>>>>>>>>>>> If you have many producers that work fine, and a new "bad" producer starts up and writes bad data into your input topic, your Streams app dies, but all your producers, including the bad one, keep writing.
>>>>>>>>>>>
>>>>>>>>>>> So how would you fix this, given that you cannot "remove" the corrupted data from the topic? It might take some time to identify the root cause and stop the bad producer. Up to that point you keep getting good and bad data in your Streams input topic. If the Streams app is not able to skip over those bad records, how would you get all the good data out of the topic? Not saying it's impossible, but it's extra work to copy the data with a new non-Streams consumer-producer app into a new topic and then feed your Streams app from this new topic -- and you also need to update all your upstream producers to write to the new topic.
>>>>>>>>>>>
>>>>>>>>>>> Thus, if you want to fail fast, you can still do this. And after you have detected and fixed the bad producer, you might just reconfigure your app to skip bad records until it reaches the good part of the data. Afterwards, you could redeploy with fail-fast again.
>>>>>>>>>>>
>>>>>>>>>>> Thus, for this pattern, I actually don't see any reason to stop the Streams app at all. If you have a callback, and use the callback to raise an alert (and maybe get the bad data into a bad-record queue), it will not take any longer to identify and stop the "bad" producer. But in this case you have zero downtime for your Streams app.
>>>>>>>>>>>
>>>>>>>>>>> This seems to be much simpler. Or do I miss anything?
>>>>>>>>>>>
>>>>>>>>>>> Having said this, I agree that the "threshold based callback" might be questionable. But as you argue for strict "fail-fast", I want to argue that it must not always be the best pattern to apply, and that the overall KIP idea is super useful from my point of view.
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 6/3/17 11:57 AM, Jan Filipiak wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Could not agree more!
>>>>>>>>>>>>
>>>>>>>>>>>> But then I think the easiest is still: print the exception and die. From there on it's the easiest way forward: fix, redeploy, start => done.
>>>>>>>>>>>>
>>>>>>>>>>>> All the other ways to recover a pipeline that was processing partially all the time and suddenly went over an "I can't take it anymore" threshold are not straightforward IMO.
>>>>>>>>>>>>
>>>>>>>>>>>> How do you find the offset at which it became too bad, when it is not the latest committed one? How do you reset to there, with some reasonable state in your RocksDB stores?
>>>>>>>>>>>>
>>>>>>>>>>>> If one would do the following:
>>>>>>>>>>>> The continuing handler would measure against a threshold and would terminate after a certain threshold has been passed (per task). Then one could use the offset-commit / flush intervals to make a reasonable assumption about how much is slipping by, plus you get an easy recovery when it gets too bad, plus you could also account for "in processing" records.
>>>>>>>>>>>>
>>>>>>>>>>>> Setting this threshold to zero would cover all cases with one implementation. It is still beneficial to have it pluggable.
>>>>>>>>>>>>
>>>>>>>>>>>> Again, CRC errors are the only bad pills we have seen in production so far.
>>>>>>>>>>>>
>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>
>>>>>>>>>>>> On 02.06.2017 17:37, Jay Kreps wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Jan, I agree with you philosophically. I think one practical challenge has to do with data formats. Many people use untyped events, so there is simply no guarantee on the form of the input. E.g. many companies use JSON without any kind of schema, so it becomes very hard to assert anything about the input, which makes these programs very fragile to the one accidental message publication that creates an unsolvable problem.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For that reason I do wonder if limiting this to just serialization actually gets you a useful solution. For JSON it will help with the problem of non-parseable JSON, but it sounds like it won't help in the case where the JSON is well-formed but does not have any of the fields you expect and depend on for your processing. I expect the reason for limiting the scope is that it is pretty hard to reason about correctness for anything that stops in the middle of processing an operator DAG?
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> IMHO you're doing it wrong then. Plus, building too much support into the Kafka ecosystem is very counterproductive in fostering a happy user base.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 02.06.2017 13:15, Damian Guy wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jan, you have the choice to fail fast if you want. This is about giving people options, and there are times when you don't want to fail fast.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1.
>>>>>>>>>>>>>>>> That greatly complicates monitoring. Fail-fast gives you this: when you monitor only the lag of all your apps, you are completely covered. With this sort of new application behavior, monitoring is much more complicated, as you now also need to monitor the failure percentage of some special apps.
>>>>>>>>>>>>>>>> In my opinion that is a huge downside already.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>>>> With a schema registry like the Avro stuff, it might not even be the record that is broken; it might just be your app being unable to fetch a schema it now needs. Maybe you got partitioned away from that registry.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 3.
>>>>>>>>>>>>>>>> When you get alerted because of a too-high failure percentage, what steps are you going to take? Shut it down to buy time, fix the problem, spend way too much time finding a good reprocessing offset. Your time windows are in bad shape anyway, and you have pretty much lost. This routine is nonsense.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Dead letter queues would be the worst possible addition to the Kafka toolkit that I can think of. It just doesn't fit an architecture in which having clients fall behind is a valid option.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Further, I mentioned already that the only bad pills I've seen so far are CRC errors. Any plans for those?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 02.06.2017 11:34, Damian Guy wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I agree with what Matthias has said w.r.t. failing fast. There are plenty of times when you don't want to fail fast and must attempt to make progress. The dead-letter queue is exactly for these circumstances. Of course, if every record is failing, then you probably do want to give up.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> First a meta comment: KIP discussions should take place on the dev list -- if the user list is cc'ed, please make sure to reply to both lists. Thanks.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for making the scope of the KIP clear. It makes a lot of sense to focus on deserialization exceptions for now.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> With regard to corrupted state stores, would it make sense to fail a task and wipe out the store to repair it via recreation from the changelog? That's of course quite an advanced pattern, but I want to bring it up so we design the first step in a way that lets us get there (if we think it's a reasonable idea).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I also want to comment on fail-fast vs. making progress. I think fail-fast must not always be the best option.
>>>>>>>>>>>>>>>>>> The scenario I have in mind is this: you have a bunch of producers that feed the Streams input topic. Most producers work fine, but maybe one producer misbehaves and the data it writes is corrupted. You might not even be able to recover this lost data at any point -- thus, there is no reason to stop processing; you just skip over those records. Of course, you need to fix the root cause, so you need to alert (either via logs or via the exception handler directly) and you need to start investigating to find the bad producer, shut it down and fix it.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Here the dead letter queue comes into play. From my understanding, the purpose of this feature is solely to enable post-hoc debugging. I don't think those records would be fed back at any point in time (so I don't see any ordering issue -- a skipped record, in this regard, is just "fully processed"). Thus, the dead letter queue should actually encode the original record's metadata (topic, partition, offset, etc.) to enable such debugging. I guess this might also be possible if you just log the bad records, but it would be harder to access (you first must find the Streams instance that wrote the log and extract the information from there). Reading it from a topic is much simpler.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I also want to mention the following. Assume you have such a topic with some bad records and some good records. If we always fail fast, it's going to be super hard to process the good data. You would need to write an extra app that copies the data into a new topic, filtering out the bad records (or apply the map() workaround within Streams). So I don't think it is necessarily true that failing fast is the best option in production.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Or do you think there are scenarios in which you can recover the corrupted records successfully? And even if this is possible, might it be a case for reprocessing instead of failing the whole application? Also, if you think you can "repair" a corrupted record, should the handler allow returning a "fixed" record? This would solve the ordering problem.
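>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> As a rough sketch, a handler that writes the raw bytes plus the original coordinates to a quarantine topic could look like the code below. The class names and the handler interface are only the placeholders from earlier in this thread, not part of the KIP, and it uses record headers (available since Kafka 0.11) to carry the metadata:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> import java.nio.charset.StandardCharsets;
>>>>>>>>>>>>>>>>>> import java.util.Properties;
>>>>>>>>>>>>>>>>>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>>>>>>>>>>>>>>>>>> import org.apache.kafka.clients.producer.KafkaProducer;
>>>>>>>>>>>>>>>>>> import org.apache.kafka.clients.producer.ProducerRecord;
>>>>>>>>>>>>>>>>>> import org.apache.kafka.common.serialization.ByteArraySerializer;
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> // Sketch only: the handler interface is the placeholder sketched earlier.
>>>>>>>>>>>>>>>>>> class SendToQuarantineTopicHandler implements DeserializationExceptionHandler {
>>>>>>>>>>>>>>>>>>     private final KafkaProducer<byte[], byte[]> producer;
>>>>>>>>>>>>>>>>>>     private final String quarantineTopic;
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     SendToQuarantineTopicHandler(Properties producerConfig, String quarantineTopic) {
>>>>>>>>>>>>>>>>>>         this.producer = new KafkaProducer<>(producerConfig,
>>>>>>>>>>>>>>>>>>                 new ByteArraySerializer(), new ByteArraySerializer());
>>>>>>>>>>>>>>>>>>         this.quarantineTopic = quarantineTopic;
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>     public Response handle(ConsumerRecord<byte[], byte[]> record, Exception exception) {
>>>>>>>>>>>>>>>>>>         ProducerRecord<byte[], byte[]> quarantined =
>>>>>>>>>>>>>>>>>>                 new ProducerRecord<>(quarantineTopic, record.key(), record.value());
>>>>>>>>>>>>>>>>>>         // Encode the original coordinates so the record can be traced back later.
>>>>>>>>>>>>>>>>>>         String origin = record.topic() + "/" + record.partition() + "/" + record.offset();
>>>>>>>>>>>>>>>>>>         quarantined.headers().add("origin", origin.getBytes(StandardCharsets.UTF_8));
>>>>>>>>>>>>>>>>>>         producer.send(quarantined);
>>>>>>>>>>>>>>>>>>         return Response.CONTINUE;
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>> }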
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - I think it would help to improve the KIP by adding an end-to-end code example that demonstrates, with the DSL and with the Processor API, how the user would write a simple application that would then be augmented with the proposed KIP changes to handle exceptions. It should also become much clearer then that, e.g., the KIP would lead to different code paths for the happy case and any failure scenarios.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - Do we have sufficient information available to make informed decisions on what to do next? For example, do we know in which part of the topology the record failed? `ConsumerRecord` gives us access to topic, partition, offset, timestamp, etc., but what about topology-related information (e.g. what is the associated state store, if any)?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> - Only partly on-topic for the scope of this KIP, but this is about the bigger picture: this KIP would give users the option to send corrupted records to a dead letter queue (quarantine topic). But what pattern would we advocate for processing such a dead letter queue, e.g. how to allow for retries with backoff ("if the first record in the dead letter queue fails again, then try the second record for the time being and go back to the first record at a later time")? Jay and Jan already alluded to ordering problems that will be caused by dead letter queues. As I said, retries might be out of scope, but perhaps the implications should be considered if possible?
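>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> (To make that question concrete: a naive re-drive loop over the quarantine topic might look roughly like the sketch below. The topic names, group id, and the crude fixed backoff are made up, and it deliberately sidesteps the per-record retry and ordering problem raised above.)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> import java.util.Collections;
>>>>>>>>>>>>>>>>>>> import java.util.Properties;
>>>>>>>>>>>>>>>>>>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>>>>>>>>>>>>>>>>>>> import org.apache.kafka.clients.consumer.KafkaConsumer;
>>>>>>>>>>>>>>>>>>> import org.apache.kafka.clients.producer.KafkaProducer;
>>>>>>>>>>>>>>>>>>> import org.apache.kafka.clients.producer.ProducerRecord;
>>>>>>>>>>>>>>>>>>> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>>>>>>>>>>>>>>>>>>> import org.apache.kafka.common.serialization.ByteArraySerializer;
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> public class QuarantineTopicRedrive {
>>>>>>>>>>>>>>>>>>>     public static void main(String[] args) throws InterruptedException {
>>>>>>>>>>>>>>>>>>>         Properties props = new Properties();
>>>>>>>>>>>>>>>>>>>         props.put("bootstrap.servers", "localhost:9092");
>>>>>>>>>>>>>>>>>>>         props.put("group.id", "dlq-redrive");
>>>>>>>>>>>>>>>>>>>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props,
>>>>>>>>>>>>>>>>>>>                      new ByteArrayDeserializer(), new ByteArrayDeserializer());
>>>>>>>>>>>>>>>>>>>              KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props,
>>>>>>>>>>>>>>>>>>>                      new ByteArraySerializer(), new ByteArraySerializer())) {
>>>>>>>>>>>>>>>>>>>             consumer.subscribe(Collections.singletonList("quarantine-topic"));
>>>>>>>>>>>>>>>>>>>             while (true) {
>>>>>>>>>>>>>>>>>>>                 for (ConsumerRecord<byte[], byte[]> r : consumer.poll(1000)) {
>>>>>>>>>>>>>>>>>>>                     // Re-publishing changes ordering relative to the live stream.
>>>>>>>>>>>>>>>>>>>                     producer.send(new ProducerRecord<>("input-topic", r.key(), r.value()));
>>>>>>>>>>>>>>>>>>>                 }
>>>>>>>>>>>>>>>>>>>                 Thread.sleep(5000); // crude fixed backoff between re-drive rounds
>>>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>> }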
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Also, I wrote the text below before reaching the point in the conversation where it was decided that this KIP's scope will be limited to exceptions in the category of poison pills / deserialization errors. But since Jay brought up user code errors again, I decided to include it anyway.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>>>>>>>>>>> A meta comment: I am not sure about this split between the code for the happy path (e.g. map/filter/... in the DSL) and the failure path (using exception handlers). In Scala, for example, we can do:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> scala> val computation = scala.util.Try(1 / 0)
>>>>>>>>>>>>>>>>>>> computation: scala.util.Try[Int] = Failure(java.lang.ArithmeticException: / by zero)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> scala> computation.getOrElse(42)
>>>>>>>>>>>>>>>>>>> res2: Int = 42
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Another example with Scala's pattern matching, which is similar to `KStream#branch()`:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> computation match {
>>>>>>>>>>>>>>>>>>>   case scala.util.Success(x) => x * 5
>>>>>>>>>>>>>>>>>>>   case scala.util.Failure(_) => 42
>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> (The above isn't the most idiomatic way to handle this in Scala, but that's not the point I'm trying to make here.)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hence the question I'm raising here is: do we want to have an API where you code "the happy path" and then have a different code path for failures (using exceptions and handlers), or should we treat both Success and Failure in the same way?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I think the failure/exception handling approach (as proposed in this KIP) is well-suited for errors in the category of deserialization problems aka poison pills, partly because the (default) serdes are defined through configuration (explicit serdes, however, are defined through API calls).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> However, I'm not yet convinced that the failure/exception handling approach is the best idea for user code exceptions, e.g.
>>>>>>>>>>>>>>>>>>> if you fail to guard against NPEs in your lambdas or divide a number by zero.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> scala> val stream = Seq(1, 2, 3, 4, 5)
>>>>>>>>>>>>>>>>>>> stream: Seq[Int] = List(1, 2, 3, 4, 5)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> // Here: fall back to a sane default when encountering failed records
>>>>>>>>>>>>>>>>>>> scala> stream.map(x => Try(1/(3 - x))).flatMap(t => Seq(t.getOrElse(42)))
>>>>>>>>>>>>>>>>>>> res19: Seq[Int] = List(0, 1, 42, -1, 0)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> // Here: skip over failed records
>>>>>>>>>>>>>>>>>>> scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s) => s }
>>>>>>>>>>>>>>>>>>> res20: Seq[Int] = List(0, 1, -1, 0)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The above is more natural to me than using error handlers to define how to deal with failed records (here, the value `3` causes an arithmetic exception). Again, it might help the KIP if we added an end-to-end example for such user code errors.
>>>>>>>>>>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Jay,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Eno mentioned that he will narrow down the scope to only ConsumerRecord deserialisation.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I am working with database changelogs only. I would really not like to see a dead letter queue or something similar. How am I expected to get these back in order? Just grind to a halt and call me on the weekend; I'll fix it then in a few minutes, rather than spending two weeks ordering dead letters (where reprocessing might even be the faster fix).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> - I think we should hold off on retries unless we have worked out the full usage pattern; people can always implement their own.
>>>>>>>>>>>>>>>>>>>>> I think the idea is that you send the message to some kind of dead letter queue and then replay these later. This obviously destroys all the semantic guarantees we are working hard to provide right now, which may be okay.