kafka-dev mailing list archives

From Eno Thereska <eno.there...@gmail.com>
Subject Re: [DISCUSS]: KIP-161: streams record processing exception handlers
Date Thu, 22 Jun 2017 18:00:29 GMT
Answers inline: 

> On 22 Jun 2017, at 03:26, Guozhang Wang <wangguoz@gmail.com> wrote:
> 
> Thanks for the updated KIP, some more comments:
> 
> 1.The config name is "default.deserialization.exception.handler" while the
> interface class name is "RecordExceptionHandler", which is more general
> than the intended purpose. Could we rename the class name accordingly?

Sure.


> 
> 2. Could you describe the full implementation of "DefaultExceptionHandler",
> currently it is not clear to me how it is implemented with the configured
> value.
> 
> In addition, I think we do not need to include an additional
> "DEFAULT_DESERIALIZATION_EXCEPTION_RESPONSE_CONFIG" as the configure()
> function is mainly used for users to pass any customized parameters that is
> out of the Streams library; plus adding such additional config sounds
> over-complicated for a default exception handler. Instead I'd suggest we
> just provide two handlers (or three if people feel strong about the
> LogAndThresholdExceptionHandler), one for FailOnExceptionHandler and one
> for LogAndContinueOnExceptionHandler. And we can set
> LogAndContinueOnExceptionHandler
> by default.
> 

That's what I had originally. Jay mentioned he preferred one default class with config options.
With that approach, you'd have two config options, one for failing and one for continuing, and the single
exception handler would read those options during its configure() call.

After checking the other exception handlers in the code, I might revert to what I originally had (two default handlers),
as Guozhang also suggests, but still have the interface extend Configurable. Guozhang, are you OK with that? In that case
there is no need for the response config option.
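
A minimal sketch of the two-handler approach, assuming a simplified, hypothetical interface. The `Response` enum, the `handle()` signature and the wrapper class `TwoHandlersSketch` are illustrative only and not the final KIP API; the real interface would extend Kafka's Configurable rather than declare configure() itself. Only the two handler names come from this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: these types are hypothetical stand-ins, not the Kafka Streams API.
class TwoHandlersSketch {

    /** What the caller should do after a record fails to deserialize. */
    enum Response { CONTINUE, FAIL }

    /** Stand-in for the handler interface (assumption: the real one extends Configurable). */
    interface DeserializationExceptionHandler {
        /** Receives the application configs so custom handlers can read their own settings. */
        void configure(Map<String, ?> configs);

        /** Decides what to do with the record at (topic, partition, offset) that failed. */
        Response handle(String topic, int partition, long offset, Exception exception);
    }

    /** Fail-fast: report the bad record and ask the caller to stop. */
    static class FailOnExceptionHandler implements DeserializationExceptionHandler {
        @Override
        public void configure(Map<String, ?> configs) { /* nothing to configure */ }

        @Override
        public Response handle(String topic, int partition, long offset, Exception exception) {
            System.err.printf("Failing on %s-%d@%d: %s%n", topic, partition, offset, exception);
            return Response.FAIL;
        }
    }

    /** Make progress: report the bad record and ask the caller to skip it. */
    static class LogAndContinueOnExceptionHandler implements DeserializationExceptionHandler {
        @Override
        public void configure(Map<String, ?> configs) { /* nothing to configure */ }

        @Override
        public Response handle(String topic, int partition, long offset, Exception exception) {
            System.err.printf("Skipping %s-%d@%d: %s%n", topic, partition, offset, exception);
            return Response.CONTINUE;
        }
    }

    public static void main(String[] args) {
        DeserializationExceptionHandler handler = new LogAndContinueOnExceptionHandler();
        handler.configure(new HashMap<String, Object>());
        Response response =
            handler.handle("input-topic", 0, 42L, new IllegalArgumentException("bad bytes"));
        System.out.println(response);
    }
}
```

With two separate classes, choosing a behaviour is just a matter of naming the handler class in the config, so no extra response option is needed.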

Thanks
Eno


> 
> Guozhang
> 
> 
> 
> 
> 
> 
> 
> 
> On Wed, Jun 21, 2017 at 1:39 AM, Eno Thereska <eno.thereska@gmail.com>
> wrote:
> 
>> Thanks Guozhang,
>> 
>> I’ve updated the KIP and hopefully addressed all the comments so far. In
>> the process also changed the name of the KIP to reflect its scope better:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers
>> 
>> Any other feedback appreciated, otherwise I’ll start the vote soon.
>> 
>> Thanks
>> Eno
>> 
>>> On Jun 12, 2017, at 6:28 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
>>> 
>>> Eno, Thanks for bringing this proposal up and sorry for getting late on
>>> this. Here are my two cents:
>>> 
>>> 1. First some meta comments regarding "fail fast" v.s. "making
>> progress". I
>>> agree that in general we should better "enforce user to do the right
>> thing"
>>> in system design, but we also need to keep in mind that Kafka is a
>>> multi-tenant system, i.e. from a Streams app's pov you probably would not
>>> control the whole streaming processing pipeline end-to-end. E.g. Your
>> input
>>> data may not be controlled by yourself; it could be written by another
>> app,
>>> or another team in your company, or even a different organization, and if
>>> an error happens maybe you cannot fix "to do the right thing" just by
>>> yourself in time. In such an environment I think it is important to leave
>>> the door open to let users be more resilient. So I find the current
>>> proposal which does leave the door open for either fail-fast or make
>>> progress quite reasonable.
>>> 
>>> 2. On the other hand, if the question is whether we should provide a
>>> built-in "send to bad queue" handler from the library, I think that might
>>> be an overkill: with some tweaks (see my detailed comments below) on the
>>> API we can allow users to implement such handlers pretty easily. In
>> fact, I
>>> feel even "LogAndThresholdExceptionHandler" is not necessary as a
>> built-in
>>> handler, as it would then require users to specify the threshold via
>>> configs, etc. I think letting people provide such "eco-libraries" may be
>>> better.
>>> 
>>> 3. Regarding the CRC error: today we validate CRC on both the broker end
>>> upon receiving produce requests and on consumer end upon receiving fetch
>>> responses; and if the CRC validation fails in the former case it would
>> not
>>> be appended to the broker logs. So if we do see a CRC failure on the
>>> consumer side it has to be that either we have a flipped bit on the
>> broker
>>> disks or over the wire. For the first case it is fatal while for the
>> second
>>> it is retriable. Unfortunately we cannot tell which case it is when
>> seeing
>>> CRC validation failures. But in either case, just skipping and making
>>> progress seems not a good choice here, and hence I would personally
>> exclude
>>> these errors from the general serde errors to NOT leave the door open of
>>> making progress.
>>> 
>>> Currently such errors are thrown as KafkaException that wraps an
>>> InvalidRecordException, which may be too general and we could consider
>> just
>>> throwing the InvalidRecordException directly. But that could be an
>>> orthogonal discussion if we agree that CRC failures should not be
>>> considered in this KIP.
>>> 
>>> ----------------
>>> 
>>> Now some detailed comments:
>>> 
>>> 4. Could we consider adding the processor context in the handle()
>> function
>>> as well? This context will be wrapping as the source node that is about
>> to
>>> process the record. This could expose more info like which task / source
>>> node sees this error, which timestamp of the message, etc, and also can
>>> allow users to implement their handlers by exposing some metrics, by
>>> calling context.forward() to implement the "send to bad queue" behavior
>> etc.
>>> 
>>> 5. Could you add the string name of
>>> StreamsConfig.DEFAULT_RECORD_EXCEPTION_HANDLER as well in the KIP?
>>> Personally I find "default" prefix a bit misleading since we do not allow
>>> users to override it per-node yet. But I'm okay either way as I can see
>> we
>>> may extend it in the future and probably would like to not rename the
>>> config again. Also from the experience of `default partitioner` and
>>> `default timestamp extractor` we may also make sure that the passed in
>>> object can be either a string "class name" or a class object?
>>> 
>>> 
>>> Guozhang
>>> 
>>> 
>>> On Wed, Jun 7, 2017 at 2:16 PM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>> wrote:
>>> 
>>>> Hi Eno,
>>>> 
>>>> On 07.06.2017 22:49, Eno Thereska wrote:
>>>> 
>>>>> Comments inline:
>>>>> 
>>>>> On 5 Jun 2017, at 18:19, Jan Filipiak <Jan.Filipiak@trivago.com>
>> wrote:
>>>>>> 
>>>>>> Hi
>>>>>> 
>>>>>> just my few thoughts
>>>>>> 
>>>>>> On 05.06.2017 11:44, Eno Thereska wrote:
>>>>>> 
>>>>>>> Hi there,
>>>>>>> 
>>>>>>> Sorry for the late reply, I was out this past week. Looks like good
>>>>>>> progress was made with the discussions either way. Let me recap a
>> couple of
>>>>>>> points I saw into one big reply:
>>>>>>> 
>>>>>>> 1. Jan mentioned CRC errors. I think this is a good point. As these
>>>>>>> happen in Kafka, before Kafka Streams gets a chance to inspect
>> anything,
>>>>>>> I'd like to hear the opinion of more Kafka folks like Ismael or
>> Jason on
>>>>>>> this one. Currently the documentation is not great with what to do
>> once a
>>>>>>> CRC check has failed. From looking at the code, it looks like the
>> client
>>>>>>> gets a KafkaException (bubbled up from the fetcher) and currently we
>> in
>>>>>>> streams catch this as part of poll() and fail. It might be
>> advantageous to
>>>>>>> treat CRC handling in a similar way to serialisation handling (e.g.,
>> have
>>>>>>> the option to fail/skip). Let's see what the other folks say.
>> Worst-case we
>>>>>>> can do a separate KIP for that if it proved too hard to do in one go.
>>>>>>> 
>>>>>> there is no reasonable way to "skip" a crc error. How can you know the
>>>>>> length you read was anything reasonable? you might be completely lost
>>>>>> inside your response.
>>>>>> 
>>>>> On the client side, every record received is checked for validity. As
>> it
>>>>> happens, if the CRC check fails the exception is wrapped with a
>>>>> KafkaException that is thrown all the way to poll(). Assuming we change
>>>>> that and poll() throws a CRC exception, I was thinking we could treat
>> it
>>>>> similarly to a deserialize exception and pass it to the exception
>> handler
>>>>> to decide what to do. Default would be to fail. This might need a
>> Kafka KIP
>>>>> btw and can be done separately from this KIP, but Jan, would you find
>> this
>>>>> useful?
>>>>> 
>>>> I don't think so. IMO you can not reasonably continue parsing when the
>>>> checksum of a message is not correct. If you are not sure you got the
>>>> correct length, how can you be sure to find the next record? I would
>> always
>>>> straight fail in all cases. It's too hard for me to understand why one
>> would
>>>> try to continue. I mentioned CRCs because those are the only bad pills I
>> ever
>>>> saw so far. But I am happy that it just stopped and I could check what
>> was
>>>> going on. This will also be invasive in the client code then.
>>>> 
>>>> If you ask me, I am always going to vote for "grind to halt" let the
>>>> developers see what happened and let them fix it. It helps building good
>>>> kafka experiences and better software and architectures. For me this is:
>>>> "force the user to do the right thing". https://youtu.be/aAb7hSCtvGw?t=374
>>>> eg. not letting unexpected input slip by.  Letting unexpected input
>> slip by
>>>> is what bought us 15+years of war of all sorts of ingestion attacks. I
>>>> don't even dare to estimate how many missingrecords-search-teams going
>> be
>>>> formed, maybe some hackerone for stream apps :D
>>>> 
>>>> Best Jan
>>>> 
>>>> 
>>>>> 
>>>>>>> At a minimum, handling this type of exception will need to involve
>> the
>>>>>>> exactly-once (EoS) logic. We'd still allow the option of failing or
>>>>>>> skipping, but EoS would need to clean up by rolling back all the side
>>>>>>> effects from the processing so far. Matthias, how does this sound?
>>>>>>> 
>>>>>> Eos will not help the record might be 5,6 repartitions down into the
>>>>>> topology. I haven't followed but I pray you made EoS optional! We
>> don't
>>>>>> need this and we don't want this and we will turn it off if it comes.
>> So I
>>>>>> wouldn't recommend relying on it. The option to turn it off is better
>> than
>>>>>> forcing it and still being unable to roll back bad pills (as explained
>>>>>> before)
>>>>>> 
>>>>> Yeah as Matthias mentioned EoS is optional.
>>>>> 
>>>>> Thanks,
>>>>> Eno
>>>>> 
>>>>> 
>>>>> 6. Will add an end-to-end example as Michael suggested.
>>>>>>> 
>>>>>>> Thanks
>>>>>>> Eno
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On 4 Jun 2017, at 02:35, Matthias J. Sax <matthias@confluent.io>
>> wrote:
>>>>>>>> 
>>>>>>>> What I don't understand is this:
>>>>>>>> 
>>>>>>>> From there on its the easiest way forward: fix, redeploy, start =>
>>>>>>>>> done
>>>>>>>>> 
>>>>>>>> If you have many producers that work fine and a new "bad" producer
>>>>>>>> starts up and writes bad data into your input topic, your Streams
>> app
>>>>>>>> dies but all your producers, including the bad one, keep writing.
>>>>>>>> 
>>>>>>>> Thus, how would you fix this, as you cannot "remove" the corrupted
>> data
>>>>>>>> from the topic? It might take some time to identify the root cause
>> and
>>>>>>>> stop the bad producer. Up to this point you get good and bad data
>> into
>>>>>>>> your Streams input topic. If the Streams app is not able to skip over
>> those
>>>>>>>> bad records, how would you get all the good data from the topic? Not
>>>>>>>> saying it's not possible, but it's extra work copying the data with
>> a
>>>>>>>> new non-Streams consumer-producer-app into a new topic and then feed
>>>>>>>> your Streams app from this new topic -- you also need to update all
>>>>>>>> your
>>>>>>>> upstream producers to write to the new topic.
>>>>>>>> 
>>>>>>>> Thus, if you want to fail fast, you can still do this. And after you
>>>>>>>> detected and fixed the bad producer you might just reconfigure your
>> app
>>>>>>>> to skip bad records until it reaches the good part of the data.
>>>>>>>> Afterwards, you could redeploy with fail-fast again.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Thus, for this pattern, I actually don't see any reason why to stop
>> the
>>>>>>>> Streams app at all. If you have a callback, and use the callback to
>>>>>>>> raise an alert (and maybe get the bad data into a bad record
>> queue), it
>>>>>>>> will not take longer to identify and stop the "bad" producer. But
>> for
>>>>>>>> this case, you have zero downtime for your Streams app.
>>>>>>>> 
>>>>>>>> This seems to be much simpler. Or do I miss anything?
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Having said this, I agree that the "threshold based callback" might
>> be
>>>>>>>> questionable. But as you argue for strict "fail-fast", I want to
>> argue
>>>>>>>> that this must not always be the best pattern to apply and that the
>>>>>>>> overall KIP idea is super useful from my point of view.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> -Matthias
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 6/3/17 11:57 AM, Jan Filipiak wrote:
>>>>>>>> 
>>>>>>>>> Could not agree more!
>>>>>>>>> 
>>>>>>>>> But then I think the easiest is still: print exception and die.
>>>>>>>>> From there on its the easiest way forward: fix, redeploy, start =>
>>>>>>>>> done
>>>>>>>>> 
>>>>>>>>> All the other ways to recover a pipeline that was processing
>> partially
>>>>>>>>> all the time
>>>>>>>>> and suddenly went over a "I cant take it anymore" threshold is not
>>>>>>>>> straight forward IMO.
>>>>>>>>> 
>>>>>>>>> How to find the offset, when it became too bad when it is not the
>>>>>>>>> latest
>>>>>>>>> committed one?
>>>>>>>>> How to reset there? with some reasonable stuff in your rockses?
>>>>>>>>> 
>>>>>>>>> If one would do the following. The continuing Handler would measure
>>>>>>>>> for
>>>>>>>>> a threshold and
>>>>>>>>> would terminate after a certain threshold has passed (per task).
>> Then
>>>>>>>>> one can use offset commit/ flush intervals
>>>>>>>>> to make reasonable assumption of how much is slipping by + you get
>> an
>>>>>>>>> easy recovery when it gets too bad
>>>>>>>>> + you could also account for "in processing" records.
>>>>>>>>> 
>>>>>>>>> Setting this threshold to zero would cover all cases with 1
>>>>>>>>> implementation. It is still beneficial to have it pluggable
>>>>>>>>> 
>>>>>>>>> Again CRC-Errors are the only bad pills we saw in production for
>> now.
>>>>>>>>> 
>>>>>>>>> Best Jan
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On 02.06.2017 17:37, Jay Kreps wrote:
>>>>>>>>> 
>>>>>>>>>> Jan, I agree with you philosophically. I think one practical
>>>>>>>>>> challenge
>>>>>>>>>> has
>>>>>>>>>> to do with data formats. Many people use untyped events, so there
>> is
>>>>>>>>>> simply
>>>>>>>>>> no guarantee on the form of the input. E.g. many companies use
>> JSON
>>>>>>>>>> without
>>>>>>>>>> any kind of schema so it becomes very hard to assert anything
>> about
>>>>>>>>>> the
>>>>>>>>>> input which makes these programs very fragile to the "one
>> accidental
>>>>>>>>>> message publication that creates an unsolvable problem.
>>>>>>>>>> 
>>>>>>>>>> For that reason I do wonder if limiting to just serialization
>>>>>>>>>> actually
>>>>>>>>>> gets
>>>>>>>>>> you a useful solution. For JSON it will help with the problem of
>>>>>>>>>> non-parseable JSON, but sounds like it won't help in the case
>> where
>>>>>>>>>> the
>>>>>>>>>> JSON is well-formed but does not have any of the fields you expect
>>>>>>>>>> and
>>>>>>>>>> depend on for your processing. I expect the reason for limiting
>> the
>>>>>>>>>> scope
>>>>>>>>>> is it is pretty hard to reason about correctness for anything that
>>>>>>>>>> stops in
>>>>>>>>>> the middle of processing an operator DAG?
>>>>>>>>>> 
>>>>>>>>>> -Jay
>>>>>>>>>> 
>>>>>>>>>> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak <
>>>>>>>>>> Jan.Filipiak@trivago.com>
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>> IMHO you're doing it wrong then. + building too much support into the
>>>>>>>>>>> kafka
>>>>>>>>>>> eco system is very counterproductive in fostering a happy
>> userbase
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On 02.06.2017 13:15, Damian Guy wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Jan, you have a choice to Fail fast if you want. This is about
>>>>>>>>>>>> giving
>>>>>>>>>>>> people options and there are times when you don't want to fail
>>>>>>>>>>>> fast.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <
>> Jan.Filipiak@trivago.com
>>>>>>>>>>>>> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Hi
>>>>>>>>>>>> 
>>>>>>>>>>>>> 1.
>>>>>>>>>>>>> That greatly complicates monitoring.  Fail Fast gives you that
>>>>>>>>>>>>> when
>>>>>>>>>>>>> you
>>>>>>>>>>>>> monitor only the lag of all your apps
>>>>>>>>>>>>> you are completely covered. With that sort of new application
>>>>>>>>>>>>> Monitoring
>>>>>>>>>>>>> is very much more complicated as
>>>>>>>>>>>>> you now need to monitor fail % of some special apps as well.
>> In my
>>>>>>>>>>>>> opinion that is a huge downside already.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 2.
>>>>>>>>>>>>> using a schema registry like Avrostuff it might not even be
>> the
>>>>>>>>>>>>> record
>>>>>>>>>>>>> that is broken, it might be just your app
>>>>>>>>>>>>> unable to fetch a schema it needs now know. Maybe you got
>>>>>>>>>>>>> partitioned
>>>>>>>>>>>>> away from that registry.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 3. When you get alerted because of too high fail percentage.
>> what
>>>>>>>>>>>>> are the
>>>>>>>>>>>>> steps you gonna do?
>>>>>>>>>>>>> shut it down to buy time. fix the problem. spend way too much
>> time
>>>>>>>>>>>>> to
>>>>>>>>>>>>> find a good reprocess offset.
>>>>>>>>>>>>> Your timewindows are in bad shape anyways, and you pretty much
>>>>>>>>>>>>> lost.
>>>>>>>>>>>>> This routine is nonsense.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Dead letter queues would be the worst possible addition to the
>>>>>>>>>>>>> kafka
>>>>>>>>>>>>> toolkit that I can think of. It just doesn't fit the
>> architecture
>>>>>>>>>>>>> of having clients falling behind is a valid option.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Further. I mentioned already the only bad pill I've seen so far
>> is
>>>>>>>>>>>>> crc
>>>>>>>>>>>>> errors. any plans for those?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On 02.06.2017 11:34, Damian Guy wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I agree with what Matthias has said w.r.t failing fast. There
>> are
>>>>>>>>>>>>>> plenty
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> of
>>>>>>>>>>>>> 
>>>>>>>>>>>>> times when you don't want to fail-fast and must attempt to
>> make
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> progress.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The dead-letter queue is exactly for these circumstances. Of
>>>>>>>>>>>>>> course if
>>>>>>>>>>>>>> every record is failing, then you probably do want to give up.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <
>>>>>>>>>>>>>> matthias@confluent.io>
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> First a meta comment. KIP discussion should take place on the
>> dev
>>>>>>>>>>>>>> list
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> -- if user list is cc'ed please make sure to reply to both
>>>>>>>>>>>>>>> lists.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>> Thanks for making the scope of the KIP clear. Makes a lot of
>>>>>>>>>>>>>> sense to
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> focus on deserialization exceptions for now.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> With regard to corrupted state stores, would it make sense to
>>>>>>>>>>>>>>> fail a
>>>>>>>>>>>>>>> task and wipe out the store to repair it via recreation from
>> the
>>>>>>>>>>>>>>> changelog? That's of course a quite advanced pattern, but I
>> want
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> bring
>>>>>>>>>>>>>>> it up to design the first step in a way such that we can get
>>>>>>>>>>>>>>> there (if
>>>>>>>>>>>>>>> we think it's a reasonable idea).
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I also want to comment about fail fast vs making progress. I
>>>>>>>>>>>>>>> think that
>>>>>>>>>>>>>>> fail-fast must not always be the best option. The scenario I
>>>>>>>>>>>>>>> have in
>>>>>>>>>>>>>>> mind is like this: you got a bunch of producers that feed the
>>>>>>>>>>>>>>> Streams
>>>>>>>>>>>>>>> input topic. Most producers work fine, but maybe one producer
>>>>>>>>>>>>>>> misbehaves and the data it writes is corrupted. You might not
>> even
>>>>>>>>>>>>>>> be able
>>>>>>>>>>>>>>> to recover this lost data at any point -- thus, there is no
>>>>>>>>>>>>>>> reason to
>>>>>>>>>>>>>>> stop processing but you just skip over those records. Of
>>>>>>>>>>>>>>> course, you
>>>>>>>>>>>>>>> need to fix the root cause, and thus you need to alert
>> (either
>>>>>>>>>>>>>>> via logs
>>>>>>>>>>>>>>> or the exception handler directly) and you need to start to
>>>>>>>>>>>>>>> investigate
>>>>>>>>>>>>>>> to find the bad producer, shut it down and fix it.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Here the dead letter queue comes into place. From my
>>>>>>>>>>>>>>> understanding, the
>>>>>>>>>>>>>>> purpose of this feature is solely to enable post debugging. I
>> don't
>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>> those records would be fed back at any point in time (so I
>> don't
>>>>>>>>>>>>>>> see any
>>>>>>>>>>>>>>> ordering issue -- a skipped record, with this regard, is just
>>>>>>>>>>>>>>> "fully
>>>>>>>>>>>>>>> processed"). Thus, the dead letter queue should actually
>> encode
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> original record's metadata (topic, partition, offset, etc.) to
>>>>>>>>>>>>>>> enable
>>>>>>>>>>>>>>> such
>>>>>>>>>>>>>>> debugging. I guess, this might also be possible if you just
>> log
>>>>>>>>>>>>>>> the bad
>>>>>>>>>>>>>>> records, but it would be harder to access (you first must
>> find
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> Streams instance that did write the log and extract the
>>>>>>>>>>>>>>> information
>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>> there). Reading it from topic is much simpler.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I also want to mention the following. Assume you have such a
>>>>>>>>>>>>>>> topic with
>>>>>>>>>>>>>>> some bad records and some good records. If we always
>> fail-fast,
>>>>>>>>>>>>>>> it's
>>>>>>>>>>>>>>> going to be super hard to process the good data. You would
>> need
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> write
>>>>>>>>>>>>>>> an extra app that copied the data into a new topic filtering
>>>>>>>>>>>>>>> out the
>>>>>>>>>>>>>>> bad
>>>>>>>>>>>>>>> records (or apply the map() workaround within stream). So I
>>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>> that failing fast is most likely the best option in
>> production
>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>> necessarily, true.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Or do you think there are scenarios, for which you can
>> recover
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> corrupted records successfully? And even if this is
>> possible, it
>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>> be a case for reprocessing instead of failing the whole
>>>>>>>>>>>>>>> application?
>>>>>>>>>>>>>>> Also, if you think you can "repair" a corrupted record,
>> should
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> handler allow to return a "fixed" record? This would solve
>> the
>>>>>>>>>>>>>>> ordering
>>>>>>>>>>>>>>> problem.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated!
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> - I think it would help to improve the KIP by adding an
>>>>>>>>>>>>>>>> end-to-end
>>>>>>>>>>>>>>>> code
>>>>>>>>>>>>>>>> example that demonstrates, with the DSL and with the
>> Processor
>>>>>>>>>>>>>>>> API,
>>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> user would write a simple application that would then be
>>>>>>>>>>>>>>>> augmented
>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> proposed KIP changes to handle exceptions.  It should also
>>>>>>>>>>>>>>>> become much
>>>>>>>>>>>>>>>> clearer then that e.g. the KIP would lead to different code
>>>>>>>>>>>>>>>> paths for
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> happy case and any failure scenarios.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> - Do we have sufficient information available to make
>> informed
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> decisions
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> on
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> what to do next?  For example, do we know in which part of
>> the
>>>>>>>>>>>>>>>> topology
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> record failed? `ConsumerRecord` gives us access to topic,
>>>>>>>>>>>>>>>> partition,
>>>>>>>>>>>>>>>> offset, timestamp, etc., but what about topology-related
>>>>>>>>>>>>>>>> information
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> (e.g.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> what is the associated state store, if any)?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> - Only partly on-topic for the scope of this KIP, but this
>> is
>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> bigger picture: This KIP would give users the option to send
>>>>>>>>>>>>>>>> corrupted
>>>>>>>>>>>>>>>> records to dead letter queue (quarantine topic).  But, what
>>>>>>>>>>>>>>>> pattern
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> we advocate to process such a dead letter queue then, e.g.
>> how to
>>>>>>>>>>>>>> allow
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> retries with backoff ("If the first record in the dead letter
>>>>>>>>>>>>>>>> queue
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> fails
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> again, then try the second record for the time being and go
>> back
>>>>>>>>>>>>>> to the
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> first record at a later time").  Jay and Jan already alluded
>> to
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> ordering
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> problems that will be caused by dead letter queues. As I said,
>>>>>>>>>>>>>> retries
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> might be out of scope but perhaps the implications should be
>>>>>>>>>>>>>>>> considered
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> possible?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Also, I wrote the text below before reaching the point in
>> the
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> conversation
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> that this KIP's scope will be limited to exceptions in the
>>>>>>>>>>>>>>>> category of
>>>>>>>>>>>>>>>> poison pills / deserialization errors.  But since Jay
>> brought
>>>>>>>>>>>>>>>> up
>>>>>>>>>>>>>>>> user
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> code
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> errors again, I decided to include it again.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> ----------------------------snip--------------------------
>> --
>>>>>>>>>>>>>>>> A meta comment: I am not sure about this split between the
>>>>>>>>>>>>>>>> code for
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> happy path (e.g. map/filter/... in the DSL) from the failure
>>>>>>>>>>>>>>>> path
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> (using
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> exception handlers).  In Scala, for example, we can do:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>      scala> val computation = scala.util.Try(1 / 0)
>>>>>>>>>>>>>>>>      computation: scala.util.Try[Int] =
>>>>>>>>>>>>>>>> Failure(java.lang.ArithmeticException: / by zero)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>      scala> computation.getOrElse(42)
>>>>>>>>>>>>>>>>      res2: Int = 42
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Another example with Scala's pattern matching, which is
>>>>>>>>>>>>>>>> similar to
>>>>>>>>>>>>>>>> `KStream#branch()`:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>      computation match {
>>>>>>>>>>>>>>>>        case scala.util.Success(x) => x * 5
>>>>>>>>>>>>>>>>        case scala.util.Failure(_) => 42
>>>>>>>>>>>>>>>>      }
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> (The above isn't the most idiomatic way to handle this in
>>>>>>>>>>>>>>>> Scala,
>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> that's
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> not the point I'm trying to make here.)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Hence the question I'm raising here is: Do we want to have
>> an
>>>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>> where
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> code "the happy path", and then have a different code path
>> for
>>>>>>>>>>>>>>>> failures
>>>>>>>>>>>>>>>> (using exceptions and handlers);  or should we treat both
>>>>>>>>>>>>>>>> Success and
>>>>>>>>>>>>>>>> Failure in the same way?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I think the failure/exception handling approach (as
>> proposed in
>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> KIP)
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> is well-suited for errors in the category of deserialization
>>>>>>>>>>>>>> problems
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> aka
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> poison pills, partly because the (default) serdes are defined
>>>>>>>>>>>>>> through
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> configuration (explicit serdes however are defined through
>> API
>>>>>>>>>>>>>>>> calls).
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> However, I'm not yet convinced that the failure/exception
>>>>>>>>>>>>>>>> handling
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> approach
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> is the best idea for user code exceptions, e.g. if you fail
>> to
>>>>>>>>>>>>>>>> guard
>>>>>>>>>>>>>>>> against NPE in your lambdas or divide a number by zero.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>      scala> val stream = Seq(1, 2, 3, 4, 5)
>>>>>>>>>>>>>>>>      stream: Seq[Int] = List(1, 2, 3, 4, 5)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>      // Here: Fallback to a sane default when encountering
>>>>>>>>>>>>>>>> failed
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> records
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>      scala>     stream.map(x => Try(1/(3 - x))).flatMap(t =>
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Seq(t.getOrElse(42)))
>>>>>>>>>>>>>>>>      res19: Seq[Int] = List(0, 1, 42, -1, 0)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>      // Here: Skip over failed records
>>>>>>>>>>>>>>>>      scala> stream.map(x => Try(1/(3 - x))).collect{ case
>>>>>>>>>>>>>>>> Success(s)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> => s
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>      res20: Seq[Int] = List(0, 1, -1, 0)
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> The above is more natural to me than using error handlers to
>>>>>>>>>>>>>>>> define
>>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> deal with failed records (here, the value `3` causes an
>>>>>>>>>>>>>>>> arithmetic
>>>>>>>>>>>>>>>> exception).  Again, it might help the KIP if we added an
>>>>>>>>>>>>>>>> end-to-end
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> example
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> for such user code errors.
>>>>>>>>>>>>>>>> ----------------------------snip--------------------------
>> --
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak <
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Jan.Filipiak@trivago.com>
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi Jay,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Eno mentioned that he will narrow down the scope to only
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> ConsumerRecord
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> deserialisation.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I am working with Database Changelogs only. I would really
>> not
>>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> see
>>>>>>>>>>>>>>>> a dead letter queue or something
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> similar. How am I expected to get these back in order?
>> Just
>>>>>>>>>>>>>>>>> grind
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> a halt and call me on the weekend. I'll fix it
>>>>>>>>>>>>>>>>> then in a few minutes rather than spend 2 weeks ordering dead
>>>>>>>>>>>>>>>>> letters.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> (where
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> reprocessing might be even the faster fix)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>      - I think we should hold off on retries unless we
>> have
>>>>>>>>>>>>>>>>> worked
>>>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>      full usage pattern, people can always implement their
>>>>>>>>>>>>>>>> own. I
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> the idea
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>      is that you send the message to some kind of dead
>>>>>>>>>>>>>>>>>> letter queue
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>      replay these later. This obviously destroys all
>> semantic
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> guarantees
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> we are
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>      working hard to provide right now, which may be okay.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>> 
>>> 
>>> 
>>> --
>>> -- Guozhang
>> 
>> 
> 
> 
> -- 
> -- Guozhang


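The per-task threshold idea Jan describes in the quoted thread above (keep going until a threshold of bad records has been passed, then terminate; a threshold of zero is plain fail-fast) could be sketched roughly as follows. This is only an illustration under stated assumptions: `ThresholdHandlerSketch`, the `Response` enum and the `handle()` signature are hypothetical and not part of Kafka Streams, and the sketch assumes one handler instance per task so that the counter is per task.

```java
// Illustrative sketch only: hypothetical types, not the Kafka Streams API.
class ThresholdHandlerSketch {

    /** What the caller should do after a record fails to deserialize. */
    enum Response { CONTINUE, FAIL }

    /** Skips bad records until more than `threshold` have been seen, then fails. */
    static class LogAndThresholdExceptionHandler {
        private final long threshold;
        private long failures = 0; // assumption: one instance per task, so this is a per-task count

        LogAndThresholdExceptionHandler(long threshold) {
            if (threshold < 0) {
                throw new IllegalArgumentException("threshold must be >= 0");
            }
            this.threshold = threshold;
        }

        /** Counts the failure and fails once the count exceeds the threshold. */
        Response handle(long offset, Exception exception) {
            failures++;
            System.err.printf("Bad record at offset %d (%d so far): %s%n",
                              offset, failures, exception);
            return failures > threshold ? Response.FAIL : Response.CONTINUE;
        }
    }

    public static void main(String[] args) {
        LogAndThresholdExceptionHandler handler = new LogAndThresholdExceptionHandler(2);
        for (long offset = 10; offset < 13; offset++) {
            Response response = handler.handle(offset, new IllegalStateException("corrupt"));
            System.out.println("offset " + offset + " -> " + response);
        }
    }
}
```

With a threshold of zero the first bad record already returns FAIL, which is the "one implementation covers all cases" point made in the thread.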