flink-user-zh mailing list archives

From sunfulin <sunfulin0...@163.com>
Subject Re:Re: Flink SQL job failed to submit with enableCheckpointing while SQL contains UDF
Date Sun, 01 Mar 2020 10:04:40 GMT
Hi, Benchao,
Thanks for the reply. 


Could you provide us more information?
1. Which planner are you using: the Blink planner or the legacy planner?
I am using the Blink planner. I have not tested with the legacy planner because my program depends on many
new features of the Blink planner.
2. How do you register your UDF?
I register it with: tableEnv.registerFunction("ts2Date", new ts2Date()); where tableEnv is
a StreamTableEnvironment.
3. Is this related to checkpointing? What happens if you enable checkpointing without
your UDF, or disable checkpointing and use the UDF?
I don't think this is related to checkpointing. If I enable checkpointing and do not use my UDF,
I see no exception and the job submits successfully. If I disable checkpointing and use
the UDF, the job also submits successfully.


I have dug into this exception quite a bit. It may be related to a classloader issue. I would
appreciate your suggestions.
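
To illustrate the classloader hypothesis: Janino's message `Cannot determine simple type name "com"` is commonly seen when a fully qualified class name in the generated code (for example, a UDF class under a `com.` package) cannot be resolved through the class loader the compiler was given. The sketch below is plain Java, not Flink code, and `ClassLoaderVisibilitySketch` and `canResolve` are hypothetical names introduced only for illustration. It shows that the same class name can resolve through one loader and not through another, which is the kind of mismatch that may be happening here; the thread does not confirm the root cause.

```java
// Hypothetical illustration only: not part of Flink or Janino.
class ClassLoaderVisibilitySketch {

    /** Returns true if the given class loader can resolve the fully qualified class name. */
    public static boolean canResolve(String className, ClassLoader loader) {
        try {
            // initialize=false: only visibility matters here, not static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The loader that loaded this class plays the role of the "user code" loader.
        ClassLoader ownLoader = ClassLoaderVisibilitySketch.class.getClassLoader();
        // Its parent plays the role of a loader that does not know about user classes.
        ClassLoader parentLoader = ownLoader.getParent();
        String self = ClassLoaderVisibilitySketch.class.getName();

        System.out.println("JDK class via parent loader:  " + canResolve("java.lang.String", parentLoader));
        System.out.println("user class via own loader:    " + canResolve(self, ownLoader));
        System.out.println("user class via parent loader: " + canResolve(self, parentLoader));
    }
}
```

If the generated code were compiled against a loader that cannot see the UDF's jar, a lookup like the last one would fail in the same way.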

At 2020-03-01 17:54:03, "Benchao Li" <libenchao@gmail.com> wrote:

Hi fulin,


It seems like a bug in the code generation.


Could you provide us more information?
1. Which planner are you using: the Blink planner or the legacy planner?
2. How do you register your UDF?
3. Is this related to checkpointing? What happens if you enable checkpointing without
your UDF, or disable checkpointing and use the UDF?


sunfulin <sunfulin0321@163.com> wrote on Sunday, March 1, 2020 at 5:41 PM:

Hi, guys,
I am running a Flink 1.10 SQL job with an UpsertStreamSink to Elasticsearch. In my SQL logic, I
use a UDF, ts2Date, to format date fields from the stream. However, when I add `env.enableCheckpointing(time)`,
the job fails to submit and throws the exception shown below. This is really weird, because when
I remove the UDF, the job submits successfully. Any suggestions are highly appreciated.
My SQL logic is as follows:



INSERT INTO realtime_product_sell
select U.sor_pty_id,
       U.entrust_date,
       U.entrust_time,
       U.product_code,
       U.business_type,
       sum(cast(U.balance as double)) as balance,
       COALESCE(C.cust_name, '--') as cust_name,
       COALESCE(C.open_comp_name, '--') AS open_comp_name,
       COALESCE(C.open_comp_id, '--') as open_comp_id,
       COALESCE(C.org_name,'--') as org_name,
       COALESCE(C.org_id,'--') as org_id,
       COALESCE(C.comp_name, '--') AS comp_name,
       COALESCE(C.comp_id,'--') as comp_id,
       COALESCE(C.mng_name,'--') as mng_name,
       COALESCE(C.mng_id,'--') as mng_id,
       COALESCE(C.is_tg,'--') as is_tg,
       COALESCE(C.cust_type,'--') as cust_type,
       COALESCE(C.avg_tot_aset_y365, 0.00) as avg_tot_aset_y365,
       COALESCE(C.avg_aset_create_y, 0.00) as avg_aset_create_y
from
(select customerNumber as sor_pty_id,
        ts2Date(`lastUpdateTime`, true) as entrust_date,      -- the UDF
        ts2Date(`lastUpdateTime`, false) as entrust_time,     -- the UDF
        fundCode as product_code,
        businessType as business_type,
        balance,
        proctime
      from lscsp_sc_order_all where fundCode in ('007118','007117') and businessType in ('5')
) as U
left join
dim_app_cust_info FOR SYSTEM_TIME AS OF U.proctime AS C
on U.sor_pty_id = C.cust_id
group by sor_pty_id,
        entrust_date,
        entrust_time,
        product_code,
        business_type,
        COALESCE(C.cust_name, '--'),
        COALESCE(C.open_comp_name, '--'),
        COALESCE(C.open_comp_id, '--'),
        COALESCE(C.org_name,'--'),
        COALESCE(C.org_id,'--'),
        COALESCE(C.comp_name, '--'),
        COALESCE(C.comp_id,'--'),
        COALESCE(C.mng_name,'--'),
        COALESCE(C.mng_id,'--'),
        COALESCE(C.is_tg,'--'),
        COALESCE(C.cust_type,'--'),
        COALESCE(C.avg_tot_aset_y365, 0.00),
        COALESCE(C.avg_aset_create_y, 0.00)
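
The thread does not include the source of ts2Date. As a minimal sketch, assuming `lastUpdateTime` is an epoch timestamp in milliseconds and that the boolean flag selects a date string (`true`) or a time string (`false`), the conversion logic might look like the plain-Java helper below. The class name `Ts2DateSketch`, the method `format`, the output patterns, and the explicit `ZoneId` parameter are all assumptions made for illustration; in a real job this logic would sit in the `eval` method of a class extending Flink's `ScalarFunction`.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical stand-in for the logic of the ts2Date UDF; the real implementation is not shown in the thread.
class Ts2DateSketch {

    // Assumed output patterns; the actual UDF may use different ones.
    private static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter TIME = DateTimeFormatter.ofPattern("HH:mm:ss");

    /**
     * Formats an epoch-millisecond timestamp in the given zone.
     * dateOnly=true returns the date part, dateOnly=false returns the time part.
     */
    public static String format(long epochMillis, boolean dateOnly, ZoneId zone) {
        DateTimeFormatter formatter = dateOnly ? DATE : TIME;
        return formatter.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        ZoneId zone = ZoneId.systemDefault();
        System.out.println("entrust_date = " + format(now, true, zone));
        System.out.println("entrust_time = " + format(now, false, zone));
    }
}
```

For example, `Ts2DateSketch.format(0L, true, ZoneOffset.UTC)` yields `1970-01-01`.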




2020-03-01 17:22:06,504 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
... 16 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
... 19 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 30: Cannot determine simple type name "com"
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$24.getType(UnitCompiler.java:8184)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6786)
at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6412)
at org.codehaus.janino.UnitCompiler$21$2$1.visitFieldAccess(UnitCompiler.java:6407)
at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4299)
at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
at org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
at org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)
at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)
at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
... 25 more

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn