flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dawid Wysakowicz <wysakowicz.da...@gmail.com>
Subject Re: Stream sql example
Date Fri, 09 Jun 2017 21:02:00 GMT
Thanks a lot Timo, after I added the ResultTypeQueryable interface to my
mapper it worked. As for the SongEvent the reason I tried remapping it to
Row is that it has an enum field on which I want to filter, so my first
approach was to remap it in TableSource to String. What do you think should
be the way to go in such case?

After successfully producing DataStream[Row] I tried sth like:

>
> tEnv.toAppendStream(table)(TypeInformation.of(classOf[UserSongsStatistics])).print();
>

The class UserSongsStatistics is a pojo with fields named the same as
expressions in SELECT clause. Is such a construct intended to work? Right
now I get an error:

org.apache.flink.table.api.TableException: The field types of physical and
> logical row types do not match.This is a bug and should not happen. Please
> file an issue.


Is it really a bug?

Anyway thanks for help. I will file a JIRA for the previous issue tomorrow.

Z pozdrowieniami! / Cheers!

Dawid Wysakowicz

*Data/Software Engineer*

Skype: dawid_wys | Twitter: @OneMoreCoder

<http://getindata.com/>

2017-06-09 22:25 GMT+02:00 Timo Walther <twalthr@apache.org>:

> Hi David,
>
> I think the problem is that the type of the DataStream produced by the
> TableSource, does not match the type that is declared in the `
> getReturnType()`. A `MapFunction<xxx, Row>` is always a generic type
> (because Row cannot be analyzed). A solution would be that the mapper
> implements `ResultTypeQueryable`. I agree that the error should be thrown
> earlier, not in the CodeGenerator. Can you create an issue for this?
>
> Btw the Table API supports nested types, it should work that the
> TableSource returns ` SongEvent`.
>
> Regards,
> Timo
>
>
> Am 09.06.17 um 20:19 schrieb Dawid Wysakowicz:
>
> Sorry forgot to add the link:
>
> https://gist.github.com/dawidwys/537d12a6f2355cba728bf93f1af87b45
>
> Z pozdrowieniami! / Cheers!
>
> Dawid Wysakowicz
>
> *Data/Software Engineer*
>
> Skype: dawid_wys | Twitter: @OneMoreCoder
>
> <http://getindata.com/>
>
> 2017-06-09 20:19 GMT+02:00 Dawid Wysakowicz <wysakowicz.dawid@gmail.com>:
>
>> Hi,
>> I tried writing a simple sql query with custom StreamTableSource and it
>> fails with error:
>>
>> org.apache.flink.table.codegen.CodeGenException: Arity of result type
>>>> does not match number of expressions.
>>>
>>> at org.apache.flink.table.codegen.CodeGenerator.generateResultE
>>>> xpression(CodeGenerator.scala:940)
>>>
>>> at org.apache.flink.table.codegen.CodeGenerator.generateConvert
>>>> erResultExpression(CodeGenerator.scala:883)
>>>
>>> at org.apache.flink.table.plan.nodes.CommonScan$class.generated
>>>> ConversionFunction(CommonScan.scala:57)
>>>
>>> at org.apache.flink.table.plan.nodes.datastream.StreamTableSour
>>>> ceScan.generatedConversionFunction(StreamTableSourceScan.scala:35)
>>>
>>> at org.apache.flink.table.plan.nodes.datastream.StreamScan$clas
>>>> s.convertToInternalRow(StreamScan.scala:48)
>>>
>>> at org.apache.flink.table.plan.nodes.datastream.StreamTableSour
>>>> ceScan.convertToInternalRow(StreamTableSourceScan.scala:35)
>>>
>>> at org.apache.flink.table.plan.nodes.datastream.StreamTableSour
>>>> ceScan.translateToPlan(StreamTableSourceScan.scala:107)
>>>
>>>
>> You can check the source code here:
>>
>>
>> Z pozdrowieniami! / Cheers!
>>
>> Dawid Wysakowicz
>>
>> *Data/Software Engineer*
>>
>> Skype: dawid_wys | Twitter: @OneMoreCoder
>>
>> <http://getindata.com/>
>>
>
>
>

Mime
View raw message