flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timothy Victor <vict...@gmail.com>
Subject Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple
Date Wed, 03 Apr 2019 01:06:32 GMT
Flink needs type information for serializing and deserializing objects, and
that is lost due to Java type erasure.   The only way to workaround this is
to specify the return type of the function called in the lambda.

Fabian's answer here explains it well.

https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554

Tim

On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan <bvijaykr@gmail.com> wrote:

> Hi,
> I am trying to use the KeyedStream with Tuple to handle diffrent types of
> Tuples including Tuple6.
> Keep getting the Exception:
> *Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
> Tuple2, etc.) instead*.
> Is there a way around Type Erasure here ?
> I want to use KeyedStream<Monitoring, Tuple> so that I can pass it on to
> treat Tuple6 as a Tuple like the monitoringTupleKeyedStream.
>
> Code below:
>
> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
>> String keyOperationType = ....;//provided
>> if (StringUtils.isNotEmpty(keyOperationType)) {
>>     if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
>>         monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
>> "gameId", "eventName", "component");
>>     } else if
>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
>>         monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
>> "gameId", "eventName", "component", "instance");
>>     } else if
>> (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
>>         TypeInformation<Tuple6<String, String, String, String, String,
>> String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String,
>> String, String, String, String>>(){});
>>         monitoringTupleKeyedStream = kinesisStream.keyBy(new
>> KeySelector<Monitoring, Tuple>() {
>>             public Tuple getKey(Monitoring mon) throws Exception {
>>                 String key = "";
>>                 String keyName = "";
>>                 final String eventName = mon.getEventName();
>>                 if (eventName != null &&
>> ((eventName.equalsIgnoreCase(INGRESS_FPS)))
>>                 )) {
>>                     keyName = PCAM_ID;
>>                     key = mon.getEventDataMap() != null ? (String)
>> mon.getEventDataMap().get(PCAM_ID) : "";
>>                 } else if (eventName != null &&
>> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
>>                     keyName = OUT_BITRATE;
>>                     key = mon.getEventDataMap() != null ? (String)
>> mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>>                 }
>>                 mon.setKeyName(keyName);
>>                 mon.setKeyValue(key);
>>                 return new Tuple6<>(mon.getDeployment(), mon.getGameId(),
>> eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
>>             }
>>         }); //, info)
>>     } else if
>> (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
>>         monitoringTupleKeyedStream = kinesisStream.keyBy("deployment",
>> "gameId", "eventName", "component", "instance", "container"); //<== this is
>> also a Tuple6 but no complaints ?
>>     }
>> }
>
>
>
> This example below needs monitoringTupleKeyedStream  to be
> KeyedStream<Monitoring, Tuple6<String, String, String, String, String,
> String>>
>
>> TypeInformation<Tuple6<String, String, String, String, String, String>>
>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String,
>> String, String, String>>(){});
>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>> KeySelector<Monitoring, Tuple6<String, String, String, String, String,
>> String>>() {
>>                     @Override
>>                     public Tuple6<String, String, String, String, String,
>> String> getKey(Monitoring mon) throws Exception {
>>                         String key = "";
>>                         String keyName = "";
>>                         //TODO: extract to a method to pull key to use
>> from a config file
>>                         final String eventName = mon.getEventName();
>>                         if (eventName != null &&
>> ((eventName.equalsIgnoreCase(INGRESS_FPS)))
>>                         )) {
>>                             keyName = PCAM_ID;
>>                             key = mon.getEventDataMap() != null ?
>> (String) mon.getEventDataMap().get(PCAM_ID) : "";
>>                         } else if (eventName != null &&
>> (eventName.equalsIgnoreCase(EGRESS_FPS))) {
>>                             keyName = OUT_BITRATE;
>>                             key = mon.getEventDataMap() != null ?
>> (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key
>> to use
>>                         }
>>                         mon.setKeyName(keyName);
>>                         mon.setKeyValue(key);
>>                         return new Tuple6<>(mon.getDeployment(),
>> mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(),
>> mon.getKeyValue());
>>                     }
>>                 }, info);
>
>
> TIA
>

Mime
View raw message