kafka-dev mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: [DISCUSS] KIP-399: Extend ProductionExceptionHandler to cover serialization exceptions
Date Thu, 13 Dec 2018 13:13:58 GMT
For store updates, records are first serialized and afterwards put into
the store and written into the changelog topic.

In the current implementation, if the send() into the changelog topic
produces an error and the handler skips over it, the local store content
and the changelog topic diverge. This seems to be a correctness issue.

For a serialization error, store and changelog would not diverge,
because serialization happens before both put() and send(). Thus, with
this KIP we could skip both put() and send(). However, I am still
wondering whether it would be ok to skip a store update in this case.
(Btw: the current PR does not address this atm; a serialization error for
a store write would not be covered and would kill the instance.)
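
To make the two failure modes concrete, here is a minimal sketch of the
ordering described above. It is not Kafka Streams code: `SketchStore`, its
fields, and the two failure flags are hypothetical stand-ins, and the
"handler skips" decision is reduced to an early return.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a changelog-backed store; not Kafka Streams code.
class SketchStore {
    final Map<String, byte[]> local = new HashMap<>();  // local store content
    final List<String> changelog = new ArrayList<>();   // keys that reached the changelog

    boolean failSerialization = false; // simulate a serializer error
    boolean failSend = false;          // simulate a send() error that the handler skips

    // Order from the thread: serialize first, then put(), then send().
    void update(String key, String value) {
        byte[] bytes;
        try {
            bytes = serialize(value);
        } catch (RuntimeException e) {
            // Skipped before put() and send(): neither side sees the record.
            return;
        }
        local.put(key, bytes);
        if (failSend) {
            // Handler says "continue": the store keeps the record, the changelog does not.
            return;
        }
        changelog.add(key);
    }

    private byte[] serialize(String value) {
        if (failSerialization) {
            throw new IllegalStateException("simulated serialization failure");
        }
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // True when the local store and the changelog hold exactly the same keys.
    boolean consistent() {
        return local.keySet().equals(new HashSet<>(changelog));
    }
}
```

In this sketch, skipping on a serialization error drops the record on both
sides, so the two stay consistent; skipping on a send error leaves the
record in the store only.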

IIRC, the original idea of the KIP was to allow skipping over records for
output topics only. That's why I am wondering if it's ok to allow
skipping over records in repartition topics, too.

In the end, it's some data loss for all 3 cases, so maybe it's ok to
allow skipping for all 3 cases. However, we should not allow the local
store and the changelog topic to diverge IMHO (which might be an
orthogonal bug, though).
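
If we wanted to be strict instead, one option is to decide per topic type.
The following is only a sketch of such a policy, not a proposal for the
KIP: `Response`, `TopicKind`, and `TopicPolicy` are hypothetical names and
do not use the real handler types. The only thing taken from Streams is the
naming of internal topics (`<application.id>-<name>-changelog` and
`<application.id>-<name>-repartition`).

```java
// Hypothetical stand-ins; Kafka Streams' real handler types are not used here.
enum Response { CONTINUE, FAIL }

enum TopicKind { OUTPUT, REPARTITION, CHANGELOG }

final class TopicPolicy {
    // Internal topics are prefixed with the application id and end in
    // "-changelog" or "-repartition"; everything else counts as user output.
    static TopicKind classify(String topic, String applicationId) {
        if (topic.startsWith(applicationId + "-")) {
            if (topic.endsWith("-changelog")) return TopicKind.CHANGELOG;
            if (topic.endsWith("-repartition")) return TopicKind.REPARTITION;
        }
        return TopicKind.OUTPUT;
    }

    // One possible strict policy (an assumption, not agreed in this thread):
    // skip only for user output topics, fail for both kinds of internal topic.
    static Response onSerializationError(String topic, String applicationId) {
        switch (classify(topic, applicationId)) {
            case OUTPUT:
                return Response.CONTINUE;
            default:
                return Response.FAIL;
        }
    }
}
```

A user-supplied handler could apply the same check itself, since it sees
the topic name; the open question is whether the framework should enforce
it.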

I also don't have an answer or preference. I just think it's important to
touch on those cases and get input on how people think about them.


-Matthias



On 12/11/18 11:43 AM, Kamal Chandraprakash wrote:
> Matthias,
> 
> For changelog topics, I think it does not make sense to allow skipping
> records if serialization fails? For internal repartitions topics, I am
> not sure if we should allow it or not. Would you agree with this? We
> should discuss the implication to derive a sound design.
> 
> Can you explain the issue that happens when records are skipped for
> changelog / internal-repartition topics, so that I can look into it?
> 
> On Fri, Dec 7, 2018 at 12:07 AM Matthias J. Sax <matthias@confluent.io>
> wrote:
> 
>>>> To accept different types of records from multiple topologies, I have to
>>>> define the ProducerRecord without generics.
>>
>> Yes. It does make sense. My point was that the KIP should
>> mention/explain this explicitly to allow others not familiar with the
>> code base to understand it more easily :)
>>
>>
>>
>> About `ClassCastException`: seems to be an implementation detail. No
>> need to make it part of the KIP discussion.
>>
>>
>>
>> One more thing that came to my mind. We use the `RecordCollector` to
>> write into all topics, ie, user output topics and internal repartition
>> and changelog topics.
>>
>> For changelog topics, I think it does not make sense to allow skipping
>> records if serialization fails? For internal repartitions topics, I am
>> not sure if we should allow it or not. Would you agree with this? We
>> should discuss the implication to derive a sound design.
>>
>> I was also just double checking the code, and it seems that the current
>> `ProductionExceptionHandler` is applied for all topics. This seems to be
>> incorrect to me. Seems we missed this case when doing KIP-210? (Or did
>> we discuss this and I cannot remember? Might be worth to double check.)
>>
>> Last thought: of course, the handler will know which topic is affected
>> and can provide a corresponding implementation. Was just wondering if we
>> should be more strict?
>>
>>
>> -Matthias
>>
>> On 12/6/18 10:01 AM, Kamal Chandraprakash wrote:
>>> Matt,
>>>     I agree with Matthias on not altering the serializer, as it's used by
>>> multiple components.
>>>
>>> Matthias,
>>>
>>>  - the proposed method accepts a `ProducerRecord` -- it might be good to
>>> explain why this cannot be done in a type safe way (ie, missing generics)
>>>
>>> To accept different types of records from multiple topologies, I have to
>>> define the ProducerRecord without generics.
>>>
>>> - `AlwaysProductionExceptionHandler` ->
>>> `AlwaysContinueProductionExceptionHandler`
>>>
>>> Fixed the typo in the KIP.
>>>
>>>  - `DefaultProductionExceptionHandler` is not mentioned
>>>
>>> The `handleSerializationException` method in the
>>> `ProductionExceptionHandler` interface will have default implementation
>>> that is set to FAIL by default.
>>> This is done to avoid any changes in the user implementation. So, I didn't
>>> mention the `DefaultProductionExceptionHandler` class. Updated the KIP.
>>>
>>> - Why do you distinguish between `ClassCastException` and "any other
>>> unchecked exception"? The second case seems to include the first one?
>>>
>>> In SinkNode.java#93
>>> <https://github.com/apache/kafka/blob/87cc31c4e7ea36e7e832a1d02d71480a91a75293/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java#L93>
>>> on hitting `ClassCastException`, we are halting the streams as it's a fatal
>>> error.
>>> To keep the original behavior, I have to distinguish the exceptions.
>>>
>>>
>>> On Thu, Dec 6, 2018 at 10:44 PM Matthias J. Sax <matthias@confluent.io>
>>> wrote:
>>>
>>>> Well, that's exactly the point. The serializer should not be altered
>>>> IMHO because this would have impact on other components. Also, for
>>>> applications that use KafkaProducer directly, they can catch any
>>>> serialization exception and react to it. Hence, I don't see a
>>>> reason to change the serializer interface.
>>>>
>>>> Instead, it seems better to solve this issue in Streams by allowing to
>>>> skip over a record for this case.
>>>>
>>>> Some more comments on the KIP:
>>>>
>>>>  - the proposed method accepts a `ProducerRecord` -- it might be good to
>>>> explain why this cannot be done in a type safe way (ie, missing generics)
>>>>
>>>>  - `AlwaysProductionExceptionHandler` ->
>>>> `AlwaysContinueProductionExceptionHandler`
>>>>
>>>>  - `DefaultProductionExceptionHandler` is not mentioned
>>>>
>>>>  - Why do you distinguish between `ClassCastException` and "any other
>>>> unchecked exception"? The second case seems to include the first one?
>>>>
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 12/6/18 8:35 AM, Matt Farmer wrote:
>>>>> Ah, good point.
>>>>>
>>>>> Should we consider altering the serializer interface to permit not sending
>>>>> the record?
>>>>>
>>>>> On Wed, Dec 5, 2018 at 9:23 PM Kamal Chandraprakash <
>>>>> kamal.chandraprakash@gmail.com> wrote:
>>>>>
>>>>>> Matt,
>>>>>>
>>>>>>     That's a good point. If these cases are handled in the serializer, then
>>>>>> one cannot continue the stream processing by skipping the record.
>>>>>> To continue, you may have to send an empty serialized key/value (new
>>>>>> byte[0]) downstream on hitting the error, which may cause unintended
>>>>>> results.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Dec 5, 2018 at 8:41 PM Matt Farmer <matt@frmr.me> wrote:
>>>>>>
>>>>>>> Hi there,
>>>>>>>
>>>>>>> Thanks for this KIP.
>>>>>>>
>>>>>>> What’s the thinking behind doing this in ProductionExceptionHandler versus
>>>>>>> handling these cases in your serializer implementation?
>>>>>>>
>>>>>>> On Mon, Dec 3, 2018 at 1:09 AM Kamal Chandraprakash <
>>>>>>> kamal.chandraprakash@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello dev,
>>>>>>>>
>>>>>>>>   I hope to initiate the discussion for KIP-399: Extend
>>>>>>>> ProductionExceptionHandler to cover serialization exceptions.
>>>>>>>>
>>>>>>>> KIP: <https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions>
>>>>>>>> JIRA: https://issues.apache.org/jira/browse/KAFKA-7499
>>>>>>>>
>>>>>>>> All feedback will be highly appreciated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Kamal Chandraprakash
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
> 

