From user-return-34710-archive-asf-public=cust-asf.ponee.io@flink.apache.org Mon May 4 11:23:03 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id C38EF180608 for ; Mon, 4 May 2020 13:23:02 +0200 (CEST) Received: (qmail 10771 invoked by uid 500); 4 May 2020 11:23:01 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list user@flink.apache.org Received: (qmail 10757 invoked by uid 99); 4 May 2020 11:23:01 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 May 2020 11:23:01 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id A88751A3363 for ; Mon, 4 May 2020 11:23:00 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.75 X-Spam-Level: X-Spam-Status: No, score=0.75 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, DKIM_VALID_EF=-0.1, HTML_MESSAGE=0.2, KAM_INFOUSMEBIZ=0.75, RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H2=-0.001, SPF_HELO_NONE=0.001, SPF_PASS=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Authentication-Results: spamd2-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=gmail.com Received: from mx1-ec2-va.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id 9NV4eTLqta2F for ; Mon, 4 May 2020 11:22:58 +0000 (UTC) Received-SPF: Pass (mailfrom) identity=mailfrom; client-ip=209.85.166.177; helo=mail-il1-f177.google.com; envelope-from=manish.c.ghildiyal@gmail.com; receiver= Received: from mail-il1-f177.google.com (mail-il1-f177.google.com [209.85.166.177]) by mx1-ec2-va.apache.org (ASF Mail Server at mx1-ec2-va.apache.org) with ESMTPS id 7DB7FBB85A for ; Mon, 4 May 2020 11:22:58 +0000 (UTC) Received: by mail-il1-f177.google.com with SMTP id s10so10828004iln.11 for ; Mon, 04 May 2020 04:22:58 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=mime-version:references:in-reply-to:from:date:message-id:subject:to :cc; bh=spYtynzHWklDqGioI5e1Y2n/PiS5d7PxM8/5et5802w=; b=sIwcqBcri5I+8uGpLE737QVCjC5PFNfVVcMBtLGiajNC0CcTbD4Chv4ugpigOWZxZB +G1CR3vPrrJFZaK9Z628uKPpwdXVCiGmpdhis6RvHS5Jez2AYYQx7B9UjxTZKKsuEuBo gLstYxjLydfM8j1WvQuirqkP4PJo1RzNWyBY4wzugVM4/HCS7wiPp6x6hMxjxEr0Q8Ng XFADxLG9VRpuBHYWuqFxhcHULy818UdWfxzvii7mwxfAFE3uqp5CiId80WKdDqJXpn/Y zx15CL2XHLObaZSUiTAbcaUTUYJOm+l9QKLDaIR6RdKgSj9YUWIqrkeCop4lhs/u7UP1 Vm8Q== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:references:in-reply-to:from:date :message-id:subject:to:cc; bh=spYtynzHWklDqGioI5e1Y2n/PiS5d7PxM8/5et5802w=; b=Nolc1iBqOKCakgtfGsKjjmS3zUNXG5cxYOhKgm6gp6fhompxn34fl3HjBVkpF/CXc+ 68GBiVTBZG+2QEUxtbxE80FFDNzA19QhGOb0ck5By/60KXWejUDlGZIBWX0co2bqGdo/ 5xoxvlFlHYPOdVbrvSa5Zi5PdHa16PvRRj8aLmGU8JrCpflP30KO4FOiCwqU85Ve8m2a J0SGoEGobE1pr9Xkxlb7/hG4kpb4gisRY1NNJsNlCN1FWxQ/cqEeLpzxBPSs6PFuWB+Y REKA/8cpsRlm7uMLQkPHzrIiKCXtGxpXN+3KEA11TdMzBeilnEpzNk4aIQoYcGITj8n7 WPNw== X-Gm-Message-State: AGi0PuZ7UfvfibCgLzQ6FrEi0ij+lMPDXCDtax1upFWpT669NJmi/wmL v7bSEuf2usnci43M4mrvLRptE1kZHfD/wOk+4V0= X-Google-Smtp-Source: APiQypKAj4ihPhztY6IeGBiftqjLerSpiCDV5Ljs3dLpbRKNJrYCrn7DN0Hj9AsU5rglraFIhl3+Kaw3EaUfeZF8i48= X-Received: by 2002:a92:d90b:: with SMTP id s11mr16073284iln.295.1588591377984; Mon, 04 May 2020 04:22:57 -0700 (PDT) MIME-Version: 1.0 References: In-Reply-To: From: Manish G Date: Mon, 4 May 2020 16:52:45 +0530 Message-ID: Subject: Re: InvalidTypesException: Input mismatch while consuming Kafka messages To: Robert Metzger Cc: user Content-Type: multipart/alternative; boundary="000000000000c7340c05a4d0c0ea" --000000000000c7340c05a4d0c0ea Content-Type: text/plain; charset="UTF-8" Thanks. It worked by introducing a custom DeserializationSchema. On Mon, May 4, 2020 at 3:04 PM Robert Metzger wrote: > Hi, > Can you provide the full stack trace of your exception? > Most likely, the error is caused by this setting: > > properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > MyCustomClassDeserializer.class.getName()); > > You need to use Flink's DeserializationSchema. > > On Mon, May 4, 2020 at 10:26 AM Manish G > wrote: > >> I have following code: >> >> ////////////////////// >> Properties properties = new Properties(); >> properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, >> MyCustomClassDeserializer.class.getName()); >> >> FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer( >> "test-kafka=topic", >> new SimpleStringSchema(), >> properties); >> >> final StreamExecutionEnvironment streamEnv = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> DataStream kafkaInputStream = >> streamEnv.addSource(kafkaConsumer); >> >> DataStream stringStream = kafkaInputStream >> .map(new MapFunction() { >> @Override >> public String map(MyCustomClass message) { >> logger.info("--- Received message : " + >> message.toString()); >> return message.toString(); >> } >> }); >> >> streamEnv.execute("Published messages"); >> >> /////// >> MyCustomClassDeserializer is implemented as: >> >> public MyCustomClass deserialize(String s, byte[] bytes) { >> return (MyCustomClass) JsonUtil.convertBytesToObject(bytes, >> MyCustomClass.class); >> } >> >> When I run this program locally, I get error: >> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: >> Input mismatch: Basic type expected. >> >> Why I get this error? >> > --000000000000c7340c05a4d0c0ea Content-Type: text/html; charset="UTF-8" Content-Transfer-Encoding: quoted-printable
Thanks. It worked by introducing a custom=20 Des= erializationSchema.

