kafka-dev mailing list archives

From Jason Gustafson <ja...@confluent.io>
Subject Re: [DISCUSS] KIP-334 Include partitions in exceptions raised during consumer record deserialization/validation
Date Thu, 02 Aug 2018 00:27:34 GMT
Hey Stanislav,

> Just to make sure I understood you right - you propose not exposing any new
> exception types but rather the interface itself only?


Yes, exactly. Our exception hierarchy is a bit of a mess to be honest.
Interfaces are more flexible and here it simplifies the error handling.

> Regardless, I believe this is best left out for another KIP as I feel it
> would warrant a bigger discussion


Ok, that's fair. I thought I'd suggest it here just to see if there was any
interest in the community. At least with this KIP, users have a viable way
to skip past bad data if they wish.

-Jason
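The marker-interface approach discussed above could be sketched as below. All of the types here are hypothetical stand-ins: `UnconsumableRecordException` is only the interface proposed in this thread, not an existing kafka-clients class, and the stub `KafkaException` stands in for the real `org.apache.kafka.common.KafkaException`.

```java
// Minimal sketch of the marker-interface idea from this thread. All types
// are illustrative stand-ins, not real kafka-clients classes.
public class UnconsumableRecordSketch {

    // Stand-in for org.apache.kafka.common.KafkaException.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }

    // Proposed marker interface: carries the position of the bad record so
    // callers can seek past it, regardless of the concrete exception type.
    interface UnconsumableRecordException {
        String topic();
        int partition();
        long offset();
    }

    // Internal exception type: still a KafkaException to existing callers,
    // but also detectable (and skippable) through the interface.
    static class FaultyRecordException extends KafkaException
            implements UnconsumableRecordException {
        private final String topic;
        private final int partition;
        private final long offset;

        FaultyRecordException(String topic, int partition, long offset) {
            super("Unconsumable record at " + topic + "-" + partition + "@" + offset);
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        public String topic() { return topic; }
        public int partition() { return partition; }
        public long offset() { return offset; }
    }

    // One check handles both corrupt-record and deserialization failures;
    // returns the offset a caller would seek to in order to skip the record.
    static long skipPosition(KafkaException e) {
        if (e instanceof UnconsumableRecordException) {
            return ((UnconsumableRecordException) e).offset() + 1;
        }
        throw e; // not a record-level failure, rethrow
    }
}
```

A consumer loop would then do something along the lines of `consumer.seek(new TopicPartition(u.topic(), u.partition()), skipPosition(e))` before polling again.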

On Tue, Jul 31, 2018 at 2:42 AM, Stanislav Kozlovski <stanislav@confluent.io> wrote:

> Hey Jason,
>
> Just to make sure I understood you right - you propose not exposing any new
> exception types but rather the interface itself only? So sample code
> dealing with this would look something like:
>
> try {
>     // ...
> } catch (KafkaException e) {
>     if (e instanceof UnconsumableRecordException) {
>       // handle retry
>     }
> }
>
> If that is the case, I like it better.
>
>
> In regards to automatic handling of unconsumable messages - I like that
> idea too. To me, a callback seems like the more straightforward approach.
> A config such as `seek.past.unconsumable.record` limits the behavior too
> much in my opinion; I believe giving users the option to implement a
> callback (or use the default) is better in that way.
> Regardless, I believe this is best left out for another KIP as I feel it
> would warrant a bigger discussion
>
> Best,
> Stanislav
>
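The callback option Stanislav prefers over a boolean config could be sketched like this. Nothing below is a real Kafka API: the `FailedRecordHandler` interface and the `consume()` loop are hypothetical, meant only to show why a per-failure callback is more flexible than a `seek.past.unconsumable.record` style switch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical callback-based handling of unconsumable records; a sketch
// of the behavior a callback could enable, not an existing Kafka API.
public class FailedRecordCallbackSketch {

    // Proposed handler: return true to skip the bad record, false to have
    // the consumer propagate the failure to the caller.
    interface FailedRecordHandler {
        boolean onFailedRecord(String topic, int partition, long offset, Exception cause);
    }

    // Stand-in for a fetch loop: offsets listed in badOffsets fail to parse.
    static List<Long> consume(List<Long> offsets, List<Long> badOffsets,
                              FailedRecordHandler handler) {
        List<Long> delivered = new ArrayList<>();
        for (long offset : offsets) {
            if (badOffsets.contains(offset)) {
                Exception cause = new RuntimeException("corrupt record at " + offset);
                if (!handler.onFailedRecord("topic", 0, offset, cause)) {
                    throw new RuntimeException(cause);
                }
                continue; // handler chose to seek past the bad record
            }
            delivered.add(offset);
        }
        return delivered;
    }

    public static void main(String[] args) {
        // A boolean config could only express "always skip"; a callback can
        // also log, count, dead-letter, or skip selectively.
        List<Long> out = consume(Arrays.asList(0L, 1L, 2L, 3L), Arrays.asList(2L),
                (topic, partition, offset, cause) -> true);
        System.out.println(out); // [0, 1, 3]
    }
}
```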
> On Mon, Jul 30, 2018 at 9:34 PM Jason Gustafson <jason@confluent.io> wrote:
>
> > Hey Stanislav,
> >
> > Thanks for the KIP. I think the goal is to allow users to seek past
> > records which cannot be parsed for whatever reason. However, it's a
> > little annoying that you need to catch two separate types to handle
> > this. I'm wondering if it makes sense to expose an interface like
> > `UnconsumableRecordException` or something like that. The consumer could
> > then have separate internal exception types which extend
> > InvalidRecordException and SerializationException respectively and
> > implement `UnconsumableRecordException`. That would simplify the
> > handling, and users could check the cause if they cared which case it
> > was.
> >
> > Another question for consideration. I'd imagine some users would find it
> > helpful to seek past failed messages automatically. If there is a corrupt
> > record, for example, there's almost nothing you can do except seek past
> > it anyway. I'm wondering if there should be a config for this or if users
> > should be able to install a callback of some sort to handle failed
> > records. Not sure if this is that big of a problem for users, but
> > interested to hear others' thoughts.
> >
> > Thanks,
> > Jason
> >
> > On Fri, Jul 20, 2018 at 6:32 PM, Stanislav Kozlovski <stanislav@confluent.io> wrote:
> >
> > > Hi Ted,
> > >
> > > I do plan to start one. When is the appropriate time? My reasoning was
> > > that people would like to view the changes first.
> > >
> > > On Fri, Jul 20, 2018, 6:21 PM Ted Yu <yuzhihong@gmail.com> wrote:
> > >
> > > > Hi, Stanislav:
> > > > Do you plan to start VOTE thread ?
> > > >
> > > > Cheers
> > > >
> > > > On Fri, Jul 20, 2018 at 6:11 PM Stanislav Kozlovski <stanislav@confluent.io> wrote:
> > > >
> > > > > Hey group,
> > > > >
> > > > > I added a Pull Request for this KIP - here it is
> > > > > https://github.com/apache/kafka/pull/5410
> > > > > Please take a look.
> > > > >
> > > > > Best,
> > > > > Stanislav
> > > > >
> > > > > On Thu, Jul 5, 2018 at 11:06 AM Ismael Juma <ismaelj@gmail.com> wrote:
> > > > >
> > > > > > Yes, the Scala consumers have been removed in 2.0.0, which
> > > > > > simplifies some of this. The following commit was an initial
> > > > > > step in unifying the exception handling:
> > > > > >
> > > > > > https://github.com/apache/kafka/commit/96bcfdfc7c9aac075635b2034e65e412a725672e
> > > > > > But more can be done as you mentioned.
> > > > > >
> > > > > > Ismael
> > > > > >
> > > > > > On 5 Jul 2018 9:36 am, "Stanislav Kozlovski" <stanislav@confluent.io> wrote:
> > > > > >
> > > > > > Hey Ismael,
> > > > > >
> > > > > > It is only slightly related - my PR would attach two new
> > > > > > attributes and also touch upon deserialization exceptions.
> > > > > >
> > > > > > But this PR did provide me with some insight:
> > > > > > Maybe the best approach would be to make `InvalidRecordException`
> > > > > > a public exception instead of introducing a new one - I did not
> > > > > > realize it was not publicly exposed.
> > > > > > Does the following:
> > > > > >
> > > > > >  InvalidMessageException extends CorruptRecordException for
> > > > > > temporary compatibility with the old Scala clients.
> > > > > >  * We want to update the server side code to use and catch the
> > > > > > new CorruptRecordException.
> > > > > >  * Because ByteBufferMessageSet.scala and Message.scala are used
> > > > > > in both server and client code, having InvalidMessageException
> > > > > > extend CorruptRecordException allows us to change server code
> > > > > > without affecting the client.
> > > > > >
> > > > > > still apply? I can see that the `ByteBufferMessageSet` and
> > > > > > `Message` Scala classes are not present in the codebase anymore.
> > > > > > AFAIA the old Scala clients were removed with 2.0.0, so we can
> > > > > > update the server-side code to use the `CorruptRecordException`
> > > > > > while changing and exposing `InvalidRecordException` to the
> > > > > > public. WDYT?
> > > > > >
> > > > > > I will also make sure not to expose the cause of the exception
> > > > > > when not needed; maybe I'll outright remove the `cause` attribute.
> > > > > >
> > > > > >
> > > > > > On Thu, Jul 5, 2018 at 4:55 PM Ismael Juma <ismael@juma.me.uk> wrote:
> > > > > >
> > > > > > > Thanks for the KIP, Stanislav. The following PR looks related:
> > > > > > >
> > > > > > > https://github.com/apache/kafka/pull/4093/files
> > > > > > >
> > > > > > > Ismael
> > > > > > >
> > > > > > > On Thu, Jul 5, 2018 at 8:44 AM Stanislav Kozlovski <stanislav@confluent.io> wrote:
> > > > > > >
> > > > > > > > Hey everybody,
> > > > > > > >
> > > > > > > > I just created a new KIP about exposing more information in
> > > > > > > > exceptions caused by consumer record
> > > > > > > > deserialization/validation. Please have a look at it, it is
> > > > > > > > a very short page.
> > > > > > > >
> > > > > > > > I am working under the assumption that all invalid record or
> > > > > > > > deserialization exceptions in the consumer pass through the
> > > > > > > > `Fetcher` class. Please confirm if that is true, otherwise I
> > > > > > > > might miss some places where the exceptions are raised in my
> > > > > > > > implementation.
> > > > > > > >
> > > > > > > > One concern I have is the name of the second exception -
> > > > > > > > `InoperativeRecordException`. I would have named it
> > > > > > > > `InvalidRecordException` but that is taken. The `Fetcher`
> > > > > > > > class catches `InvalidRecordException` (here
> > > > > > > > <https://github.com/apache/kafka/blob/c5b00d20d3703b7fc4358b7258d5d6adb890136f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1081>
> > > > > > > > and here
> > > > > > > > <https://github.com/apache/kafka/blob/c5b00d20d3703b7fc4358b7258d5d6adb890136f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1092>)
> > > > > > > > and re-raises it as `KafkaException`, which exposes it as a
> > > > > > > > non-retriable exception to the user (`InvalidRecordException`
> > > > > > > > extends `RetriableException`, but `KafkaException` doesn't).
> > > > > > > > A suggestion I got for an alternative name was
> > > > > > > > `InvalidFetchRecordException`. Please chime in if you have
> > > > > > > > ideas.
> > > > > > > >
> > > > > > > > Confluence page: KIP-334
> > > > > > > > <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793>
> > > > > > > > JIRA Issue: KAFKA-5682
> > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-5682>
> > > > > > > >
> > > > > > > > --
> > > > > > > > Best,
> > > > > > > > Stanislav
> > > > > > > >
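The wrapping problem described in the announcement above can be illustrated with simplified stand-ins (these are not the real Kafka classes; the actual hierarchy lives in `org.apache.kafka.common.errors`): once a retriable exception is re-raised inside a plain `KafkaException`, an `instanceof` check no longer sees it as retriable.

```java
// Simplified stand-ins for the Kafka exception hierarchy, illustrating why
// Fetcher re-raising InvalidRecordException as KafkaException makes the
// failure look non-retriable to the caller. Not the real classes.
public class WrappingSketch {

    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
        KafkaException(String message, Throwable cause) { super(message, cause); }
    }

    static class RetriableException extends KafkaException {
        RetriableException(String message) { super(message); }
    }

    static class InvalidRecordException extends RetriableException {
        InvalidRecordException(String message) { super(message); }
    }

    // The check a caller would use to decide whether to retry.
    static boolean isRetriable(Throwable t) {
        return t instanceof RetriableException;
    }
}
```

The retriable type survives only as the `cause`, which is why the KIP proposes exposing the partition/offset details directly on the raised exception.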
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best,
> > > > > > Stanislav
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best,
> > > > > Stanislav
> > > > >
> > > >
> > >
> >
>
>
> --
> Best,
> Stanislav
>
