flink-issues mailing list archives

From "Timo Walther (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (FLINK-7942) NPE when apply FilterJoinRule
Date Wed, 15 Nov 2017 17:38:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-7942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther resolved FLINK-7942.
---------------------------------
       Resolution: Fixed
    Fix Version/s: 1.5.0
                   1.4.0

Fixed in 1.5: b6a2dc345f37c4b643789e98d02e23a022d31415
Fixed in 1.4: 13962e1ffda62218031bf426ee9c06146c7c5573

> NPE when apply FilterJoinRule
> -----------------------------
>
>                 Key: FLINK-7942
>                 URL: https://issues.apache.org/jira/browse/FLINK-7942
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: lincoln.lee
>            Assignee: Timo Walther
>             Fix For: 1.4.0, 1.5.0
>
>
> Test case *testFilterRule1* fails with an NPE:
> {code}
> java.lang.RuntimeException: Error while applying rule FilterJoinRule:FilterJoinRule:filter, args [rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, $3), 'c0')), 'c1'), 0)), rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0, $2),joinType=left)]
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
> 	at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
> 	at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
> 	at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
> 	at org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
> 	testFilterRule1(FilterRuleTest.scala:63)	
> Caused by: java.lang.NullPointerException
> 	at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
> 	at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
> 	at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
> 	at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
> 	at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
> 	at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
> 	at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
> 	at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
> {code}
> However, *testFilterRule2*, which has the same query written in SQL, works.
> {code}
> class FilterRuleTest extends TableTestBase {
>   @Test
>   def testFilterRule1(): Unit = {
>     val util = batchTestUtil()
>     val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
>     val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)
>     val results = t1
>       .leftOuterJoin(t2, 'b === 'e)
>       .select('c, Merger('c, 'f) as 'c0)
>       .select(Merger('c, 'c0) as 'c1)
>       .where('c1 >= 0)
>     val expected = unaryNode(
>       "DataSetCalc",
>       binaryNode(
>         "DataSetJoin",
>         unaryNode(
>           "DataSetCalc",
>           batchTableNode(0),
>           term("select", "b", "c")
>         ),
>         unaryNode(
>           "DataSetCalc",
>           batchTableNode(1),
>           term("select", "e", "f")
>         ),
>         term("where", "=(b, e)"),
>         term("join", "b", "c", "e", "f"),
>         term("joinType", "LeftOuterJoin")
>       ),
>       term("select", "Merger$(c, Merger$(c, f)) AS c1"),
>       term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
>     )
>     util.verifyTable(results, expected)
>   }
>   @Test
>   def testFilterRule2(): Unit = {
>     val util = batchTestUtil()
>     util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
>     util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
>     util.tableEnv.registerFunction("udf_test", Merger)
>     val sql =
>       s"""
>          |select c1
>          |from (
>          |  select udf_test(c, c0) as c1
>          |  from (
>          |    select c, udf_test(b, c) as c0
>          |      from
>          |      (select a, b, c
>          |        from T1
>          |        left outer join T2
>          |        on T1.b = T2.e
>          |      ) tmp
>          |  ) tmp1
>          |) tmp2
>          |where c1 >= 0
>        """.stripMargin
>     val results = util.tableEnv.sqlQuery(sql)
>     val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) \n" +
>       "DataSetJoin(where=[=(b, e)], join=[b, c, e], joinType=[LeftOuterJoin])\n" +
>       "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 0)])\n" +
>       "DataSetScan(table=[[_DataSetTable_0]])\n" +
>       "DataSetCalc(select=[e])\n" +
>       "DataSetScan(table=[[_DataSetTable_1]])"
>     util.verifyTable(results, expected)
>   }
> }
> object Merger extends ScalarFunction {
>   def eval(f0: Int, f1: Int): Int = {
>     f0 + f1
>   }
> }
> {code}
> A simple way to fix this is to change the Calcite class {code}org.apache.calcite.plan.Strong{code}
> and add an entry to the EnumMap in its createPolicyMap method:
> {code}map.put(SqlKind.AS, Policy.AS_IS);{code}
> We could either copy the class into Flink and modify it, or patch the map via reflection somewhere.
> I'm not sure whether other issues like this one exist, since not all kinds in SqlKind are included in Strong.MAP.
> @[~fhueske]  @[~twalthr] any ideas?
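
The failure mode described in the report (a policy lookup that returns null for an unregistered kind, followed by a switch on that value) can be illustrated with a minimal, self-contained Java sketch. This is not Calcite's code: the class PolicyMapSketch, the enums Kind and Policy, and the describe method are hypothetical stand-ins for SqlKind, Strong.Policy, and the null-inference logic. Only the one-line map entry mirrors the fix proposed above.

```java
import java.util.EnumMap;
import java.util.Map;

public class PolicyMapSketch {
    // Hypothetical stand-ins for SqlKind and Strong.Policy (not the Calcite types).
    public enum Kind { PLUS, IS_NULL, AS }
    public enum Policy { ANY, NOT_NULL, AS_IS }

    // Builds the kind-to-policy map; includeAs toggles the proposed extra entry.
    public static Map<Kind, Policy> createPolicyMap(boolean includeAs) {
        Map<Kind, Policy> map = new EnumMap<>(Kind.class);
        map.put(Kind.PLUS, Policy.ANY);
        map.put(Kind.IS_NULL, Policy.NOT_NULL);
        if (includeAs) {
            map.put(Kind.AS, Policy.AS_IS); // analogous to the proposed fix
        }
        return map;
    }

    // Looks up the policy and switches on it. Switching on a null enum
    // reference throws a NullPointerException, even with a default branch.
    public static String describe(Map<Kind, Policy> map, Kind kind) {
        Policy policy = map.get(kind); // null if the kind was never registered
        switch (policy) {
            case ANY:
                return "null if any operand is null";
            case NOT_NULL:
                return "never null";
            default:
                return "no inference";
        }
    }

    public static void main(String[] args) {
        try {
            describe(createPolicyMap(false), Kind.AS);
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NPE without the AS entry");
        }
        System.out.println(describe(createPolicyMap(true), Kind.AS));
    }
}
```

In this sketch, registering the missing kind makes the lookup fall through to the default branch instead of failing. A defensive alternative, also only an assumption here, would be to treat any unregistered kind as AS_IS at lookup time, which would cover other kinds missing from the map as well.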



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
