flink-issues mailing list archives

From "Timo Walther (Jira)" <j...@apache.org>
Subject [jira] [Assigned] (FLINK-19112) No access to metric group in ScalarFunction when optimizing
Date Tue, 01 Sep 2020 16:57:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-19112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther reassigned FLINK-19112:
------------------------------------

    Assignee: Timo Walther

> No access to metric group in ScalarFunction when optimizing
> -----------------------------------------------------------
>
>                 Key: FLINK-19112
>                 URL: https://issues.apache.org/jira/browse/FLINK-19112
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.1
>            Reporter: Nico Kruber
>            Assignee: Timo Walther
>            Priority: Major
>         Attachments: MetricsGroupBug.java
>
>
> Under some circumstances, I cannot access {{context.getMetricGroup()}} in a {{ScalarFunction}} like this (full job attached):
> {code:java}
>   public static class MyUDF extends ScalarFunction {
>     @Override
>     public void open(FunctionContext context) throws Exception {
>       super.open(context);
>       context.getMetricGroup();  // throws during planning, see the stack trace below
>     }
>     public Integer eval(Integer id) {
>       return id;
>     }
>   }
> {code}
> which leads to this exception:
> {code:java}
> Exception in thread "main" java.lang.UnsupportedOperationException: getMetricGroup is not supported when optimizing
> 	at org.apache.flink.table.planner.codegen.ConstantFunctionContext.getMetricGroup(ExpressionReducer.scala:249)
> 	at com.ververica.MetricsGroupBug$MyUDF.open(MetricsGroupBug.java:57)
> 	at ExpressionReducer$2.open(Unknown Source)
> 	at org.apache.flink.table.planner.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:118)
> 	at org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressionsInternal(ReduceExpressionsRule.java:696)
> 	at org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressions(ReduceExpressionsRule.java:618)
> 	at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:303)
> 	at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
> 	at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562)
> 	at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427)
> 	at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264)
> 	at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> 	at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223)
> 	at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
> 	at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
> 	at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
> 	at scala.collection.Iterator.foreach(Iterator.scala:937)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:937)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> 	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> 	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
> 	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
> 	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> 	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
> 	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
> 	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84)
> 	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
> 	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
> 	at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
> 	at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
> 	at com.ververica.MetricsGroupBug.main(MetricsGroupBug.java:50)
> {code}
> I also tried to work around this with a try-catch, assuming that this method is called once during optimization and another time at runtime. However, it seems as if {{open()}} is actually only called once (during optimization), leaving me no way to access the metric group.
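> For reference, the attempted workaround looked roughly like this (a sketch only; the exact code in the attached job may differ slightly, only {{open()}} changes compared to the {{MyUDF}} above):
> {code:java}
>   public static class MyUDF extends ScalarFunction {
>     @Override
>     public void open(FunctionContext context) throws Exception {
>       super.open(context);
>       try {
>         // fine at runtime, but throws while the planner reduces constant expressions
>         context.getMetricGroup();
>       } catch (UnsupportedOperationException e) {
>         // swallow the plan-time failure and hope for a second open() call at runtime;
>         // in practice open() is never called again, so the metric group stays inaccessible
>       }
>     }
>
>     public Integer eval(Integer id) {
>       return id;
>     }
>   }
> {code}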
> It also seems that removing the WHERE condition before my UDF call avoids the error, but it shouldn't have to be that way...
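> For context, the job roughly does the following ({{tEnv}} is the {{TableEnvironment}}; the table names and the constant filter are illustrative and differ from the attached {{MetricsGroupBug.java}}):
> {code:java}
>   tEnv.createTemporarySystemFunction("MyUDF", MyUDF.class);
>
>   // With a filter on a constant, the planner appears to constant-fold the MyUDF call
>   // (ReduceExpressionsRule -> ExpressionReducer in the stack trace above), so open()
>   // runs with a ConstantFunctionContext that rejects getMetricGroup().
>   tEnv.sqlQuery("SELECT MyUDF(id) AS v FROM src WHERE id = 1")
>       .executeInsert("snk");
>
>   // Dropping the WHERE clause avoids the plan-time reduction and the job runs.
> {code}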



