flink-user mailing list archives

From Rong Rong <walter...@gmail.com>
Subject Re: type error with generics ..
Date Sun, 25 Aug 2019 18:13:37 GMT
I am not sure how the function `readStream` is implemented. Which version
of Flink are you using?
Can you share more of your code and the full exception log?

Also, to answer your question: a DataStream's return type is determined by
its underlying transformation, so you cannot set it directly.

Thanks,
Rong
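
The "type erasure" in the exception quoted further down is a general Java
behaviour, and it can be illustrated without Flink. The sketch below is plain
Java only; `ErasureDemo` and `TypeCapture` are hypothetical names invented for
this illustration and are not Flink classes. It shows that two differently
parameterized lists share one runtime class, while an anonymous subclass (the
`new TypeHint<Data>(){}` pattern used in this thread looks like the same idiom)
keeps its type argument available through reflection.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

class ErasureDemo {

    // Hypothetical stand-in for a type-capturing helper; NOT Flink's TypeHint.
    abstract static class TypeCapture<T> {
        private final Type captured;

        protected TypeCapture() {
            // A subclass declared as "new TypeCapture<X>(){}" records its
            // generic superclass, including X, in its class metadata.
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                throw new IllegalStateException("Create as: new TypeCapture<X>(){}");
            }
            this.captured =
                ((ParameterizedType) superType).getActualTypeArguments()[0];
        }

        Type getType() {
            return captured;
        }
    }

    // True when both lists have the same runtime class, whatever their
    // compile-time element types were.
    static boolean sameRuntimeClass(List<?> a, List<?> b) {
        return a.getClass() == b.getClass();
    }

    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();

        // The element type is erased, so both are simply ArrayList at runtime.
        System.out.println(sameRuntimeClass(strings, ints)); // prints "true"

        // The anonymous subclass keeps the type argument visible to reflection.
        Type t = new TypeCapture<String>() {}.getType();
        System.out.println(t == String.class); // prints "true"
    }
}
```

A generic source function instantiated as `new SomeSource<Data>(...)` is in the
same position as the `ArrayList` above: nothing in the instance itself records
`Data`, which is presumably why the framework asks for an explicit hint.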

On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh <ghosh.debasish@gmail.com>
wrote:

> Thanks .. I tried this ..
>
> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>     .map((Data d) -> d)
>     .returns(new TypeHint<Data>(){}.getTypeInfo());
>
> But still get the same error on this line ..
>
> (BTW I am not sure how to invoke returns on a DataStream and hence had to
> do a fake map - any suggestions here ?)
>
> regards.
>
> On Sat, Aug 24, 2019 at 10:26 PM Rong Rong <walterddr@gmail.com> wrote:
>
>> Hi Debasish,
>>
>> I think the error refers to the output of your source rather than the
>> result of your map function. E.g.
>> DataStream<Data> ins = readStream(in, Data.class, serdeData)
>>     .returns(TypeInformation.of(Data.class));
>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>>     .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>
>> --
>> Rong
>>
>> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <ghosh.debasish@gmail.com>
>> wrote:
>>
>>> Hello -
>>>
>>> I have the following call to addSource where I pass a Custom
>>> SourceFunction ..
>>>
>>> env.<Data>addSource(
>>>   new CollectionSourceFunctionJ<Data>(
>>>     data, TypeInformation.<Data>of(new TypeHint<Data>(){}))
>>> )
>>>
>>> where data is List<Data> and CollectionSourceFunctionJ is a Scala case
>>> class ..
>>>
>>> case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti: TypeInformation[T])
>>>     extends SourceFunction[T] {
>>>   def cancel(): Unit = {}
>>>   def run(ctx: SourceContext[T]): Unit = {
>>>     data.asScala.foreach(d ⇒ ctx.collect(d))
>>>   }
>>> }
>>>
>>> When the following transformation runs ..
>>>
>>> DataStream<Data> ins = readStream(in, Data.class, serdeData);
>>> DataStream<Simple> simples = ins.map((Data d) -> new Simple(d.getName()))
>>>     .returns(new TypeHint<Simple>(){}.getTypeInfo());
>>>
>>> I get the following exception in the second line ..
>>>
>>> org.apache.flink.api.common.functions.InvalidTypesException: The return
>>> type of function 'Custom Source' could not be determined automatically, due
>>> to type erasure. You can give type information hints by using the
>>> returns(...) method on the result of the transformation call, or by letting
>>> your function implement the 'ResultTypeQueryable' interface.
>>>
>>> Initially the returns call was not there and I was getting the same
>>> exception. Now after adding the returns call, nothing changes.
>>>
>>> Any help will be appreciated ..
>>>
>>> regards.
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>