On Mon, May 4, 2020 at 3:04 PM Robert Metzger <= ;rmetzger@apache.org> wrote:<= br>
Hi,
Can you provide the full stack trace of your exception?
= Most likely, the error is caused=C2=A0by this setting:

=
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,= MyCustomClassDeserializer.class.getName());

Y= ou need to use Flink's=C2=A0DeserializationSchema.

<= div class=3D"gmail_quote">
On Mon, May= 4, 2020 at 10:26 AM Manish G <manish.c.ghildiyal@gmail.com> wrote:
= I have following code:

//////////////////////
<= /div>
Properties properties =3D new Properties();
properties.setProp= erty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeseriali= zer.class.getName());

FlinkKafkaConsumer<MyCustomClass> kafkaC= onsumer =3D new FlinkKafkaConsumer(
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 "test-kafka=3Dtopic",
=C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 new Simp= leStringSchema(),
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 properties);

final StreamExecutionEnvironment= streamEnv =3D StreamExecutionEnvironment.getExecutionEnvironment();
Dat= aStream<MyCustomClass> kafkaInputStream =3D streamEnv.addSource(kafka= Consumer);

DataStream<String> stringStream =3D kafkaInputStrea= m
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = .map(new MapFunction<MyCustomClass,String>() {
=C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 @Overrid= e
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 public String map(MyCustomClass message) {
=C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 logger.info= ("--- Received message : " + message.toString());
=C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 return message.toString();
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
=C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 });

stream= Env.execute("Published messages");

///////
MyCustomClassDeserializer is implemented as:

publi= c MyCustomClass deserialize(String s, byte[] bytes) {
=C2=A0 =C2=A0 =C2= =A0 =C2=A0 return (MyCustomClass) JsonUtil.convertBytesToObject(bytes, MyCu= stomClass.class);
=C2=A0 =C2=A0 }

When I run th= is program locally, I get error:
Caused by: org.apache.flink.api.= common.functions.InvalidTypesException: Input mismatch: Basic type expected= .

Why I get this error?
--000000000000c7340c05a4d0c0ea--