From: Vijay Balakrishnan
Date: Wed, 1 May 2019 13:39:06 -0700
Subject: Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple
To: abhishek sharma
Cc: Timothy Victor, Chesnay Schepler, user
Delivered-To: mailing list user@flink.apache.org
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary="000000000000d8a86d0587d9825a"

--000000000000d8a86d0587d9825a
Content-Type: text/plain; charset="UTF-8"

Hi,

I asked this question earlier under the topic "Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple".

I am having issues defining a generic Tuple as the key type instead of a specific Tuple1, Tuple2, etc.:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Usage of class Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1, Tuple2, etc.) instead.

DataStream<Map<String, Object>> kinesisStream = ...;
KeyedStream<Map<String, Object>, Tuple> monitoringTupleKeyedStream =
        kinesisStream.keyBy(new MapTupleKeySelector(groupBySet)); // <===== complains about Tuple type for monitoringTupleKeyedStream
.....

public static class MapTupleKeySelector implements KeySelector<Map<String, Object>, Tuple> {
    private final Set<String> groupBySet;

    public MapTupleKeySelector(Set<String> groupBySet) {
        this.groupBySet = groupBySet;
    }

    @Override
    public Tuple getKey(Map<String, Object> inputMap) throws Exception {
        int groupBySetSize = groupBySet.size();
        Tuple tuple = Tuple.newInstance(groupBySetSize);
        //Tuple1 tuple = new Tuple1();
        int count = 0;
        for (String groupBy : groupBySet) {
            // Assumed: the posted snippet used groupByValue without defining it;
            // looking it up in the input map is the most likely intent.
            Object groupByValue = inputMap.get(groupBy);
            tuple.setField(groupByValue, count++);
        }
        return tuple;
    }
}

Abhishek replied in the earlier thread as follows (I am posting in that thread as well as creating a new one):

> However, if you are trying to build some generic framework and for different streams, there would be different fields, you can follow the Map approach. For the latter approach, you need to write an extra mapper class which will convert all the fields in the stream to the Map-based stream.

Can I get an example of how to create this extra mapper class?

Currently, I am using deserialization to convert the incoming byte[] by implementing KinesisDeserializationSchema<Map<String, Object>>, which gives me a DataStream<Map<String, Object>> kinesisStream.

TIA,

On Sun, Apr 7, 2019 at 5:40 PM abhishek sharma wrote:

> I agree with Timothy, POJO would be a much better approach.
>
> However, if you are trying to build some generic framework and for
> different streams, there would be different fields, you can follow the Map
> approach.
> For the latter approach, you need to write an extra mapper class
> which will convert all the fields in the stream to the Map-based stream.
>
> Abhishek
>
> On Sun, Apr 7, 2019 at 3:07 AM Timothy Victor wrote:
>
>> Could this just be solved by creating a POJO model class for your problem?
>>
>> That is, instead of using Tuple6 - create a class that encapsulates your
>> data. This, I think, would solve your problem. But beyond that I think
>> the code will be more understandable. It's hard to have a Tuple6 of all
>> Strings, and remember what each one means -- even if I wrote the code :-)
>> Furthermore, if and when you need to add more elements to your data model,
>> you will need to refactor your entire Flink graph. Keeping a data model
>> in POJO protects against those things.
>>
>> The latter is just unsolicited code review feedback. And I know I gave
>> it without much context to your problem. So please take with a large grain
>> of salt, and if it doesn't apply just ignore it.
>>
>> Tim
>>
>>
>> On Fri, Apr 5, 2019 at 7:53 AM Chesnay Schepler
>> wrote:
>>
>>> > I tried using [ keyBy(KeySelector, TypeInformation) ]
>>>
>>> What was the result of this approach?
>>>
>>> On 03/04/2019 17:36, Vijay Balakrishnan wrote:
>>>
>>> Hi Tim,
>>> Thanks for your reply. I am not seeing an option to specify a
>>> .returns(new TypeHint<Tuple6<String,
>>> String,String,String,String,String>>(){}) with KeyedStream ??
>>>
>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>>>> KeySelector<Monitoring, Tuple>() {
>>>>             public Tuple getKey(Monitoring mon) throws Exception {......return
>>>> new Tuple6<>(..}    })
>>>
>>> I tried using
>>> TypeInformation<Tuple6<String, String, String, String, String, String>>
>>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String,
>>> String, String, String>>(){});
>>>
>>>> kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...}, info);
>>>> //specify typeInfo through
>>>>
>>>
>>> TIA,
>>> Vijay
>>>
>>> On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor wrote:
>>>
>>>> Flink needs type information for serializing and deserializing objects,
>>>> and that is lost due to Java type erasure. The only way to work around
>>>> this is to specify the return type of the function called in the lambda.
>>>>
>>>> Fabian's answer here explains it well.
>>>>
>>>>
>>>> https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554
>>>>
>>>> Tim
>>>>
>>>> On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I am trying to use the KeyedStream with Tuple to handle different types
>>>>> of Tuples including Tuple6.
>>>>> Keep getting the Exception:
>>>>> *Exception in thread "main"
>>>>> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
>>>>> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
>>>>> Tuple2, etc.) instead*.
>>>>> Is there a way around Type Erasure here ?
>>>>> I want to use KeyedStream<Monitoring, Tuple> so that I can pass it on
>>>>> to treat Tuple6 as a Tuple like the monitoringTupleKeyedStream.
>>>>>
>>>>> Code below:
>>>>>
>>>>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
>>>>>> String keyOperationType = ....;//provided
>>>>>> if (StringUtils.isNotEmpty(keyOperationType)) {
>>>>>>     if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
>>>>>>         monitoringTupleKeyedStream =
>>>>>>                 kinesisStream.keyBy("deployment", "gameId", "eventName", "component");
>>>>>>     } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
>>>>>>         monitoringTupleKeyedStream =
>>>>>>                 kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "instance");
>>>>>>     } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
>>>>>>         TypeInformation<Tuple6<String, String, String, String, String, String>> info =
>>>>>>                 TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
>>>>>>         monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
>>>>>>             public Tuple getKey(Monitoring mon) throws Exception {
>>>>>>                 String key = "";
>>>>>>                 String keyName = "";
>>>>>>                 final String eventName = mon.getEventName();
>>>>>>                 if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
>>>>>>                     keyName = PCAM_ID;
>>>>>>                     key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
>>>>>>                 } else if (eventName != null && eventName.equalsIgnoreCase(EGRESS_FPS)) {
>>>>>>                     keyName = OUT_BITRATE;
>>>>>>                     key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>>>>>>                 }
>>>>>>                 mon.setKeyName(keyName);
>>>>>>                 mon.setKeyValue(key);
>>>>>>                 return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName,
>>>>>>                         mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
>>>>>>             }
>>>>>>         }); //, info)
>>>>>>     } else if (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
>>>>>>         monitoringTupleKeyedStream =
>>>>>>                 kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "instance", "container"); //<== this is also a Tuple6 but no complaints ?
>>>>>>     }
>>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> This example below needs monitoringTupleKeyedStream to be
>>>>> KeyedStream<Monitoring, Tuple6<String, String, String, String, String, String>>
>>>>>
>>>>>> TypeInformation<Tuple6<String, String, String, String, String, String>> info =
>>>>>>         TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
>>>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple6<String, String, String, String, String, String>>() {
>>>>>>     @Override
>>>>>>     public Tuple6<String, String, String, String, String, String> getKey(Monitoring mon) throws Exception {
>>>>>>         String key = "";
>>>>>>         String keyName = "";
>>>>>>         //TODO: extract to a method to pull key to use from a config file
>>>>>>         final String eventName = mon.getEventName();
>>>>>>         if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
>>>>>>             keyName = PCAM_ID;
>>>>>>             key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
>>>>>>         } else if (eventName != null && eventName.equalsIgnoreCase(EGRESS_FPS)) {
>>>>>>             keyName = OUT_BITRATE;
>>>>>>             key = mon.getEventDataMap() != null ?
>>>>>>                     (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>>>>>>         }
>>>>>>         mon.setKeyName(keyName);
>>>>>>         mon.setKeyValue(key);
>>>>>>         return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName,
>>>>>>                 mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
>>>>>>     }
>>>>>> }, info);
>>>>>
>>>>>
>>>>> TIA
>>>>>
>>>>
>>>

--000000000000d8a86d0587d9825a
Content-Type: text/html; charset="UTF-8"
Content-Transfer-Encoding: quoted-printable
--000000000000d8a86d0587d9825a--
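
For anyone reading this thread later: Tim's point that the type information "is lost due to Java type erasure" can be reproduced without Flink. The sketch below is only an illustration under stated assumptions. `ErasureDemo` and `TypeToken` are hypothetical names introduced here; `TypeToken` is a stand-in that mimics the `new TypeHint<...>(){}` pattern used in the thread and is not Flink's own class. It shows that a generic instance reports only its raw class at runtime, while an anonymous subclass keeps its full type argument in class metadata, which is presumably why the empty `{}` body appears in the `TypeHint` calls above.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    // Hypothetical stand-in for the TypeHint pattern (not Flink's class):
    // an anonymous subclass records its type argument in class metadata.
    public abstract static class TypeToken<T> {
        public final Type captured;

        protected TypeToken() {
            // The generic superclass of the anonymous subclass still carries <T>.
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            this.captured = superType.getActualTypeArguments()[0];
        }
    }

    // Runtime class of a generic instance: the type arguments are erased.
    public static String runtimeClassName(Object value) {
        return value.getClass().getName();
    }

    // Full generic type as recorded by the token.
    public static String capturedTypeName(TypeToken<?> token) {
        return token.captured.getTypeName();
    }

    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Integer> numbers = new ArrayList<>();

        // Both lists share one runtime class, so the element type is not recoverable.
        System.out.println(runtimeClassName(strings));
        System.out.println(strings.getClass() == numbers.getClass());

        // The trailing {} creates an anonymous subclass, which preserves List<String>.
        System.out.println(capturedTypeName(new TypeToken<List<String>>() {}));
    }
}
```

The same reasoning suggests why declaring the key as the abstract `Tuple` fails: the declared type gives no arity or field types to build a serializer from, whereas a concrete `Tuple6<String, ...>` captured through a type hint does.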