flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From <jacopo.go...@ubs.com>
Subject RE: Flink's Either type information
Date Wed, 04 Mar 2020 15:08:05 GMT
Hi all,

Yes my problem is that I do not create the function inline but create a function directly
when creating the data stream job.
My code (which I cannot share) is exactly like your example, Yun, are you aware if there is
a way to prevent code erasure?

Kind regards,

Jacopo Gobbi


From: Yun Gao [mailto:yungao.gy@aliyun.com]
Sent: Freitag, 21. Februar 2020 16:00
To: Robert Metzger; Gobbi, Jacopo-XT
Cc: user
Subject: [External] Re: Flink's Either type information

      Hi Jacopo, Robert,

         Very sorry for missing the previous email and not response in time. I think exactly
as Robert has pointed out with the example: using inline anonymous subclass of KeyedBroadcastProcessFunction
should not cause the problem. As far as I know, the possible reason that cause the attached
exception might be that the parameter types of Either get erased due to the way to create
KeyedBroadcastProcessFunction object. For example, if you first implement a generic subclass
of KeyedBroadcastProcessFunction like:

      public class MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType> extends
KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, String>, String, Either<MyLeftType,
MyRightType>> { ... }

     and create a function object directly when constructing the DataStream job:

     stream.process(new MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>());

     Then MyLeftType and MyRightType will be erased and will cause the attached exception
when Flink tries to inference the output type.

     And I totally agree with Robert that attaching the corresponding codes would help debugging
the problem.

  Yours,
    Yun


------------------------------------------------------------------
From:Robert Metzger <rmetzger@apache.org>
Send Time:2020 Feb. 21 (Fri.) 19:47
To:jacopo.gobbi <jacopo.gobbi@ubs.com>
Cc:yungao.gy <yungao.gy@aliyun.com>; user <user@flink.apache.org>
Subject:Re: Flink's Either type information

Hey Jacopo,
can you post an example to reproduce the issue? I've tried it, but it worked in this artificial
example:


MapStateDescriptor<String, String> state = new MapStateDescriptor<>("test", BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
DataStream<Either<Integer, String>> result = input
      .map((MapFunction<String, Tuple2<Integer, String>>) value -> Tuple2.of(0,
value)).returns(TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class))
      .keyBy(0).connect(input.broadcast(state))
      .process(new KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, String>,
String, Either<Integer, String>>() {
         @Override
         public void processElement(Tuple2<Integer, String> value, ReadOnlyContext ctx,
Collector<Either<Integer, String>> out) throws Exception {
            out.collect(Either.Left(111));
         }
         @Override
         public void processBroadcastElement(String value, Context ctx, Collector<Either<Integer,
String>> out) throws Exception { }
      });
result.print();

On Wed, Feb 19, 2020 at 6:07 PM <jacopo.gobbi@ubs.com<mailto:jacopo.gobbi@ubs.com>>
wrote:
Yes, I create it the way you mentioned.

From: Yun Gao [mailto:yungao.gy@aliyun.com<mailto:yungao.gy@aliyun.com>]
Sent: Dienstag, 18. Februar 2020 10:12
To: Gobbi, Jacopo-XT; user
Subject: [External] Re: Flink's Either type information

      Hi Jacopo,

          Could you also provide how the KeyedBroadcastProcessFunction is created when constructing
datastream API ? For example, are you using something like

          new KeyedBroadcastProcessFunction<Integer, Integer, Integer, Either<MyLeft,
MyRight>() {
                       // Function implementation
             }

             or something else?

     Best,
      Yun


------------------------------------------------------------------
From:jacopo.gobbi <jacopo.gobbi@ubs.com<mailto:jacopo.gobbi@ubs.com>>
Send Time:2020 Feb. 17 (Mon.) 18:31
To:user <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject:Flink's Either type information

Hi all,

How can an Either value be returned by a KeyedBroadcastProcessFunction?
We keep getting "InvalidTypesException: Type extraction is not possible on Either type as
it does not contain information about the 'left' type." when doing: out.collect(Either.<MyLeftType,
MyRightType>Right(myObject));

Thanks,

Jacopo Gobbi



Mime
View raw message