From: Jack Huang
Date: Fri, 26 Aug 2016 17:37:04 -0700
Subject: Handle deserialization error
To: user@flink.apache.org

Hi all,

I have a custom deserializer that I pass to a Kafka source to transform a JSON string into a Scala case class.

    val events = env.addSource(new FlinkKafkaConsumer09[Event]("events", new JsonSerde(classOf[Event], new Event), kafkaProp))

Sometimes the JSON message is malformed. In that case I want to catch the exception, log an error message, and move on to the next message without emitting an event downstream. The DeserializationSchema interface doesn't seem to allow this behavior. How can I achieve this?

Thanks,
Jack
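
The behavior asked for above (catch the exception, log it, skip the message) can be sketched in plain Scala with no Flink dependency. This only illustrates the intent and is not a statement about what DeserializationSchema supports. The Event field, the toy parser, and the names SkipMalformed, parse, and safeParse are all hypothetical; the parser merely stands in for whatever JSON library JsonSerde uses, and a Seq stands in for the stream of Kafka messages.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for the real Event case class; the field is an assumption.
case class Event(name: String = "")

object SkipMalformed {
  // Toy parser standing in for the JSON library inside JsonSerde (assumption):
  // it accepts only {"name":"..."} and throws on anything else.
  private val Pattern = """\s*\{\s*"name"\s*:\s*"([^"]*)"\s*\}\s*""".r

  def parse(raw: String): Event = raw match {
    case Pattern(name) => Event(name)
    case _             => throw new IllegalArgumentException(s"malformed JSON: $raw")
  }

  // Wrap the parse so a bad message is logged and becomes None
  // instead of propagating an exception.
  def safeParse(raw: String): Option[Event] = Try(parse(raw)) match {
    case Success(event) => Some(event)
    case Failure(error) =>
      System.err.println(s"Skipping message: ${error.getMessage}")
      None
  }

  def main(args: Array[String]): Unit = {
    val messages = Seq("""{"name":"a"}""", "not json", """{"name":"b"}""")
    // flatMap drops the None entries, so only well-formed events continue.
    val events = messages.flatMap(m => safeParse(m))
    println(events)
  }
}
```

Whether the same shape can be expressed through the Kafka source itself, for example by producing Option[Event] and dropping the empty values in a later operator, is an assumption and exactly the open question here.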