flink-user mailing list archives

From Colin Williams <colin.williams.seat...@gmail.com>
Subject Re: Streaming User-defined Aggregate Function gives exception when using Table API jars
Date Thu, 16 Nov 2017 05:39:04 GMT
Thank you, Fabian, for fixing that. It was the highlight of my day.


On Nov 15, 2017 1:01 AM, "Fabian Hueske" <fhueske@gmail.com> wrote:

> Hi Colin,
>
> Thanks for reporting the bug. I had a look at it, and it seems that the
> wrong classloader is used when compiling the code (for both batch and
> streaming queries).
> I have a fix that I still need to verify.
>
> It's not necessary to open a new JIRA for that. We can cover all cases
> under FLINK-7490.
>
> Thanks, Fabian
>
> 2017-11-15 5:32 GMT+01:00 Colin Williams <colin.williams.seattle@gmail.com>:
>
>> The documentation contains a note advising against including the
>> flink-table dependency in the project. However, when I instead put the
>> flink-table dependency on the cluster, the User-defined Aggregate Function
>> throws an exception.
>>
>> When I do include flink-table in the project's dependencies, the project
>> runs just fine. However, I'd then expect the garbage collection issues
>> that the note warns about.
>>
>> This seems similar to https://issues.apache.org/jira/browse/FLINK-7490,
>> where I left a comment. I believe the issue is likely related to
>> classloading, as suggested there, but the classes involved are different
>> (Batch vs Stream).
>>
>> Should another bug report be filed?
>>
>> Also, that bug report has seen hardly any activity in the past few
>> months.
>>
>> Best Regards,
>>
>> Colin Williams
>>
>>
>> java.io.IOException: Exception while applying AggregateFunction in aggregating state
>> at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:91)
>> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>> at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.compile(AggregateAggFunction.scala:33)
>> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.initFunction(AggregateAggFunction.scala:72)
>> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.createAccumulator(AggregateAggFunction.scala:41)
>> at org.apache.flink.table.runtime.aggregate.AggregateAggFunction.createAccumulator(AggregateAggFunction.scala:33)
>> at org.apache.flink.runtime.state.heap.HeapAggregatingState$AggregateTransformation.apply(HeapAggregatingState.java:115)
>> at org.apache.flink.runtime.state.heap.NestedMapsStateTable.transform(NestedMapsStateTable.java:298)
>> at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:89)
>> ... 6 more
>> Caused by: org.codehaus.commons.compiler.CompileException: Line 6, Column 14: Cannot determine simple type name "com"
>> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
>> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416)
>> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177)
>> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
>> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
>> at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
>> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156)
>> at org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212)
>> at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6064)
>> at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6059)
>> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754)
>> at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059)
>> at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052)
>> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753)
>> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
>> at org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212)
>> at org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844)
>> at org.codehaus.janino.IClass$IField.getDescriptor(IClass.java:1299)
>> at org.codehaus.janino.UnitCompiler.getfield(UnitCompiler.java:11439)
>> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4118)
>> at org.codehaus.janino.UnitCompiler.access$6800(UnitCompiler.java:212)
>> at org.codehaus.janino.UnitCompiler$12$1.visitFieldAccess(UnitCompiler.java:4053)
>> at org.codehaus.janino.UnitCompiler$12$1.visitFieldAccess(UnitCompiler.java:4048)
>> at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4136)
>> at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4048)
>> at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4044)
>> at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974)
>> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4044)
>> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4109)
>> at org.codehaus.janino.UnitCompiler.access$6600(UnitCompiler.java:212)
>> at org.codehaus.janino.UnitCompiler$12$1.visitAmbiguousName(UnitCompiler.java:4051)
>> at org.codehaus.janino.UnitCompiler$12$1.visitAmbiguousName(UnitCompiler.java:4048)
>> at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050)
>> at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4048)
>> at org.codehaus.janino.UnitCompiler$12.visitLvalue(UnitCompiler.java:4044)
>> at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974)
>> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4044)
>> at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5224)
>> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4667)
>> at org.codehaus.janino.UnitCompiler.access$7700(UnitCompiler.java:212)
>> at org.codehaus.janino.UnitCompiler$12.visitCast(UnitCompiler.java:4066)
>> at org.codehaus.janino.UnitCompiler$12.visitCast(UnitCompiler.java:4044)
>> at org.codehaus.janino.Java$Cast.accept(Java.java:4699)
>> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4044)
>> at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5224)
>> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2536)
>> at org.codehaus.janino.UnitCompiler.access$2600(UnitCompiler.java:212)
>> at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1459)
>> at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1443)
>> at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3348)
>> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1443)
>> at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1523)
>> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3052)
>> at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1313)
>> at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1286)
>> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:785)
>> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:436)
>> at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:212)
>> at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:390)
>> at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:385)
>> at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1405)
>> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:385)
>> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:357)
>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
>> at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:213)
>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
>> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
>> at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
>> ... 13 more
>>
>>
>> *Note:* Due to an issue in Apache Calcite, which prevents the user
>> classloaders from being garbage-collected, we do *not* recommend
>> building a fat-jar that includes the flink-table dependency. Instead, we
>> recommend configuring Flink to include the flink-table dependency in the
>> system classloader. This can be done by copying the flink-table.jar file
>> from the ./opt folder to the ./lib folder. See these instructions
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html>
>> for further details.
>>
>
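
For readers following the thread, the classloader mismatch described above can be illustrated with plain JDK classes. This is only a sketch and not Flink or Janino code. It assumes the usual parent-delegation model: the loader that loaded the demo class stands in for the user-code classloader, and its parent stands in for the loader that holds flink-table once the jar is copied to ./lib. The names ClassLoaderVisibilityDemo and canSee are hypothetical and exist only for this illustration.

```java
public class ClassLoaderVisibilityDemo {

    /** Returns true if the given loader can resolve the named class. */
    public static boolean canSee(ClassLoader loader, String className) {
        try {
            // initialize=false: only resolve the class, do not run static initializers.
            // A null loader means the bootstrap loader.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-in for a user-defined class shipped in the job jar (assumption).
        String userClass = ClassLoaderVisibilityDemo.class.getName();

        // "child" plays the role of the user-code classloader,
        // "parent" the role of the loader that holds library jars.
        ClassLoader child = ClassLoaderVisibilityDemo.class.getClassLoader();
        ClassLoader parent = child.getParent();

        System.out.println("child sees user class:        " + canSee(child, userClass));
        System.out.println("parent sees user class:       " + canSee(parent, userClass));
        System.out.println("parent sees java.lang.String: " + canSee(parent, "java.lang.String"));
    }
}
```

When run as an ordinary application, the child loader would be expected to resolve the demo class while its parent cannot. This mirrors how code compiled against the parent loader presumably fails to resolve a user class, which is consistent with the "Cannot determine simple type name" error in the trace.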
