flink-user mailing list archives

From Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com>
Subject Re: Dynamically Generated Classes - Cannot load user class
Date Tue, 23 Oct 2018 01:42:32 GMT
I have, with no luck. I wonder, though: do I need to load it only in the map function? I tried
adding it in the open method of the sink function and of the process function I have there too,
because they also use the type. Still no good. Is there a way of knowing which operator fails?
________________________________
From: Hequn Cheng <chenghequn@gmail.com>
Sent: Monday, October 22, 2018 6:33:52 PM
To: Shahar Cizer Kobrinsky
Cc: user
Subject: Re: Dynamically Generated Classes - Cannot load user class

Hi shkob

> I tried to use getRuntimeContext().getUserCodeClassLoader() as the loader to use for
> Byte Buddy - but it doesn't seem to be enough.

From the log, it seems that the user class cannot be found in the classloader:
Cannot load user class: com....model.MyGeneratedClass
Have you tried Thread.currentThread().getContextClassLoader()? It should be the
user-code ClassLoader.

Best, Hequn
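
One way to compare the two loaders mentioned above is a small probe that checks whether a class name resolves through each candidate. This is only a sketch using plain JDK calls: the class ClassLoaderProbe and the name com.example.DoesNotExist are hypothetical, and inside a Flink rich function you would add getRuntimeContext().getUserCodeClassLoader() as a further candidate.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Flink): reports which loaders can see a class.
final class ClassLoaderProbe {

    // Returns true if className can be resolved through the given loader.
    // The class is not initialized, so static initializers do not run.
    static boolean isResolvable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Probes every candidate loader and keeps the insertion order of the input.
    static Map<String, Boolean> probe(String className, Map<String, ClassLoader> candidates) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (Map.Entry<String, ClassLoader> entry : candidates.entrySet()) {
            result.put(entry.getKey(), isResolvable(className, entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, ClassLoader> candidates = new LinkedHashMap<>();
        candidates.put("thread context", Thread.currentThread().getContextClassLoader());
        candidates.put("probe class loader", ClassLoaderProbe.class.getClassLoader());

        // Assumed placeholder name; replace it with the generated class name.
        String className = args.length > 0 ? args[0] : "com.example.DoesNotExist";
        probe(className, candidates)
            .forEach((label, ok) -> System.out.println(label + ": " + (ok ? "resolvable" : "NOT resolvable")));
    }
}
```

If the generated class resolves through one loader but not the other, that narrows down where Byte Buddy actually injected it.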

On Tue, Oct 23, 2018 at 5:47 AM shkob1 <shahar.kobrinsky@gmail.com> wrote:
Hey,

I'm trying to run a job which uses a dynamically generated class (through
Byte Buddy).
Think of me having a complex schema as YAML text and generating a class from
it. Throughout the job I am using an artificial superclass (MySuperClass)
of the generated class (for example, I need to specify the generic class
to extend RichMapFunction).



MyRichMapFunction<Y extends MySuperClass> extends RichMapFunction<Row, Y> is
introducing the dynamic class. It takes the YAML in the constructor and:
1. open - takes the schema and converts it into a POJO class which extends
MySuperClass
2. getProducedType - does the same thing in order to correctly send the POJO
with all the right fields

So basically my job is something like

env.addSource([stream of pojos])
.filter(...)
... (register table, running a query which generates Rows)
.map(myRichMapFunction)
.returns(myRichMapFunction.getProducedType)
.addSink(...)

My trouble now is that, when running on a cluster, the classloader fails to
load my generated class.
I tried to use getRuntimeContext().getUserCodeClassLoader() as the loader to
use for Byte Buddy, but it doesn't seem to be enough.

I was reading about it here:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html
Is there maybe a hook that gets called when a job is loaded, so I can load the
class there?


Stacktrace:

org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com....model.MyGeneratedClass
ClassLoader info: URL ClassLoader:
    file: '/var/folders/f7/c4pvjrf902b6c73_tbzkxnjw0000gn/T/blobStore-4b685b0a-b8c1-43a1-a75d-f0b9c0156f4c/job_d1187ea7e783007b92ef6c0597d72fcb/blob_p-38b9e6dce2423b0374f82842a35dcaa92e10dedd-6f1056ab61afcccb3c1fca895ccb3eb0' (valid JAR)
Class not resolvable through given classloader.
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:264)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
        at com.....MainClass.main(MainClass.java:46)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.....model.DynamicSchema
ClassLoader info: URL ClassLoader:
    file: '/var/folders/f7/c4pvjrf902b6c73_tbzkxnjw0000gn/T/blobStore-4b685b0a-b8c1-43a1-a75d-f0b9c0156f4c/job_d1187ea7e783007b92ef6c0597d72fcb/blob_p-38b9e6dce2423b0374f82842a35dcaa92e10dedd-6f1056ab61afcccb3c1fca895ccb3eb0' (valid JAR)
Class not resolvable through given classloader.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:99)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:273)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:748)
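
The frames under "Caused by" (StreamConfig.getStreamOperator, OperatorChain.<init>) suggest that the failure happens while the task is reading the serialized operator back, which would be before any open() method runs; this is an interpretation of the trace, not something confirmed in the thread. The sketch below reproduces that kind of failure with plain JDK serialization only. It is not Flink's code: LoaderAwareDeserialization, Payload and LoaderObjectInputStream are hypothetical names, and the isolated loader merely stands in for a loader that has never seen the generated class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical demo: deserialization resolves class names through one specific loader.
final class LoaderAwareDeserialization {

    // Stand-in for a user class that only one loader knows about.
    static final class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        Payload(String name) { this.name = name; }
    }

    // Resolves every class descriptor strictly through the supplied loader.
    static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new LoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    // True if every class in the stream is visible to the given loader.
    static boolean canDeserialize(byte[] data, ClassLoader loader) {
        try {
            deserialize(data, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // A loader with no parent: it only sees JDK bootstrap classes.
    static ClassLoader isolatedLoader() {
        return new ClassLoader((ClassLoader) null) { };
    }

    public static void main(String[] args) throws IOException {
        byte[] data = serialize(new Payload("demo"));
        System.out.println("defining loader: " + canDeserialize(data, Payload.class.getClassLoader()));
        System.out.println("isolated loader: " + canDeserialize(data, isolatedLoader()));
    }
}
```

If that reading of the trace holds, a class generated only inside open() would not exist yet when the operator is deserialized, and it would have to be visible to the loader used at that point.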




