flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Class loading issues when using Remote Execution Environment
Date Fri, 27 Apr 2018 06:52:14 GMT
First, a small correction for my previous mail:

I could reproduce your problems locally when submitting the fat-jar.
It turns out I never submitted the fat-jar, as I didn't pass the jar file 
argument to RemoteEnvironment.

Now on to your questions:

> What version of Flink are you trying with?
I got it working once with 1.6-SNAPSHOT, but I would recommend 
sticking with 1.3.1, since that is the version gradoop depends on. (I 
haven't tried it with this version yet, but that's the next thing on my 
list.)

> Are there other config changes (flink-conf.yaml) that you made in your 
> cluster?
It was the standard config.

> Is org.apache.flink.api.common.io.FileOutputFormat a good alternative 
> to LocalCollectionOutputFormat?
It can be used, but if the result is small you could also use accumulators.

> Do you think it is better to use the jarFiles argument on 
> createRemoteEnvironment?
Yes, once we get it working this is the way to go.
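
A side note on the trace quoted at the bottom of this thread, offered as a
hedged aside rather than a diagnosis: the JVM reports
"NoClassDefFoundError: Could not initialize class X" when X was found but an
earlier attempt to run its static initializer failed. That first attempt
raises an ExceptionInInitializerError carrying the real cause, so it may be
worth searching the JobManager log for that earlier error. The sketch below
uses only the JDK; StaticInitFailureDemo and BrokenId are hypothetical names
and are not related to Gradoop's actual GradoopId implementation.

```java
class StaticInitFailureDemo {

    // Hypothetical stand-in for a class whose static initialization fails,
    // for example because something it needs is missing at runtime.
    static class BrokenId {
        static final int SEED = computeSeed();

        private static int computeSeed() {
            throw new IllegalStateException("simulated failure during static init");
        }

        static int seed() {
            return SEED;
        }
    }

    /** Touches BrokenId and describes whatever Throwable that raises. */
    static String touch() {
        try {
            BrokenId.seed();
            return "none";
        } catch (Throwable t) {
            return t.getClass().getSimpleName() + ": " + t.getMessage();
        }
    }

    public static void main(String[] args) {
        // First access: the JVM tries to initialize BrokenId and fails.
        System.out.println("first access:  " + touch());
        // Later accesses: the class is marked as erroneous, so the JVM no
        // longer reports the original cause as the primary error.
        System.out.println("second access: " + touch());
    }
}
```

Run with `java StaticInitFailureDemo.java`: the first access should report an
ExceptionInInitializerError, and the second a NoClassDefFoundError whose
message starts with "Could not initialize class", the same shape as the
message in the quoted trace.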

On 26.04.2018 18:42, kedar mhaswade wrote:
> Thanks Chesnay for your incredible help!
>
> I will try out the suggestions again. A few questions:
> - What version of Flink are you trying with? I have had issues when I 
> placed the gradoop-demo-shaded.jar in the lib folder of the Flink 
> installation (1.4 even refused to start!).
> - Are there other config changes (flink-conf.yaml) that you made in 
> your cluster?
> - Is org.apache.flink.api.common.io.FileOutputFormat a good 
> alternative to LocalCollectionOutputFormat, or should I use 
> HadoopOutputFormatCommonBase? (I do want to run the cluster on YARN 
> later; at the moment I am trying on a standalone cluster.)
> - Do you think using the jarFiles argument on 
> createRemoteEnvironment (which deploys the JAR only for this job and 
> does not mess with the entire Flink cluster) is a better option than 
> placing the JAR(s) in the lib folder?
>
> Thanks again,
> Regards,
> Kedar
>
>
> On Thu, Apr 26, 2018 at 3:14 AM, Chesnay Schepler <chesnay@apache.org> wrote:
>
>     Small update:
>
>     I could reproduce your problems locally when submitting the fat-jar.
>     I could get the job to run after placing the
>     gradoop-demo-shaded.jar into the lib folder.
>     I have not yet tried placing only the gradoop jars into lib (but
>     my guess is you missed a gradoop jar).
>
>     Note that the job fails to run since you use
>     "LocalCollectionOutputFormat" which can only be used for local
>     execution, i.e. when the job submission and execution happen in
>     the same JVM.
>
>
>     On 25.04.2018 14:23, kedar mhaswade wrote:
>>     Thank you for your response!
>>
>>     I have not tried the flink run app.jar route because the way the
>>     app is set up does not allow me to do it. Basically, the app is a
>>     web application which serves the UI and also submits a Flink job
>>     for running Cypher queries. It is a proof-of-concept app, but
>>     IMO, a very useful one.
>>
>>     Here's how you can reproduce:
>>     1) git clone git@github.com:kedarmhaswade/gradoop_demo.git (this is
>>     my fork of gradoop_demo)
>>     2) cd gradoop_demo
>>     3) git checkout dev => dev is the branch where my changes to make
>>     gradoop work with remote environment go.
>>     4) mvn clean package => should bring the gradoop JARs that this
>>     app needs; these JARs should then be placed in <flink-install>/lib.
>>     5) cp ~/.m2/repository/org/gradoop/gradoop-common/0.3.2/gradoop-common-0.3.2.jar <flink-install>/lib,
>>     cp ~/.m2/repository/org/gradoop/gradoop-flink/0.3.2/gradoop-flink-0.3.2.jar <flink-install>/lib,
>>     cp target/gradoop-demo-0.2.0.jar <flink-install>/lib.
>>     6) start the local flink cluster (I have tried with latest
>>     (built-from-source) 1.6-SNAPSHOT, or 1.4):
>>     <flink-install>/bin/start-cluster.sh -- note the JM host and port
>>     7) <gradoop-demo>/start.sh --jmhost <host> --jmport 6123 (adjust
>>     host and port per your cluster) => this is now configured to talk
>>     to the RemoteEnvironment at given JM host and port.
>>     8) open a browser at:
>>     http://localhost:2342/gradoop/html/cypher.html
>>     9) hit the query button => this would throw the exception
>>     10) Ctrl C the process in 7 and just restart it as java -cp
>>     target/classes:target/gradoop-demo-shaded.jar
>>     org.gradoop.demo.server.Server => starts LocalEnvironment
>>     11) do 9 again and see the results shown nicely in the browser.
>>
>>     Here is the relevant code:
>>     1) Choosing between a Remote or a Local Environment:
>>     <https://github.com/kedarmhaswade/gradoop_demo/blob/dev/src/main/java/org/gradoop/demo/server/Server.java#L107>
>>
>>     The instructions are correct to my knowledge. Thanks for your
>>     willingness to try. I have tried everything I can. With different
>>     Flink versions, I get different results (I have also tried on
>>     1.6-SNAPSHOT with class loading config being parent-first, or
>>     child-first).
>>
>>     Regards,
>>     Kedar
>>
>>
>>     On Wed, Apr 25, 2018 at 1:08 AM, Chesnay Schepler
>>     <chesnay@apache.org> wrote:
>>
>>         I couldn't spot any error in what you tried to do. Does the
>>         job-submission succeed if you submit the jar through the
>>         command-line client?
>>
>>         Can you share the project, or a minimal reproducing version?
>>
>>
>>         On 25.04.2018 00:41, kedar mhaswade wrote:
>>>         I am trying to get gradoop_demo
>>>         <https://github.com/dbs-leipzig/gradoop_demo> (a gradoop
>>>         based graph visualization app) working on Flink with
>>>         *Remote* Execution Environment.
>>>
>>>         This app, which is based on Gradoop, submits a job to the
>>>         /preconfigured/ execution environment, collects the results
>>>         and sends them to the UI for rendering.
>>>
>>>         When the execution environment is configured to be a
>>>         LocalEnvironment
>>>         <https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/java/LocalEnvironment.html>,
>>>         everything works fine. But when I start a cluster (using
>>>         <flink-install-path>/bin/start-cluster.sh), get the Job
>>>         Manager endpoint (e.g. localhost:6123) and configure a
>>>         RemoteEnvironment
>>>         <https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/java/ExecutionEnvironment.html#createRemoteEnvironment-java.lang.String-int-org.apache.flink.configuration.Configuration-java.lang.String...->
>>>         and use that environment to run the job, I get exceptions [1].
>>>
>>>         Based on the class loading doc
>>>         <https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_classloading.html>,
>>>         I copied the gradoop classes
>>>         (gradoop-flink-0.3.3-SNAPSHOT.jar, gradoop-common-0.3.3-SNAPSHOT.jar)
>>>         to the <flink-install-path>/lib folder (hoping that that way
>>>         those classes will be available to all the executors in the
>>>         cluster). I have ensured that the class that Flink fails to
>>>         load is in fact available in the Gradoop jars that I copied
>>>         to the /lib folder.
>>>
>>>         I have tried using the RemoteEnvironment method with
>>>         jarFiles argument where the passed JAR file is a fat jar
>>>         containing everything (in which case there is no Gradoop JAR
>>>         file in /lib folder).
>>>
>>>         So, my questions are:
>>>         1) How can I use RemoteEnvironment?
>>>         2) Is there any other way of doing this /programmatically/?
>>>         (That means I can't do flink run since I am interested in
>>>         the job execution result as a blocking call -- which means
>>>         ideally I don't want to use the submit RESTful API as well).
>>>         I just want RemoteEnvironment to work as well as
>>>         LocalEnvironment.
>>>
>>>         Regards,
>>>         Kedar
>>>
>>>
>>>         [1]
>>>         2018-04-24 15:16:02,823 ERROR
>>>         org.apache.flink.runtime.jobmanager.JobManager              
>>>         - Failed to submit job 0c987c8704f8b7eb4d7d38efcb3d708d
>>>         (Flink Java Job at Tue Apr 24 15:15:59 PDT 2018)
>>>         java.lang.NoClassDefFoundError: Could not initialize class
>>>         *org.gradoop.common.model.impl.id.GradoopId*
>>>           at java.io.ObjectStreamClass.hasStaticInitializer(Native
>>>         Method)
>>>           at
>>>         java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamClass.java:1887)
>>>           at
>>>         java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79)
>>>           at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:263)
>>>           at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:261)
>>>           at java.security.AccessController.doPrivileged(Native Method)
>>>           at
>>>         java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamClass.java:260)
>>>           at
>>>         java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:682)
>>>           at
>>>         java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1876)
>>>           at
>>>         java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1745)
>>>           at
>>>         java.io.ObjectInputStream.readClass(ObjectInputStream.java:1710)
>>>           at
>>>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1550)
>>>           at
>>>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
>>>           at java.util.HashSet.readObject(HashSet.java:341)
>>>           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>           at
>>>         sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>           at
>>>         sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>           at java.lang.reflect.Method.invoke(Method.java:498)
>>>           at
>>>         java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
>>>           at
>>>         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>>>           at
>>>         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>>>           at
>>>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>>>           at
>>>         java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
>>>           at
>>>         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
>>>           at
>>>         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>>>           at
>>>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
>>>           at
>>>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
>>>           at
>>>         org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)....
>>>
>>
>>
>
>

