flink-user mailing list archives

From Vijay Balakrishnan <bvija...@gmail.com>
Subject Re: Flink - Type Erasure Exception trying to use Tuple6 instead of Tuple
Date Wed, 01 May 2019 20:39:06 GMT
Hi,
I asked this question earlier under the topic "Flink - Type Erasure Exception
trying to use Tuple6 instead of Tuple".

I am having trouble using the generic Tuple type as a key instead of a specific
Tuple1, Tuple2, etc.:
Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
Tuple2, etc.) instead.

DataStream<Map<String, Object>> kinesisStream = ...;
// The keyBy call below complains about the Tuple type of monitoringTupleKeyedStream.
KeyedStream<Map<String, Object>, Tuple> monitoringTupleKeyedStream =
        kinesisStream.keyBy(new MapTupleKeySelector(groupBySet));
.....

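import java.util.Map;
import java.util.Set;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;

public static class MapTupleKeySelector
        implements KeySelector<Map<String, Object>, Tuple> {
        private final Set<String> groupBySet;

        public MapTupleKeySelector(Set<String> groupBySet) {
            this.groupBySet = groupBySet;
        }

        @Override
        public Tuple getKey(Map<String, Object> inputMap) throws Exception {
            int groupBySetSize = groupBySet.size();
            // Creates an empty tuple whose arity matches the number of group-by fields.
            Tuple tuple = Tuple.newInstance(groupBySetSize);
            //Tuple1 tuple = new Tuple1();
            int count = 0;
            for (String groupBy : groupBySet) {
                // Assumed: the value for each group-by field is read from the input map.
                Object groupByValue = inputMap.get(groupBy);
                tuple.setField(groupByValue, count++);
            }
            return tuple;
        }
    }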
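
One possible way to sidestep the exception is to return a concrete key type rather than the abstract Tuple. The sketch below is plain Java with no Flink dependency: it joins the selected group-by values into a single String. The class name CompositeKeys, the method keyOf, and the "|" separator are hypothetical and not part of Flink; in a job, the same logic could sit inside the getKey method of a KeySelector<Map<String, Object>, String>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: builds one String key from the selected group-by fields. */
final class CompositeKeys {
    // Assumption: the separator never appears inside a field value.
    private static final String SEPARATOR = "|";

    private CompositeKeys() {}

    static String keyOf(Map<String, Object> inputMap, Set<String> groupBySet) {
        List<String> parts = new ArrayList<>();
        // Sort the field names so the key is built in the same order everywhere.
        for (String groupBy : new TreeSet<>(groupBySet)) {
            // A missing field is rendered as the text "null".
            parts.add(String.valueOf(inputMap.get(groupBy)));
        }
        return String.join(SEPARATOR, parts);
    }
}
```

Sorting the field names keeps the key stable regardless of the iteration order of the Set implementation. A String key drops the per-field types, so this fits only when the key is used purely for grouping.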

Abhishek replied in the earlier thread as follows (I am posting in that thread
as well as creating this new one):
However, if you are trying to build a generic framework in which different
streams have different fields, you can follow the Map approach. For the
latter approach, you need to write an extra mapper class that converts all
the fields in the stream to a Map-based stream.

Can I get an example of how to create this extra mapper class?

Currently, I deserialize the incoming byte[] by implementing
KinesisDeserializationSchema<Map<String, Object>>, which yields a
DataStream<Map<String, Object>> kinesisStream.
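
For reference, a mapper of the kind Abhishek describes might look like the sketch below. It is plain Java with no Flink dependency. The Monitoring class here is only a minimal stand-in for the POJO mentioned in this thread, and MonitoringToMapMapper is a hypothetical name; in a Flink job the class would presumably implement MapFunction<Monitoring, Map<String, Object>> and be applied with stream.map(...). It would only be needed for streams that do not already arrive as a Map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Minimal stand-in for the thread's Monitoring POJO; only four fields are modelled. */
final class Monitoring {
    final String deployment;
    final String gameId;
    final String eventName;
    final String component;

    Monitoring(String deployment, String gameId, String eventName, String component) {
        this.deployment = deployment;
        this.gameId = gameId;
        this.eventName = eventName;
        this.component = component;
    }
}

/** Hypothetical mapper: copies every modelled field of a record into a Map keyed by field name. */
final class MonitoringToMapMapper {
    Map<String, Object> map(Monitoring mon) {
        // LinkedHashMap keeps the fields in insertion order, which makes output easier to read.
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("deployment", mon.deployment);
        fields.put("gameId", mon.gameId);
        fields.put("eventName", mon.eventName);
        fields.put("component", mon.component);
        return fields;
    }
}
```

The field names used as Map keys match the names passed to keyBy elsewhere in this thread, so a group-by set such as {"deployment", "gameId"} can be looked up directly in the resulting Map.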

TIA,

On Sun, Apr 7, 2019 at 5:40 PM abhishek sharma <abhioncbr.apache@gmail.com>
wrote:

> I agree with Timothy, POJO would be a much better approach.
>
> However, if you are trying to build a generic framework in which different
> streams have different fields, you can follow the Map approach. For the
> latter approach, you need to write an extra mapper class that converts all
> the fields in the stream to a Map-based stream.
>
> Abhishek
>
> On Sun, Apr 7, 2019 at 3:07 AM Timothy Victor <victtim@gmail.com> wrote:
>
>> Could this just be solved by creating a POJO model class for your problem?
>>
>> That is, instead of using Tuple6 - create a class that encapsulates your
>> data.   This, I think, would solve your problem.  But beyond that I think
>> the code will be more understandable.  It's hard to have a Tuple6 of all
>> Strings, and remember what each one means -- even if I wrote the code :-)
>> Furthermore, if and when you need to add more elements to your data model,
>> you will need to refactor your entire Flink graph.   Keeping a data model
>> in POJO protects against those things.
>>
>> The latter is just unsolicited code review feedback.   And I know I gave
>> it without much context to your problem.  So please take with a large grain
>> of salt, and if it doesn't apply just ignore it.
>>
>> Tim
>>
>>
>> On Fri, Apr 5, 2019 at 7:53 AM Chesnay Schepler <chesnay@apache.org>
>> wrote:
>>
>>> > I tried using  [ keyBy(KeySelector, TypeInformation) ]
>>>
>>> What was the result of this approach?
>>>
>>> On 03/04/2019 17:36, Vijay Balakrishnan wrote:
>>>
>>> Hi Tim,
>>> Thanks for your reply. I am not seeing an option to specify
>>> .returns(new TypeHint<Tuple6<String, String, String, String, String, String>>(){})
>>> with KeyedStream.
>>>
>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(new
>>>> KeySelector<Monitoring, Tuple>() {
>>>>             public Tuple getKey(Monitoring mon) throws Exception {......return
>>>> new Tuple6<>(..}    })
>>>
>>> I tried using
>>> TypeInformation<Tuple6<String, String, String, String, String, String>>
>>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String,
>>> String, String, String>>(){});
>>>
>>>> kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {...}, info);
>>>> //specify typeInfo through
>>>>
>>>
>>> TIA,
>>> Vijay
>>>
>>> On Tue, Apr 2, 2019 at 6:06 PM Timothy Victor <victtim@gmail.com> wrote:
>>>
>>>> Flink needs type information for serializing and deserializing objects,
>>>> and that is lost due to Java type erasure.  The only way to work around
>>>> this is to specify the return type of the function called in the lambda.
>>>>
>>>> Fabian's answer here explains it well.
>>>>
>>>>
>>>> https://stackoverflow.com/questions/50945509/apache-flink-return-type-of-function-could-not-be-determined-automatically-due/50947554
>>>>
>>>> Tim
>>>>
>>>> On Tue, Apr 2, 2019, 7:03 PM Vijay Balakrishnan <bvijaykr@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I am trying to use KeyedStream with Tuple to handle different types
>>>>> of tuples, including Tuple6.
>>>>> I keep getting the exception:
>>>>> *Exception in thread "main"
>>>>> org.apache.flink.api.common.functions.InvalidTypesException: Usage of class
>>>>> Tuple as a type is not allowed. Use a concrete subclass (e.g. Tuple1,
>>>>> Tuple2, etc.) instead*.
>>>>> Is there a way around type erasure here?
>>>>> I want to use KeyedStream<Monitoring, Tuple> so that I can pass it on
>>>>> and treat Tuple6 as a Tuple, like the monitoringTupleKeyedStream.
>>>>>
>>>>> Code below:
>>>>>
>>>>>> KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
>>>>>> String keyOperationType = ....;//provided
>>>>>> if (StringUtils.isNotEmpty(keyOperationType)) {
>>>>>>     if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
>>>>>>         monitoringTupleKeyedStream =
>>>>>>             kinesisStream.keyBy("deployment", "gameId", "eventName", "component");
>>>>>>     } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
>>>>>>         monitoringTupleKeyedStream =
>>>>>>             kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "instance");
>>>>>>     } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
>>>>>>         TypeInformation<Tuple6<String, String, String, String, String, String>> info =
>>>>>>             TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
>>>>>>         monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
>>>>>>             public Tuple getKey(Monitoring mon) throws Exception {
>>>>>>                 String key = "";
>>>>>>                 String keyName = "";
>>>>>>                 final String eventName = mon.getEventName();
>>>>>>                 if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
>>>>>>                     keyName = PCAM_ID;
>>>>>>                     key = mon.getEventDataMap() != null
>>>>>>                         ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
>>>>>>                 } else if (eventName != null && eventName.equalsIgnoreCase(EGRESS_FPS)) {
>>>>>>                     keyName = OUT_BITRATE;
>>>>>>                     key = mon.getEventDataMap() != null
>>>>>>                         ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>>>>>>                 }
>>>>>>                 mon.setKeyName(keyName);
>>>>>>                 mon.setKeyValue(key);
>>>>>>                 return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName,
>>>>>>                     mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
>>>>>>             }
>>>>>>         }); //, info)
>>>>>>     } else if (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
>>>>>>         monitoringTupleKeyedStream =
>>>>>>             kinesisStream.keyBy("deployment", "gameId", "eventName", "component",
>>>>>>                 "instance", "container"); //<== this is also a Tuple6 but no complaints?
>>>>>>     }
>>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> This example below needs monitoringTupleKeyedStream to be
>>>>> KeyedStream<Monitoring, Tuple6<String, String, String, String, String, String>>
>>>>>
>>>>>> TypeInformation<Tuple6<String, String, String, String, String, String>> info =
>>>>>>     TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
>>>>>> monitoringTupleKeyedStream = kinesisStream.keyBy(
>>>>>>     new KeySelector<Monitoring, Tuple6<String, String, String, String, String, String>>() {
>>>>>>         @Override
>>>>>>         public Tuple6<String, String, String, String, String, String> getKey(Monitoring mon)
>>>>>>                 throws Exception {
>>>>>>             String key = "";
>>>>>>             String keyName = "";
>>>>>>             //TODO: extract to a method to pull key to use from a config file
>>>>>>             final String eventName = mon.getEventName();
>>>>>>             if (eventName != null && eventName.equalsIgnoreCase(INGRESS_FPS)) {
>>>>>>                 keyName = PCAM_ID;
>>>>>>                 key = mon.getEventDataMap() != null
>>>>>>                     ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
>>>>>>             } else if (eventName != null && eventName.equalsIgnoreCase(EGRESS_FPS)) {
>>>>>>                 keyName = OUT_BITRATE;
>>>>>>                 key = mon.getEventDataMap() != null
>>>>>>                     ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
>>>>>>             }
>>>>>>             mon.setKeyName(keyName);
>>>>>>             mon.setKeyValue(key);
>>>>>>             return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName,
>>>>>>                 mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
>>>>>>         }
>>>>>>     }, info);
>>>>>
>>>>>
>>>>> TIA
>>>>>
>>>>
>>>
