flink-user mailing list archives

From Timo Walther <twal...@apache.org>
Subject Re: Does Flink SQL "in" operation have a length limit?
Date Mon, 01 Oct 2018 08:01:38 GMT
Hi,

Tuple should not be used anywhere in flink-table. @Rong, can you point us 
to the corresponding code? I haven't looked into the code myself, but we 
should definitely support this query. @Henry, feel free to open an issue for it.

Regards,
Timo


On 28.09.18 at 19:14, Rong Rong wrote:
> Yes.
>
> Thanks for bringing this up, Hequn! :-) I agree that Tuple would not be 
> the best container to use.
>
> However, in searching for an alternative, wouldn't a Collection / List 
> be a more suitable solution? Row does not seem to fit this context 
> (since a Row can hold elements of different types).
> I vaguely recall a similar JIRA, but it might not be related to the IN 
> clause. Let me try to dig it up.
>
> --
> Rong
>
> On Fri, Sep 28, 2018 at 9:32 AM Hequn Cheng <chenghequn@gmail.com> wrote:
>
>     Hi,
>
>     I haven't looked into the code. If this is limited by Tuple, would
>     it be better to implement it with Row?
>
>     Best, Hequn
>
>     On Fri, Sep 28, 2018 at 9:27 PM Rong Rong <walterddr@gmail.com> wrote:
>
>         Hi Henry, Vino.
>
>         I think the IN operator is translated into either a RexSubQuery
>         or a SqlStdOperatorTable.IN operator.
>         I think Vino was referring to the first case.
>         In the second case (which I think is what you are facing here),
>         the values are converted into tuples, and the largest tuple we
>         currently have in Flink is Tuple25.java; I was wondering if that
>         is the issue you are facing. You can probably split the IN into
>         several smaller INs combined with OR.
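>
>         For illustration, a minimal Scala sketch of that workaround
>         (the helper name and chunk size are my assumptions, not Flink
>         API; a small chunk stays below Calcite's default IN threshold
>         of 20 and well under the Tuple25 limit):
>
>         object SplitInClause {
>           // Hypothetical helper: split a long IN list into several
>           // smaller IN predicates combined with OR.
>           def splitIn(column: String, values: Seq[Long],
>                       chunkSize: Int = 19): String =
>             values
>               .grouped(chunkSize)
>               .map(chunk => s"$column IN (${chunk.mkString(",")})")
>               .mkString("(", " OR ", ")")
>
>           def main(args: Array[String]): Unit = {
>             // prints: (trackId IN (124427150,71648998) OR trackId IN (124493327))
>             println(splitIn("trackId",
>               Seq(124427150L, 71648998L, 124493327L), chunkSize = 2))
>           }
>         }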
>
>         --
>         Rong
>
>         On Fri, Sep 28, 2018 at 2:33 AM vino yang <yanghua1127@gmail.com> wrote:
>
>             Hi Henry,
>
>             Maybe the number of elements in your IN clause exceeds the
>             converter's threshold? Its default value is 20; you can
>             change it with this configuration option:
>
>             withInSubQueryThreshold(XXX)
>
>             This API comes from Calcite.
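>
>             For reference, a sketch of what that looks like on the
>             Calcite side (how to hand this config to Flink's planner
>             depends on your Flink version, so treat the wiring into
>             the TableEnvironment as an assumption):
>
>             import org.apache.calcite.sql2rel.SqlToRelConverter
>
>             object InThresholdSketch {
>               // Below this threshold Calcite rewrites an IN list as a
>               // chain of ORs; at or above it, the list becomes a
>               // Values relation joined as a sub-query (default: 20).
>               val converterConfig: SqlToRelConverter.Config =
>                 SqlToRelConverter.configBuilder()
>                   .withInSubQueryThreshold(100) // raise the cut-off
>                   .build()
>             }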
>
>             Thanks, vino.
>
>             On Fri, Sep 28, 2018 at 4:23 PM, 徐涛 <happydexutao@gmail.com> wrote:
>
>                 Hi,
>
>                 When I execute the following SQL in Flink 1.6.1, an
>                 error is thrown saying that the query uses an
>                 unsupported feature, but when I reduce the number of
>                 integers in the “in” clause, for example
>
>                 trackId in (124427150,71648998), Flink does not
>                 complain at all, so I wonder: is there a length
>                 limit on the “in” operation?
>
>                 Thanks a lot.
>
>                 SELECT
>                      trackId as id,track_title as description, count(*) as cnt
>                 FROM
>                      play
>                 WHERE
>                      appName='play.statistics.trace' and
>                      trackId in (124427150,71648998,124493327,524043,27300837,30300481,27300809,124744768,45982512,124526566,124556427,124804208,74302264,119588973,30496269,27300288,124098818,125071530,120918746,124171456,30413034,124888075,125270551,125434224,27300195,45982342,45982468,45982355,65349883,124705962,65349905,124298305,124889583,45982338,20506255,18556415,122161128,27299018,122850375,124862362,45982336,59613202,122991190,124590280,124867563,45982332,124515944,20506257,122572115,92083574)
>                 GROUP BY
>                      HOP(started_at_ts, INTERVAL '5' SECOND, INTERVAL '5' MINUTE),trackId,track_title;
>
>
>
>                 FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()])
>                   FlinkLogicalCalc(expr#0..3=[{inputs}], started_at_ts=[$t2], trackId=[$t0], track_title=[$t1])
>                     FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
>                       FlinkLogicalCalc(expr#0..4=[{inputs}], expr#5=[_UTF-16LE'play.statistics.trace'], expr#6=[=($t0, $t5)], trackId=[$t1], track_title=[$t2], started_at_ts=[$t4], $condition=[$t6])
>                         FlinkLogicalNativeTableScan(table=[[play]])
>                       FlinkLogicalValues(tuples=[[{ 124427150 }, { 71648998 }, { 124493327 }, { 524043 }, { 27300837 }, { 30300481 }, { 27300809 }, { 124744768 }, { 45982512 }, { 124526566 }, { 124556427 }, { 124804208 }, { 74302264 }, { 119588973 }, { 30496269 }, { 27300288 }, { 124098818 }, { 125071530 }, { 120918746 }, { 124171456 }, { 30413034 }, { 124888075 }, { 125270551 }, { 125434224 }, { 27300195 }, { 45982342 }, { 45982468 }, { 45982355 }, { 65349883 }, { 124705962 }, { 65349905 }, { 124298305 }, { 124889583 }, { 45982338 }, { 20506255 }, { 18556415 }, { 122161128 }, { 27299018 }, { 122850375 }, { 124862362 }, { 45982336 }, { 59613202 }, { 122991190 }, { 124590280 }, { 124867563 }, { 45982332 }, { 124515944 }, { 20506257 }, { 122572115 }, { 92083574 }]])
>
>                 This exception indicates that the query uses an
>                 unsupported SQL feature.
>                 Please check the documentation for the set of
>                 currently supported SQL features.
>                 at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:275)
>                 at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:845)
>                 at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:892)
>                 at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
>                 at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:786)
>                 at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:723)
>                 at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
>                 at com.ximalaya.flink.dsl.application.FlinkApplication$$anonfun$main$5.apply(FlinkApplication.scala:141)
>                 at com.ximalaya.flink.dsl.application.FlinkApplication$$anonfun$main$5.apply(FlinkApplication.scala:139)
>                 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>                 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>                 at com.ximalaya.flink.dsl.application.FlinkApplication$.main(FlinkApplication.scala:139)
>                 at com.ximalaya.flink.dsl.web.test.DslTestUtils$.executeDslFile(DslTestUtils.scala:69)
>                 at com.ximalaya.flink.dsl.web.test.PlayCountTest$.main(PlayCountTest.scala:5)
>                 at com.ximalaya.flink.dsl.web.test.PlayCountTest.main(PlayCountTest.scala)
>
>                 Best
>                 Henry
>

