flink-issues mailing list archives

From "Tzu-Li (Gordon) Tai (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer
Date Tue, 07 Feb 2017 07:18:41 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855466#comment-15855466 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:18 AM:
--------------------------------------------------------------------

Ah, I see what you mean. For your use case it makes sense, but I don't think this is necessary
for general use cases (especially the {{writeToExternalSources}} method).

First of all, I would still like to keep the interface to the minimal flatMap-like version
proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    // the user uses the collector to buffer outputs
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one :-D).
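
To make the flatMap-like contract concrete, a minimal sketch of an implementation could look like the following. It simply skips corrupt records by collecting nothing for them ({{MyEvent}} and {{decode}} are made-up placeholders, the collector is assumed to expose a {{collect(T)}} method, and {{getProducedType}} comes from {{ResultTypeQueryable}}):

{code}
// Minimal sketch only -- MyEvent and decode() are hypothetical placeholders.
public class SkipCorruptSchema implements NewDeserializationSchema<MyEvent> {

    @Override
    public void deserialize(byte[] message, OutputCollector<MyEvent> collector) {
        try {
            // normal case: one record in, one record out
            collector.collect(decode(message));
        } catch (Exception e) {
            // corrupt record: collect nothing, so the record is simply skipped
        }
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
}
{code}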

The way it would work is:
1. The consumer starts processing the record at offset 32 (as an example).
2. The consumer keeps an internal buffer that collects the zero or more records produced by
calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All the records in the buffer must be flushed, and offset 32 updated into the consumer state,
as a single atomic operation synchronized on the checkpoint lock.
4. Since checkpointing is synchronized on the same lock, a checkpoint barrier can only come
either before or after all the records produced from offset 32.

With the synchronization explained above, we do not need to expose a separate {{flush}} method
to the user (see the sketch below).
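
As a rough illustration of steps 2 and 3, the consumer-side loop could look like the following. Every name here ({{deserializer}}, {{sourceContext}}, {{partitionOffsets}}, {{checkpointLock}}) is an illustrative placeholder, not an actual API, and the collector is again assumed to be a functional interface with a single {{collect(T)}} method:

{code}
// Illustrative sketch of steps 2 and 3; all names are placeholders.
List<T> buffer = new ArrayList<>();

// step 2: the collector handed to the user simply appends into the buffer
deserializer.deserialize(recordBytes, buffer::add);

// step 3: emit the buffered records and update the offset atomically
synchronized (checkpointLock) {
    for (T record : buffer) {
        sourceContext.collect(record);
    }
    partitionOffsets.put(partition, 32L);
}
buffer.clear();
{code}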

For your use case, in which you want to write the corrupt bytes to external storage, you would
do that with a try-catch block in your implementation of {{deserialization.deserialize(bytes, collector)}}.
The only limitation is that the write must be a blocking call, which might be acceptable
depending on the frequency of corrupt messages; the handling could look roughly like the
sketch below. What do you think [~wheat9]?
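
A sketch of that try-catch handling ({{CorruptRecordStore}} and {{decode}} are made-up placeholders for whatever storage client and decoding logic are actually used):

{code}
// Hypothetical sketch -- CorruptRecordStore and decode() are placeholders.
public class SideStoringSchema implements NewDeserializationSchema<MyEvent> {

    private transient CorruptRecordStore store; // opened via some lifecycle hook

    @Override
    public void deserialize(byte[] message, OutputCollector<MyEvent> collector) {
        try {
            collector.collect(decode(message));
        } catch (Exception e) {
            // blocking write of the corrupt bytes; acceptable if corruption is rare
            store.write(message);
        }
    }

    // getProducedType() omitted, same as in the earlier sketch
}
{code}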



> Support flexible error handling in the Kafka consumer
> -----------------------------------------------------
>
>                 Key: FLINK-5583
>                 URL: https://issues.apache.org/jira/browse/FLINK-5583
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Haohui Mai
>            Assignee: Haohui Mai
>
> We found that it is valuable to allow applications to handle errors and exceptions in the Kafka consumer, in order to build robust applications in production.
> The context is the following:
> (1) We have schematized, Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). The streaming pipeline might want to bail out (which is the current behavior) or to skip the corrupted records, depending on the application.
> Two options are available:
> (1) Have a variant of DeserializationSchema that returns a FlatMap-like structure, as suggested in FLINK-3679.
> (2) Allow applications to catch and handle the exceptions by exposing APIs similar to the {{ExceptionProxy}}.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
