flink-user mailing list archives

From Hequn Cheng <chenghe...@gmail.com>
Subject Re: Add Logical filter on a query plan from Flink Table API
Date Tue, 09 Jul 2019 03:05:29 GMT
Hi Felipe,

> I would like to create a logical filter if there is no filter set on the
> logical query. How should I implement it?
Do you mean you want to add a LogicalFilter node even if the query doesn't
contain a filter? Logically, this can be done through a rule. However, it
sounds a little hacky, and you have to pay attention to semantic problems.
One thing to note is that you can't change the RowType when you
apply your rules, i.e., for NodeA -> rule -> NodeB, NodeB should have
the same row type as NodeA.
There are a lot of rules in Flink that I think are good examples for you.
You can find these rules in the `FlinkRuleSets` class.
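
To make the row-type constraint concrete, here is a minimal sketch in plain
Java. It is a toy model only: `PlanSketch`, `Node`, `Scan`, `Filter` and
`addFilterIfMissing` are hypothetical names made up for illustration, not
Calcite or Flink classes, and a real rule would extend Calcite's `RelOptRule`
instead. It assumes the added filter uses an always-true condition so the
query result stays the same, and it skips plans that already start with a
filter so that applying the rule again changes nothing.

```java
import java.util.Arrays;
import java.util.List;

/** Toy plan model; hypothetical stand-ins, not Calcite or Flink types. */
final class PlanSketch {

    /** A plan node exposes the field names it produces (its "row type"). */
    interface Node {
        List<String> rowType();
    }

    /** Leaf node: reads all fields of a table. */
    static final class Scan implements Node {
        final String table;
        final List<String> fields;

        Scan(String table, List<String> fields) {
            this.table = table;
            this.fields = fields;
        }

        public List<String> rowType() {
            return fields;
        }

        @Override
        public String toString() {
            return "Scan(" + table + ")";
        }
    }

    /** Filter node: may drop rows, but passes every input field through unchanged. */
    static final class Filter implements Node {
        final Node input;
        final String condition;

        Filter(Node input, String condition) {
            this.input = input;
            this.condition = condition;
        }

        public List<String> rowType() {
            return input.rowType();
        }

        @Override
        public String toString() {
            return "Filter(" + condition + ", " + input + ")";
        }
    }

    /** Rule sketch: wrap the plan root in an always-true filter unless it already is a filter. */
    static Node addFilterIfMissing(Node root) {
        if (root instanceof Filter) {
            return root; // guard: never stack a second filter on top of an existing one
        }
        Node rewritten = new Filter(root, "true");
        // NodeA -> rule -> NodeB: NodeB must keep the row type of NodeA.
        if (!rewritten.rowType().equals(root.rowType())) {
            throw new IllegalStateException("rule changed the row type");
        }
        return rewritten;
    }

    public static void main(String[] args) {
        Node scan = new Scan("TicketsStation01Plat01", Arrays.asList("sensorId", "value"));
        Node once = addFilterIfMissing(scan);
        Node twice = addFilterIfMissing(once); // second application is a no-op
        System.out.println(once);
        System.out.println(once == twice);
    }
}
```

The guard at the top of `addFilterIfMissing` matters as much as the row-type
check: without it, a rule that wraps its input in a new filter would keep
matching its own output.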

> I see my LogicalFilter been created when I call "tableEnv.explain()"
> method. I suppose that I can add some logical filters on the plan.
The `LogicalFilter` and `DataStreamCalc` are not created by your filter
rule. If you remove your filter rule, nothing changes in the plan.

Best, Hequn

On Mon, Jul 8, 2019 at 11:13 PM Felipe Gutierrez <
felipe.o.gutierrez@gmail.com> wrote:

> Hi,
>
> I am a newbie in Apache Calcite. I am trying to use it with Apache Flink.
> To start I am trying to create a HelloWorld which just add a logical filter
> on my query.
> 1 - I have my Flink app using Table API [1].
> 2 - I have created my Calcite filter rule which is applied to my FLink
> query if I use CalciteConfig cc = new
> CalciteConfigBuilder().addLogicalOptRuleSet(RuleSets.ofList(MyFilterRule.INSTANCE)).build()
> [2];
> 3 - The debug thread only goes to my rule if there is a filter on my query.
>
> I would like to create a logical filter if there is no filter set on the
> logical query. How should I implement it?
> I see my LogicalFilter been created when I call "tableEnv.explain()"
> method. I suppose that I can add some logical filters on the plan.
>
> == Abstract Syntax Tree ==
> LogicalFilter(condition=[>=($6, 50)])
>   LogicalTableScan(table=[[TicketsStation01Plat01]])
>
> == Optimized Logical Plan ==
> DataStreamCalc(select=[sensorId, sensorType, platformId, platformType,
> stationId, timestamp, value, trip, eventTime], where=[>=(value, 50)])
>   StreamTableSourceScan(table=[[TicketsStation01Plat01]],
> fields=[sensorId, sensorType, platformId, platformType, stationId,
> timestamp, value, trip, eventTime], source=[SensorTuples])
>
> == Physical Execution Plan ==
> ....
>
> Thanks,
> Felipe
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/table/HelloWorldCalcitePlanTableAPI.java#L62
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/calcite/rules/MyFilterRule.java#L14
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
