flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Streaming User-defined Aggregate Function gives exception when using Table API jars
Date Wed, 15 Nov 2017 09:00:36 GMT
Hi Colin,

Thanks for reporting the bug. I had a look at it, and it seems that the
wrong classloader is used when compiling the generated code (for both
batch and streaming queries).
I have a fix, but I still need to verify it.
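
To illustrate the kind of failure described above, here is a minimal sketch in
plain Java. It is not Flink's or Janino's actual code. The class name
"com.example.udaf.UserAgg" and the UserCodeLoader helper are hypothetical
stand-ins for a user-defined aggregate function and the user-code classloader.
The sketch only shows that the same fully qualified name resolves through one
classloader and not through its parent, which is presumably why the compiler
in the trace below cannot resolve a name starting with "com".

```java
import java.util.Collections;
import java.util.Map;

class ClassLoaderSketch {

    /** Stand-in for a user-defined aggregate function from the job jar (hypothetical). */
    static class UserAgg {}

    /**
     * Simulates a user-code classloader: it delegates to its parent first and
     * additionally knows the names registered in userClasses.
     */
    static class UserCodeLoader extends ClassLoader {
        private final Map<String, Class<?>> userClasses;

        UserCodeLoader(ClassLoader parent, Map<String, Class<?>> userClasses) {
            super(parent);
            this.userClasses = userClasses;
        }

        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            Class<?> c = userClasses.get(name);
            if (c == null) {
                throw new ClassNotFoundException(name);
            }
            return c;
        }
    }

    /** Returns true if the given loader can resolve the class name. */
    static boolean canResolve(ClassLoader loader, String className) {
        try {
            loader.loadClass(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String udafName = "com.example.udaf.UserAgg"; // hypothetical name
        ClassLoader system = ClassLoader.getSystemClassLoader();
        ClassLoader userCode = new UserCodeLoader(
                system, Collections.singletonMap(udafName, UserAgg.class));

        // The parent (system) loader does not know the user class.
        System.out.println("system loader:    " + canResolve(system, udafName));
        // The user-code loader resolves it, and still sees parent classes.
        System.out.println("user-code loader: " + canResolve(userCode, udafName));
    }
}
```

If code-generating classes are loaded by the system classloader (as when
flink-table sits in ./lib) and the compiler resolves names against that
loader, user classes from the job jar are invisible to it, as in the first
case above.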

It's not necessary to open a new JIRA for that. We can cover all cases
under FLINK-7490.

Thanks, Fabian

2017-11-15 5:32 GMT+01:00 Colin Williams <colin.williams.seattle@gmail.com>:

> The documentation contains a note which says not to include the
> flink-table dependency in the project. However, when I put the flink-table
> dependency on the cluster, the User-defined Aggregate Function throws an
> exception.
>
> When I do include flink-table in the project dependencies, the project runs
> just fine. However, I'd expect that there will then be garbage collection
> issues.
>
> This seems similar to https://issues.apache.org/jira/browse/FLINK-7490,
> where I made a comment. I believe the issue is likely related to the
> classloading as suggested, but the related classes are different (Batch vs
> Stream).
>
> Should another bug report be filed?
>
> Also that bug report hasn't really had any activity and it's been a few
> months.
>
> Best Regards,
>
> Colin Williams
>
>
> java.io.IOException: Exception while applying AggregateFunction in
> aggregating state
> at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(
> HeapAggregatingState.java:91)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:442)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.
> processInput(StreamInputProcessor.java:206)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
> OneInputStreamTask.java:69)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table
> program cannot be compiled. This is a bug. Please file an issue.
> at org.apache.flink.table.codegen.Compiler$class.
> compile(Compiler.scala:36)
> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.compile(
> AggregateAggFunction.scala:33)
> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.
> initFunction(AggregateAggFunction.scala:72)
> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.
> createAccumulator(AggregateAggFunction.scala:41)
> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.
> createAccumulator(AggregateAggFunction.scala:33)
> at org.apache.flink.runtime.state.heap.HeapAggregatingState$
> AggregateTransformation.apply(HeapAggregatingState.java:115)
> at org.apache.flink.runtime.state.heap.NestedMapsStateTable.transform(
> NestedMapsStateTable.java:298)
> at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(
> HeapAggregatingState.java:89)
> ... 6 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column
> 14: Cannot determine simple type name "com"
> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
> at org.codehaus.janino.UnitCompiler.getReferenceType(
> UnitCompiler.java:6416)
> at org.codehaus.janino.UnitCompiler.getReferenceType(
> UnitCompiler.java:6177)
> at org.codehaus.janino.UnitCompiler.getReferenceType(
> UnitCompiler.java:6190)
> at org.codehaus.janino.UnitCompiler.getReferenceType(
> UnitCompiler.java:6190)
> at org.codehaus.janino.UnitCompiler.getReferenceType(
> UnitCompiler.java:6190)
> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156)
> at org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212)
> at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(
> UnitCompiler.java:6064)
> at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(
> UnitCompiler.java:6059)
> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754)
> at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059)
> at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052)
> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753)
> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
> at org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212)
> at org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844)
> at org.codehaus.janino.IClass$IField.getDescriptor(IClass.java:1299)
> at org.codehaus.janino.UnitCompiler.getfield(UnitCompiler.java:11439)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4118)
> at org.codehaus.janino.UnitCompiler.access$6800(UnitCompiler.java:212)
> at org.codehaus.janino.UnitCompiler$12$1.visitFieldAccess(UnitCompiler.java:4053)
> at org.codehaus.janino.UnitCompiler$12$1.visitFieldAccess(UnitCompiler.java:4048)
> at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4136)
> at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4048)
> at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4044)
> at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4044)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4109)
> at org.codehaus.janino.UnitCompiler.access$6600(UnitCompiler.java:212)
> at org.codehaus.janino.UnitCompiler$12$1.visitAmbiguousName(
> UnitCompiler.java:4051)
> at org.codehaus.janino.UnitCompiler$12$1.visitAmbiguousName(
> UnitCompiler.java:4048)
> at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050)
> at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4048)
> at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4044)
> at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4044)
> at org.codehaus.janino.UnitCompiler.compileGetValue(
> UnitCompiler.java:5224)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4667)
> at org.codehaus.janino.UnitCompiler.access$7700(UnitCompiler.java:212)
> at org.codehaus.janino.UnitCompiler$12.visitCast(UnitCompiler.java:4066)
> at org.codehaus.janino.UnitCompiler$12.visitCast(UnitCompiler.java:4044)
> at org.codehaus.janino.Java$Cast.accept(Java.java:4699)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4044)
> at org.codehaus.janino.UnitCompiler.compileGetValue(
> UnitCompiler.java:5224)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2536)
> at org.codehaus.janino.UnitCompiler.access$2600(UnitCompiler.java:212)
> at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1459)
> at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1443)
> at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3348)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1443)
> at org.codehaus.janino.UnitCompiler.compileStatements(
> UnitCompiler.java:1523)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3052)
> at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(
> UnitCompiler.java:1313)
> at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(
> UnitCompiler.java:1286)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:785)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:436)
> at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:212)
> at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:390)
> at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:385)
> at org.codehaus.janino.Java$PackageMemberClassDeclaration.
> accept(Java.java:1405)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:385)
> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:357)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
> at org.codehaus.janino.SimpleCompiler.compileToClassLoader(
> SimpleCompiler.java:446)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:213)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> at org.apache.flink.table.codegen.Compiler$class.
> compile(Compiler.scala:33)
> ... 13 more
>
>
> *Note:* Due to an issue in Apache Calcite, which prevents the user
> classloaders from being garbage-collected, we do *not* recommend building
> a fat-jar that includes the flink-table dependency. Instead, we recommend
> configuring Flink to include the flink-table dependency in the system
> classloader. This can be done by copying the flink-table.jar file from
> the ./opt folder to the ./lib folder. See these instructions
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html>
> for further details.
