flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "lincoln.lee (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-7942) NPE when apply FilterJoinRule
Date Mon, 30 Oct 2017 06:48:02 GMT
lincoln.lee created FLINK-7942:
----------------------------------

             Summary: NPE when apply FilterJoinRule
                 Key: FLINK-7942
                 URL: https://issues.apache.org/jira/browse/FLINK-7942
             Project: Flink
          Issue Type: Bug
            Reporter: lincoln.lee
            Assignee: lincoln.lee


Test case *testFilterRule1* fails due to a NPE 
{code}
java.lang.RuntimeException: Error while applying rule FilterJoinRule:FilterJoinRule:filter,
args [rel#148:LogicalFilter.NONE(input=rel#146:Subset#12.NONE,condition=>=(AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1,
AS(org$apache$flink$table$api$batch$table$Merger$$773bb962ee701f47b08bc74058c46bb3($1, $3),
'c0')), 'c1'), 0)), rel#145:LogicalJoin.NONE(left=rel#143:Subset#10.NONE,right=rel#144:Subset#11.NONE,condition==($0,
$2),joinType=left)]

	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
	at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
	at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
	at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:270)
	at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:347)
	at org.apache.flink.table.utils.BatchTableTestUtil.verifyTable(TableTestBase.scala:186)
	at org.apache.flink.table.api.batch.table.FilterRuleITCase.testFilterRule1(FilterRuleTest.scala:63)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.NullPointerException
	at org.apache.calcite.plan.Strong.isNull(Strong.java:110)
	at org.apache.calcite.plan.Strong.anyNull(Strong.java:166)
	at org.apache.calcite.plan.Strong.isNull(Strong.java:114)
	at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:99)
	at org.apache.calcite.plan.Strong.isNotTrue(Strong.java:84)
	at org.apache.calcite.plan.RelOptUtil.simplifyJoin(RelOptUtil.java:2354)
	at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:149)
	at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:348)
	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
{code}
but *testFilterRule2* works which has the same query written in SQL.

{code}
class FilterRuleTest extends TableTestBase {

  @Test
  def testFilterRule1(): Unit = {
    val util = batchTestUtil()
    val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
    val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)
    val results = t1
      .leftOuterJoin(t2, 'b === 'e)
      .select('c, Merger('c, 'f) as 'c0)
      .select(Merger('c, 'c0) as 'c1)
      .where('c1 >= 0)

    val expected = unaryNode(
      "DataSetCalc",
      binaryNode(
        "DataSetJoin",
        unaryNode(
          "DataSetCalc",
          batchTableNode(0),
          term("select", "b", "c")
        ),
        unaryNode(
          "DataSetCalc",
          batchTableNode(1),
          term("select", "e", "f")
        ),
        term("where", "=(b, e)"),
        term("join", "b", "c", "e", "f"),
        term("joinType", "LeftOuterJoin")
      ),
      term("select", "Merger$(c, Merger$(c, f)) AS c1"),
      term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
    )

    util.verifyTable(results, expected)
  }

  @Test
  def testFilterRule2(): Unit = {
    val util = batchTestUtil()
    util.addTable[(String, Int, Int)]("T1", 'a, 'b, 'c)
    util.addTable[(String, Int, Int)]("T2", 'd, 'e, 'f)
    util.tableEnv.registerFunction("udf_test", Merger)

    val sql =
      s"""
         |select c1
         |from (
         |  select udf_test(c, c0) as c1
         |  from (
         |    select c, udf_test(b, c) as c0
         |      from
         |      (select a, b, c
         |        from T1
         |        left outer join T2
         |        on T1.b = T2.e
         |      ) tmp
         |  ) tmp1
         |) tmp2
         |where c1 >= 0
       """.stripMargin

    val results = util.tableEnv.sqlQuery(sql)
    val expected = "DataSetCalc(select=[udf_test(c, udf_test(b, c)) AS c1]) \n" +
      "DataSetJoin(where=[=(b, e)], join=[b, c, e], joinType=[LeftOuterJoin])\n" +
      "DataSetCalc(select=[b, c], where=[>=(udf_test(c, udf_test(b, c)), 0)])\n" +
      "DataSetScan(table=[[_DataSetTable_0]])\n" +
      "DataSetCalc(select=[e])\n" +
      "DataSetScan(table=[[_DataSetTable_1]])"
    util.verifyTable(results, expected)
  }
}

object Merger extends ScalarFunction {
  def eval(f0: Int, f1: Int): Int = {
    f0 + f1
  }
}
{code}

A simple way to fix this is to change the calcite class {code} org.apache.calcite.plan.Strong{code}
add an additional entry to the EnumMap in createPolicyMap method:
{code}map.put(SqlKind.AS, Policy.AS_IS);{code}
Either copy to Flink package and modify it  or using reflection somewhere.

I'm not sure if there exists other issues like this one since not all the types in SQLKind
included in the Strong.MAP.
[~fhueske]  [~twalthr] any ideas?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message