kafka-dev mailing list archives

From Stanislav Kozlovski <stanis...@confluent.io>
Subject Re: [DISCUSS] KIP-334 Include partitions in exceptions raised during consumer record deserialization/validation
Date Thu, 02 Aug 2018 09:39:57 GMT
Hi group,

I've updated the KIP and PR with the discussed interface changes.
I am also starting a voting thread.

Best,
Stanislav

On Thu, Aug 2, 2018 at 1:27 AM Jason Gustafson <jason@confluent.io> wrote:

> Hey Stanislav,
>
> > Just to make sure I understood you right - you propose not exposing any new
> > exception types but rather the interface itself only?
>
>
> Yes, exactly. Our exception hierarchy is a bit of a mess to be honest.
> Interfaces are more flexible and here it simplifies the error handling.
>
> > Regardless, I believe this is best left out for another KIP as I feel it
> > would warrant a bigger discussion
>
>
> Ok, that's fair. I thought I'd suggest it here just to see if there was any
> interest in the community. At least with this KIP, users have a viable way
> to skip past bad data if they wish.
>
> -Jason
>
> On Tue, Jul 31, 2018 at 2:42 AM, Stanislav Kozlovski <stanislav@confluent.io>
> wrote:
>
> > Hey Jason,
> >
> > Just to make sure I understood you right - you propose not exposing any new
> > exception types but rather the interface itself only? So sample code
> > dealing with this would be something like:
> >
> > try {
> >     // ...
> > } catch (KafkaException e) {
> >     if (e instanceof UnconsumableRecordException) {
> >         // handle retry
> >     }
> > }
> >
> > If that is the case, I like it better.
> >
> >
> > Regarding automatic handling of unconsumable messages - I like that
> > idea too. To me, a callback seems like the more straightforward approach. A
> > config such as `seek.past.unconsumable.record` limits the behavior too much
> > in my opinion; I believe giving users the option to implement a callback (or
> > use the default one) is better in that respect.
> > Regardless, I believe this is best left out for another KIP as I feel it
> > would warrant a bigger discussion
> >
> > Best,
> > Stanislav
> >
> > On Mon, Jul 30, 2018 at 9:34 PM Jason Gustafson <jason@confluent.io>
> > wrote:
> >
> > > Hey Stanislav,
> > >
> > > Thanks for the KIP. I think the goal is to allow users to seek past
> > > records which cannot be parsed for whatever reason. However, it's a little
> > > annoying that you need to catch two separate types to handle this. I'm
> > > wondering if it makes sense to expose an interface like
> > > `UnconsumableRecordException` or something like that. The consumer could
> > > then have separate internal exception types which extend from
> > > InvalidRecordException and SerializationException respectively and
> > > implement `UnconsumableRecordException`. That would simplify the handling
> > > and users could check the cause if they cared which case it was.
> > >
> > > Another question for consideration. I'd imagine some users would find it
> > > helpful to seek past failed messages automatically. If there is a corrupt
> > > record, for example, there's almost nothing you can do except seek past it
> > > anyway. I'm wondering if there should be a config for this or if users
> > > should be able to install a callback of some sort to handle failed
> > > records. Not sure if this is that big of a problem for users, but I'm
> > > interested to hear others' thoughts.
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Fri, Jul 20, 2018 at 6:32 PM, Stanislav Kozlovski <stanislav@confluent.io>
> > > wrote:
> > >
> > > > Hi Ted,
> > > >
> > > > I do plan to start one. When is the appropriate time? My reasoning was
> > > > that people would like to view the changes first.
> > > >
> > > > On Fri, Jul 20, 2018, 6:21 PM Ted Yu <yuzhihong@gmail.com> wrote:
> > > >
> > > > > Hi, Stanislav:
> > > > > Do you plan to start a VOTE thread?
> > > > >
> > > > > Cheers
> > > > >
> > > > > On Fri, Jul 20, 2018 at 6:11 PM Stanislav Kozlovski <stanislav@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Hey group,
> > > > > >
> > > > > > I added a Pull Request for this KIP - here it is
> > > > > > https://github.com/apache/kafka/pull/5410
> > > > > > Please take a look.
> > > > > >
> > > > > > Best,
> > > > > > Stanislav
> > > > > >
> > > > > > On Thu, Jul 5, 2018 at 11:06 AM Ismael Juma <ismaelj@gmail.com> wrote:
> > > > > >
> > > > > > > Yes, the Scala consumers have been removed in 2.0.0, which simplifies
> > > > > > > some of this. The following commit was an initial step in unifying the
> > > > > > > exception handling:
> > > > > > >
> > > > > > > https://github.com/apache/kafka/commit/96bcfdfc7c9aac075635b2034e65e412a725672e
> > > > > > >
> > > > > > > But more can be done as you mentioned.
> > > > > > >
> > > > > > > Ismael
> > > > > > >
> > > > > > > On 5 Jul 2018 9:36 am, "Stanislav Kozlovski" <stanislav@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > Hey Ismael,
> > > > > > >
> > > > > > > It is only slightly related - my PR would attach two new attributes and
> > > > > > > also touch upon deserialization exceptions.
> > > > > > >
> > > > > > > But this PR did provide me with some insight:
> > > > > > > Maybe the best approach would be to make `InvalidRecordException` a public
> > > > > > > exception instead of introducing a new one - I did not realize it was not
> > > > > > > publicly exposed.
> > > > > > > Does the following:
> > > > > > >
> > > > > > >  InvalidMessageException extends CorruptRecordException for temporary
> > > > > > > compatibility with the old Scala clients.
> > > > > > >  * We want to update the server side code to use and catch the new
> > > > > > > CorruptRecordException.
> > > > > > >  * Because ByteBufferMessageSet.scala and Message.scala are used in
> > > > > > > both server and client code having
> > > > > > >  * InvalidMessageException extend CorruptRecordException allows us to
> > > > > > > change server code without affecting the client.
> > > > > > >
> > > > > > > still apply? I can see that the `ByteBufferMessageSet` and `Message` scala
> > > > > > > classes are not present in the codebase anymore. AFAIA the old scala
> > > > > > > clients were removed with 2.0.0 and we can thus update the server side code
> > > > > > > to use the `CorruptRecordException` while changing and exposing
> > > > > > > `InvalidRecordException` to the public. WDYT?
> > > > > > >
> > > > > > > I will also make sure to not expose the cause of the exception when not
> > > > > > > needed, maybe I'll outright remove the `cause` attribute
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Jul 5, 2018 at 4:55 PM Ismael Juma <ismael@juma.me.uk> wrote:
> > > > > > >
> > > > > > > > Thanks for the KIP, Stanislav. The following PR looks related:
> > > > > > > >
> > > > > > > > https://github.com/apache/kafka/pull/4093/files
> > > > > > > >
> > > > > > > > Ismael
> > > > > > > >
> > > > > > > > On Thu, Jul 5, 2018 at 8:44 AM Stanislav Kozlovski <stanislav@confluent.io>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey everybody,
> > > > > > > > >
> > > > > > > > > I just created a new KIP about exposing more information in exceptions
> > > > > > > > > caused by consumer record deserialization/validation. Please have a look
> > > > > > > > > at it, it is a very short page.
> > > > > > > > >
> > > > > > > > > I am working under the assumption that all invalid record or
> > > > > > > > > deserialization exceptions in the consumer pass through the `Fetcher`
> > > > > > > > > class. Please confirm if that is true, otherwise I might miss some places
> > > > > > > > > where the exceptions are raised in my implementation.
> > > > > > > > >
> > > > > > > > > One concern I have is the name of the second exception -
> > > > > > > > > `InoperativeRecordException`. I would have named it
> > > > > > > > > `InvalidRecordException` but that is taken. The `Fetcher` class catches
> > > > > > > > > `InvalidRecordException` (here
> > > > > > > > > <https://github.com/apache/kafka/blob/c5b00d20d3703b7fc4358b7258d5d6adb890136f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1081>
> > > > > > > > > and here
> > > > > > > > > <https://github.com/apache/kafka/blob/c5b00d20d3703b7fc4358b7258d5d6adb890136f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1092>)
> > > > > > > > > and re-raises it as `KafkaException`, which exposes it as a non-retriable
> > > > > > > > > exception to the user (`InvalidRecordException` extends
> > > > > > > > > `RetriableException`, but `KafkaException` doesn't).
> > > > > > > > > A suggestion I got for an alternative name was
> > > > > > > > > `InvalidFetchRecordException`. Please chime in if you have ideas.
> > > > > > > > >
> > > > > > > > > Confluence page: KIP-334
> > > > > > > > > <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793>
> > > > > > > > > JIRA Issue: KAFKA-5682 <https://issues.apache.org/jira/browse/KAFKA-5682>
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Best,
> > > > > > > > > Stanislav
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Best,
> > > > > > > Stanislav
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best,
> > > > > > Stanislav
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> > Best,
> > Stanislav
> >
>


-- 
Best,
Stanislav
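
P.S. For anyone reading the archive, the interface-based handling discussed
in this thread could be sketched roughly as below. This is only an
illustration and not the code from the PR: `KafkaException` here is a local
stand-in for the real class so the example compiles without Kafka, and the
two failure classes, the `partition()`/`offset()` accessors and the simulated
`fetch` are all assumptions made up for the example. Since
`UnconsumableRecordException` is an interface and not a `Throwable`, Java
does not allow catching it directly, hence the `instanceof` check inside the
`KafkaException` handler.

```java
import java.util.ArrayList;
import java.util.List;

class UnconsumableRecordSketch {

    // Local stand-in for org.apache.kafka.common.KafkaException (assumption: simplified).
    static class KafkaException extends RuntimeException {
        KafkaException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical public interface; the accessor names are assumptions.
    interface UnconsumableRecordException {
        int partition();
        long offset();
    }

    // Hypothetical internal type for deserialization failures.
    static class UndeserializableRecord extends KafkaException implements UnconsumableRecordException {
        private final int partition;
        private final long offset;

        UndeserializableRecord(int partition, long offset, Throwable cause) {
            super("Cannot deserialize record at partition " + partition + ", offset " + offset, cause);
            this.partition = partition;
            this.offset = offset;
        }

        public int partition() { return partition; }
        public long offset() { return offset; }
    }

    // Hypothetical internal type for records that fail validation (e.g. corruption).
    static class CorruptRecord extends KafkaException implements UnconsumableRecordException {
        private final int partition;
        private final long offset;

        CorruptRecord(int partition, long offset, Throwable cause) {
            super("Invalid record at partition " + partition + ", offset " + offset, cause);
            this.partition = partition;
            this.offset = offset;
        }

        public int partition() { return partition; }
        public long offset() { return offset; }
    }

    // Simulated fetch of one record: offsets 2 and 4 are made unconsumable on purpose.
    static String fetch(int partition, long offset) {
        if (offset == 2) {
            throw new UndeserializableRecord(partition, offset, new IllegalArgumentException("bad payload"));
        }
        if (offset == 4) {
            throw new CorruptRecord(partition, offset, new IllegalStateException("checksum mismatch"));
        }
        return "record-" + offset;
    }

    // Reads offsets [0, endOffset) and skips past any unconsumable record.
    static List<String> consume(int partition, long endOffset) {
        List<String> consumed = new ArrayList<>();
        long position = 0;
        while (position < endOffset) {
            try {
                consumed.add(fetch(partition, position));
                position++;
            } catch (KafkaException e) {
                if (e instanceof UnconsumableRecordException) {
                    // One check covers both failure kinds; e.getCause() tells them apart if needed.
                    UnconsumableRecordException bad = (UnconsumableRecordException) e;
                    position = bad.offset() + 1;
                } else {
                    throw e; // anything else is not ours to skip
                }
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        // prints [record-0, record-1, record-3, record-5]
        System.out.println(consume(0, 6));
    }
}
```

With a real consumer, the `position = bad.offset() + 1` step would presumably
correspond to a `seek` call on the affected partition.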
