From user-return-26859-archive-asf-public=cust-asf.ponee.io@flink.apache.org Wed Apr 3 01:06:51 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 349E0180668 for ; Wed, 3 Apr 2019 03:06:50 +0200 (CEST) Received: (qmail 23594 invoked by uid 500); 3 Apr 2019 01:06:48 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list user@flink.apache.org Received: (qmail 23584 invoked by uid 99); 3 Apr 2019 01:06:48 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Apr 2019 01:06:48 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 420E7183AF0 for ; Wed, 3 Apr 2019 01:06:48 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.802 X-Spam-Level: * X-Spam-Status: No, score=1.802 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, DKIM_VALID_AU=-0.1, DKIM_VALID_EF=-0.1, HTML_MESSAGE=2, RCVD_IN_DNSWL_NONE=-0.0001, RCVD_IN_MSPIKE_H3=0.001, RCVD_IN_MSPIKE_WL=0.001, SPF_PASS=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Authentication-Results: spamd3-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=gmail.com Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id Ia2GL6uQDaKW for ; Wed, 3 Apr 2019 01:06:44 +0000 (UTC) Received: from mail-ot1-f68.google.com (mail-ot1-f68.google.com [209.85.210.68]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id 4661261154 for ; Wed, 3 Apr 2019 01:06:44 +0000 (UTC) Received: by mail-ot1-f68.google.com with SMTP id s24so13814226otk.13 for ; Tue, 02 Apr 2019 18:06:44 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20161025; h=mime-version:references:in-reply-to:from:date:message-id:subject:to :cc; bh=m8ETkeoPaPgsmh4XjPxIICSpTvRTVUZfD4E9L8aXPOA=; b=MDCzUEsMctbX+ODqjxitFsjmggGQLD5yL3omb0/bCaLRe2vZBxDRRsQQtgDOm5hiWD zdJU9I3S1t7v4nFUocUABUCWPOolbDk52AkELz/Q3j9rSZhatX0zt65+ZdWBzeQ8GUW+ usPqdbVxVE4LQrJPJw1pLYA7O1p6bZg9xCcvWiTtxVG23jFBpLyesCnYKLNjvUs4Vpl5 KaKsp68d8xT/wJkgUVVXa6A2HCS0s049ow2srpCO4yGfB2FksyTy7D2J5yHAeQYyyUKp Tjm5ZXCbPIHaaQt0FV7Re7PEtcyRxJIMV+ufN1vx/qU2Mb/9Qi5pA64k2o7O00qkTykK +ZPQ== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20161025; h=x-gm-message-state:mime-version:references:in-reply-to:from:date :message-id:subject:to:cc; bh=m8ETkeoPaPgsmh4XjPxIICSpTvRTVUZfD4E9L8aXPOA=; b=a3mElvITambgvTfomxBwFEXvAYLsUhxSj4kFQjQ3cgBeXrNqdSBndrDlM2O0v2MQv5 a9Fj+wKHZbmlvrD707TQPrwT8HOa/6tcTm/a3NYe6pAfjcIMaevyGM4oJTuGbFHIu+ZY UOMpLSfftpozTRpwX78gCs5kWN6w4Xk04hfhLmX2dDnGUXJ9EDEstCY5w4jmcHaHM302 1Cf7re6gz5VG6He/EzZ2gn7vCdtwWWenn5+rpLBg1uOZipB/b0v/0fSSEvTzCz7nMJiK KpKKrQnL1dK3Dup175D2BSUA9CD/EZjzW956m2PpTnXiPkjEedZ+Gz5dcx0d2aHNsBzq iwLQ== X-Gm-Message-State: APjAAAV/WuVnz/qal6l4xHT1dD2UaRZ1dawknL9UiE05/mXAhxUVG+ft 73pg58A13nZnIPjxAzS+V33RFPT2bWRvZW+gio3Pxw== X-Google-Smtp-Source: APXvYqzmjtCi0wdSpuKVdQQRMwhHs0I3oSFixhTeEKDiZTRd5sHLnj9QnMgXkqEfTAfFGWzyIuzekUu7febbXLA4rJk= X-Received: by 2002:a9d:6187:: with SMTP id g7mr548559otk.2.1554253603569; Tue, 02 Apr 2019 18:06:43 -0700 (PDT) MIME-Version: 1.0 References: In-Reply-To: From: Timothy Victor Date: Tue, 2 Apr 2019 20:06:32 -0500 Message-ID: Subject: Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple To: Vijay Balakrishnan Cc: user Content-Type: multipart/alternative; boundary="000000000000ee84bb058595dd83" --000000000000ee84bb058595dd83 Content-Type: text/plain; charset="UTF-8" Flink needs type information for serializing and deserializing objects, and that is lost due to Java type erasure. The only way to workaround this is to specify the return type of the function called in the lambda. Fabian's answer here explains it well. https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554 Tim On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan wrote: > Hi, > I am trying to use the KeyedStream with Tuple to handle diffrent types of > Tuples including Tuple6. > Keep getting the Exception: > *Exception in thread "main" > org.apache.flink.api.common.functions.InvalidTypesException: Usage of class > Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, > Tuple2, etc.) instead*. > Is there a way around Type Erasure here ? > I want to use KeyedStream so that I can pass it on to > treat Tuple6 as a Tuple like the monitoringTupleKeyedStream. > > Code below: > > KeyedStream monitoringTupleKeyedStream = null; >> String keyOperationType = ....;//provided >> if (StringUtils.isNotEmpty(keyOperationType)) { >> if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) { >> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", >> "gameId", "eventName", "component"); >> } else if >> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) { >> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", >> "gameId", "eventName", "component", "instance"); >> } else if >> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) { >> TypeInformation> String>> info = TypeInformation.of(new TypeHint> String, String, String, String>>(){}); >> monitoringTupleKeyedStream = kinesisStream.keyBy(new >> KeySelector() { >> public Tuple getKey(Monitoring mon) throws Exception { >> String key = ""; >> String keyName = ""; >> final String eventName = mon.getEventName(); >> if (eventName != null && >> ((eventName.equalsIgnoreCase(INGRESS_FPS))) >> )) { >> keyName = PCAM_ID; >> key = mon.getEventDataMap() != null ? (String) >> mon.getEventDataMap().get(PCAM_ID) : ""; >> } else if (eventName != null && >> (eventName.equalsIgnoreCase(EGRESS_FPS))) { >> keyName = OUT_BITRATE; >> key = mon.getEventDataMap() != null ? (String) >> mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use >> } >> mon.setKeyName(keyName); >> mon.setKeyValue(key); >> return new Tuple6<>(mon.getDeployment(), mon.getGameId(), >> eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue()); >> } >> }); //, info) >> } else if >> (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) { >> monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", >> "gameId", "eventName", "component", "instance", "container"); //<== this is >> also a Tuple6 but no complaints ? >> } >> } > > > > This example below needs monitoringTupleKeyedStream to be > KeyedStream String>> > >> TypeInformation> >> info = TypeInformation.of(new TypeHint> String, String, String>>(){}); >> monitoringTupleKeyedStream = kinesisStream.keyBy(new >> KeySelector> String>>() { >> @Override >> public Tuple6> String> getKey(Monitoring mon) throws Exception { >> String key = ""; >> String keyName = ""; >> //TODO: extract to a method to pull key to use >> from a config file >> final String eventName = mon.getEventName(); >> if (eventName != null && >> ((eventName.equalsIgnoreCase(INGRESS_FPS))) >> )) { >> keyName = PCAM_ID; >> key = mon.getEventDataMap() != null ? >> (String) mon.getEventDataMap().get(PCAM_ID) : ""; >> } else if (eventName != null && >> (eventName.equalsIgnoreCase(EGRESS_FPS))) { >> keyName = OUT_BITRATE; >> key = mon.getEventDataMap() != null ? >> (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key >> to use >> } >> mon.setKeyName(keyName); >> mon.setKeyValue(key); >> return new Tuple6<>(mon.getDeployment(), >> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), >> mon.getKeyValue()); >> } >> }, info); > > > TIA > --000000000000ee84bb058595dd83 Content-Type: text/html; charset="UTF-8" Content-Transfer-Encoding: quoted-printable
Flink needs type information for serializing and des= erializing objects, and that is lost due to Java type erasure.=C2=A0 =C2=A0= The only way to workaround this is to specify the return type of the functi= on called in the lambda.

Fabian's answer here explains it well.


Tim

On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishn= an <bvijaykr@gmail.com> wrote:
Hi,
I am trying to u= se the KeyedStream with Tuple to handle diffrent types of Tuples including = Tuple6.
Keep getting the Exception:
Exception in thr= ead "main" org.apache.flink.api.common.functions.InvalidTypesExce= ption: Usage of class Tuple as a type is not allowed. Use a concrete subcla= ss (e.g. Tuple1, Tuple2, etc.) instead.
Is there a way around= Type Erasure here ?
I want to use KeyedStream<Monitoring, Tup= le> so that I can pass it on to treat Tuple6 as a Tuple like the monitor= ingTupleKeyedStream.

Code below:

KeyedStream<Monitor= ing, Tuple> monitoringTupleKeyedStream =3D null;
String keyOperationT= ype =3D ....;//provided=C2=A0 =C2=A0 =C2=A0 =C2=A0=C2=A0
if (StringUtils= .isNotEmpty(keyOperationType)) {
=C2=A0 =C2=A0 if (keyOperationType.equa= lsIgnoreCase(Utils.COMPONENT_OPERATION)) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 m= onitoringTupleKeyedStream =3D kinesisStream.keyBy("deployment", &= quot;gameId", "eventName", "component");
=C2=A0= =C2=A0 } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTAN= CE_OPERATION)) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 monitoringTupleKeyedStream = =3D kinesisStream.keyBy("deployment", "gameId", "e= ventName", "component", "instance");
=C2=A0 =C2= =A0 } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERAT= ION)) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 TypeInformation<Tuple6<String,= String, String, String, String, String>> info =3D TypeInformation.of= (new TypeHint<Tuple6<String, String, String, String, String, String&g= t;>(){});
=C2=A0 =C2=A0 =C2=A0 =C2=A0 monitoringTupleKeyedStream =3D = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
=C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 public Tuple getKey(Monitoring mon) thro= ws Exception {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 S= tring key =3D "";
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 String keyName =3D "";
=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 final String eventName =3D mon.getEventName= ();
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (eventNam= e !=3D null && ((eventName.equalsIgnoreCase(INGRESS_FPS)))
=C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 )) {
=C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 keyName =3D PCAM_ID= ;
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = key =3D mon.getEventDataMap() !=3D null ? (String) mon.getEventDataMap().ge= t(PCAM_ID) : "";
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 } else if (eventName !=3D null && (eventName.equalsIgnor= eCase(EGRESS_FPS))) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 keyName =3D OUT_BITRATE;
=C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 key =3D mon.getEventDataMap()= !=3D null ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""= ; //TODO: identify key to use
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 }
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = mon.setKeyName(keyName);
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 mon.setKeyValue(key);
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 return new Tuple6<>(mon.getDeployment(), mon.getGam= eId(), eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());=
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
=C2=A0 =C2=A0 =C2=A0 =C2= =A0 }); //, info)
=C2=A0 =C2=A0 } else if (keyOperationType.equalsIgnore= Case(COMPONENT_CONTAINER_OPERATION)) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 monit= oringTupleKeyedStream =3D kinesisStream.keyBy("deployment", "= ;gameId", "eventName", "component", "instance= ", "container"); //<=3D=3D this is also a Tuple6 but no c= omplaints ?
=C2=A0 =C2=A0 }
}


