Date: Tue, 7 Feb 2017 07:18:41 +0000 (UTC)
From: "Tzu-Li (Gordon) Tai (JIRA)"
To: issues@flink.apache.org
Subject: [jira] [Comment Edited] (FLINK-5583) Support flexible error handling in the Kafka consumer

    [ https://issues.apache.org/jira/browse/FLINK-5583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15855466#comment-15855466 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-5583 at 2/7/17 7:18 AM:
--------------------------------------------------------------------

Ah, I see what you mean. For your use case it makes sense, but I don't think this is necessary for general use cases (especially the {{writeToExternalSources}} method).
First of all, I would still like to keep the interface to the minimal flatMap-like version proposed in FLINK-3679:

{code}
public interface NewDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    // the user uses the collector to buffer zero or more outputs per message
    void deserialize(byte[] message, OutputCollector<T> collector);
}
{code}

Something like the above (ignore the dummy name, we can think of a better one :-D). The way it would work is:

1. The consumer starts processing a record, say the one with offset 32.
2. The consumer collects, in an internal buffer, the zero or more records produced by calling {{deserialization.deserialize(recordBytes, collector)}}.
3. All records in the buffer are flushed, and offset 32 is updated in the consumer state, as a single atomic operation synchronized on the checkpoint lock.
4. Since checkpointing is synchronized on the same lock, a checkpoint barrier can only come either before or after all the records produced from offset 32.

With the synchronization explained above, we do not need to expose another {{flush}} method to the user.

For your use case, in which you want to write corrupt bytes to external storage, you would do that with a try-catch block in your implementation of {{deserialization.deserialize(bytes, collector)}} (see the sketch below). The only limitation is that the write must be a blocking call, which may be acceptable depending on the frequency of corrupt messages.

What do you think [~wheat9]?
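A minimal sketch of how such a schema implementation could look, assuming the proposed {{NewDeserializationSchema}} and {{OutputCollector}} shapes above (neither exists yet); {{AvroRecord}} stands in for a user's Avro-generated class, and {{CorruptRecordStore}} is a hypothetical client for the external storage:

{code}
import java.io.IOException;

import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class SkippingAvroDeserializationSchema implements NewDeserializationSchema<AvroRecord> {

    // hypothetical blocking client for the external storage of corrupt bytes
    private final CorruptRecordStore corruptRecordStore;

    public SkippingAvroDeserializationSchema(CorruptRecordStore corruptRecordStore) {
        this.corruptRecordStore = corruptRecordStore;
    }

    @Override
    public void deserialize(byte[] message, OutputCollector<AvroRecord> collector) {
        try {
            // a real implementation would reuse the reader rather than
            // create one per message
            DatumReader<AvroRecord> reader = new SpecificDatumReader<>(AvroRecord.class);
            Decoder decoder = DecoderFactory.get().binaryDecoder(message, null);
            // zero or one record is collected; the consumer buffers it and
            // flushes it atomically together with the offset update
            collector.collect(reader.read(null, decoder));
        } catch (IOException e) {
            // blocking write of the corrupt bytes to external storage;
            // nothing is collected, so the record is skipped downstream
            corruptRecordStore.write(message);
        }
    }

    @Override
    public TypeInformation<AvroRecord> getProducedType() {
        return TypeInformation.of(AvroRecord.class);
    }
}
{code}

And a sketch of the atomic flush in steps 3 and 4, again only illustrative (the names are made up), showing how the consumer would synchronize emission and offset updates on the checkpoint lock:

{code}
// inside the consumer's fetch loop (simplified, names hypothetical)
deserializationSchema.deserialize(recordBytes, bufferingCollector);

synchronized (checkpointLock) {
    // emit all buffered records and advance the partition offset as one
    // atomic step; since checkpoints take the same lock, a barrier can only
    // be injected before or after the complete set of records produced
    // from this offset
    for (AvroRecord element : bufferingCollector.getBufferedRecords()) {
        sourceContext.collect(element);
    }
    partitionState.setOffset(currentOffset);
    bufferingCollector.clear();
}
{code}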
> Support flexible error handling in the Kafka consumer
> -----------------------------------------------------
>
>                 Key: FLINK-5583
>                 URL: https://issues.apache.org/jira/browse/FLINK-5583
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Haohui Mai
>            Assignee: Haohui Mai
>
> We found that it is valuable to allow applications to handle errors and exceptions in the Kafka consumer, in order to build robust applications in production.
> The context is the following:
> (1) We have schematized Avro records flowing through Kafka.
> (2) The decoder implements the DeserializationSchema to decode the records.
> (3) Occasionally there are corrupted records (e.g., schema issues). Depending on the application, the streaming pipeline might want to bail out (the current behavior) or to skip the corrupted records.
> Two options are available:
> (1) Have a variant of DeserializationSchema return a FlatMap-like structure, as suggested in FLINK-3679.
> (2) Allow applications to catch and handle the exception by exposing APIs similar to the {{ExceptionProxy}}.
> Thoughts?
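For comparison, option (2) above could take roughly the following shape. None of these names exist in Flink, and the existing {{ExceptionProxy}} is an internal utility for forwarding exceptions across threads rather than a user-facing hook, so this is only a hypothetical sketch:

{code}
import java.io.Serializable;

/**
 * Hypothetical user-facing handler for deserialization failures (option 2).
 */
public interface DeserializationErrorHandler extends Serializable {

    /**
     * Called with the raw bytes when deserialization fails. Implementations
     * may skip the record, write the bytes elsewhere, or rethrow to fail
     * the job (the current default behavior).
     */
    void onDeserializationError(byte[] corruptMessage, Exception cause) throws Exception;
}
{code}

A hypothetical registration on the consumer could then look like {{consumer.setDeserializationErrorHandler((bytes, cause) -> corruptRecordStore.write(bytes))}}, keeping the schema interface itself unchanged.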