This example below needs monitoringTupleKeyedStream=C2=A0 to be Key= edStream<Monitoring, Tuple6<String, String, String, String, String, S= tring>>=C2=A0
= TypeInformation<Tuple6<String, String, String, String, String, String= >> info =3D TypeInformation.of(new TypeHint<Tuple6<String, Stri= ng, String, String, String, String>>(){});
monitoringTupleKeyedStr= eam =3D kinesisStream.keyBy(new KeySelector<Monitoring, Tuple6<String= , String, String, String, String, String>>() {
=C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 @Override
=C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 public Tuple= 6<String, String, String, String, String, String> getKey(Monitoring m= on) throws Exception {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 String key =3D "";
=C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 String keyName =3D "";
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 //TODO: extract to a metho= d to pull key to use from a config file
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 final String eventName= =3D mon.getEventName();
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (eventName !=3D null && (= (eventName.equalsIgnoreCase(INGRESS_FPS)))
=C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 )) {
=C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 keyName =3D PCAM_ID;
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 key =3D mon.= getEventDataMap() !=3D null ? (String) mon.getEventDataMap().get(PCAM_ID) := "";
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 } else if (eventName !=3D null && (even= tName.equalsIgnoreCase(EGRESS_FPS))) {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 keyName = =3D OUT_BITRATE;
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 key =3D mon.getEventDataMap() != =3D null ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; = //TODO: identify key to use
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
=C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 mon.setKeyName(keyN= ame);
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 mon.setKeyValue(key);
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 return new Tuple6<&= gt;(mon.getDeployment(), mon.getGameId(), eventName, mon.getComponent(), mo= n.getKeyName(), mon.getKeyValue());
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 }
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 =C2=A0 }, info);=C2=A0

TIA=C2= =A0
--000000000000ee84bb058595dd83--