flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: problem on yarn cluster
Date Mon, 18 May 2015 15:58:54 GMT
Hi Michele!

It looks like there are quite a few things going wrong in your case. Let me
see what I can deduce from the output you are showing me:


1) You seem to be running into a bug that exists in 0.9-milestone-1 and has
been fixed in master:

As far as I can tell, you call "collect()" on a data set whose element type
is a custom type. The milestone release has a bug where the wrong
classloader is used to deserialize the results, so "collect()"
unfortunately does not work with custom types there. The latest SNAPSHOT
version should have this fixed.
We are pushing to finalize the code for the 0.9 release, so hopefully we
will have an official release with that fix in a few weeks.
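For context, this is the generic failure mode behind the
ClassNotFoundException in your log: plain Java deserialization resolves
classes with the JVM's default classloader, which has never seen the
classes inside your job jar. Here is a stdlib-only sketch of the pattern
such a fix relies on; the class and method names are illustrative, not
Flink's actual internals:

```java
import java.io.*;

public class CollectDemo {

    // An ObjectInputStream that resolves classes against a caller-supplied
    // classloader instead of the JVM default -- the pattern needed to
    // deserialize user-defined types that live only in a job jar.
    static class UserCodeObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        UserCodeObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            // Look the class up in the supplied classloader, not the default one.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    // Serialize a value and deserialize it again through the
    // classloader-aware stream; returns the reconstructed object.
    static Object roundTrip(Serializable value) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }
        // Here we use the context classloader; a framework would use the
        // classloader that loaded the user's job jar.
        ClassLoader userCodeLoader = Thread.currentThread().getContextClassLoader();
        try (ObjectInputStream ois = new UserCodeObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()), userCodeLoader)) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("hello"));
    }
}
```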


2) The results that you collect seem to be very large, so the JobManager
actually runs out of memory while collecting them. This situation is
improved in the current master as well, even though a "collect()" call on
a sufficiently large result can still exhaust the JobManager's heap (we
plan to fix that soon as well).
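Until then, a possible stopgap (assuming you stay on the milestone) is to
give the JobManager more heap via the -jm flag you are already passing,
for example doubling it:

```
./yarn-session.sh -n 3 -s 1 -jm 2048 -tm 4096
```

Note this only pushes the limit out for large results; it does not fix
the underlying problem.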

Can you try and see if the latest SNAPSHOT version solves your issue?




BTW: It looks like you are actually starting two YARN sessions (Robert can
probably comment on this):

./yarn-session.sh -n 3 -s 1 -jm 1024 -tm 4096

./flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 ../path/to/jar


The first line starts a YARN session against which you can run multiple
jobs. The second line actually starts another dedicated YARN session for
that job.
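In other words, pick one of the two modes rather than both. Using only
the flags from your own commands, that would look like:

```
# Option A: one long-running session, then submit jobs against it
./yarn-session.sh -n 3 -s 1 -jm 1024 -tm 4096
./flink run ../path/to/jar

# Option B: a dedicated per-job YARN cluster, no session needed
./flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 ../path/to/jar
```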



Greetings,
Stephan


On Mon, May 18, 2015 at 5:30 PM, Michele Bertoni <
michele1.bertoni@mail.polimi.it> wrote:

>  Hi,
>
> I have a problem running my app on a Yarn cluster
>
>
> I developed it in my computer and everything is working fine
>
> then we setup the environment on Amazon EMR reading data from HDFS not S3
>
>
> we run it with these commands
>
>
> ./yarn-session.sh -n 3 -s 1 -jm 1024 -tm 4096
>
> ./flink run -m yarn-cluster -yn 3 -yjm 1024 -ytm 4096 ../path/to/jar
>
>
> we are using flink 0.9.0-milestone-1
>
>
> after running it, the terminal window where we launched it crashes; the
> last messages are
>
>
> 05/18/2015 15:19:56	Job execution switched to status FINISHED.
> 05/18/2015 15:19:56	Job execution switched to status FAILING.
>
>
>
>
> this is the error from the yarn log
>
>
> 2015-05-18 15:19:53 ERROR ApplicationMaster$$anonfun$2$$anon$1:66 - Could not process accumulator event of job 5429fe215a3953101cd575cd82b596c8 received from akka://flink/deadLetters.
> java.lang.OutOfMemoryError: Java heap space
> 	at org.apache.flink.api.common.accumulators.ListAccumulator.read(ListAccumulator.java:91)
> 	at org.apache.flink.runtime.accumulators.AccumulatorEvent.getAccumulators(AccumulatorEvent.java:124)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:350)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> 	at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
> 	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> 	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
> 	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> 	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> 	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:91)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2015-05-18 15:19:56 ERROR OneForOneStrategy:66 - java.lang.ClassNotFoundException: LowLevel.FlinkImplementation.FlinkDataTypes$GValue
> org.apache.commons.lang3.SerializationException: java.lang.ClassNotFoundException: LowLevel.FlinkImplementation.FlinkDataTypes$GValue
> 	at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:230)
> 	at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
> 	at org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListAccumulator.java:51)
> 	at org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListAccumulator.java:35)
> 	at org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager.getJobAccumulatorResults(AccumulatorManager.java:77)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:300)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> 	at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99)
> 	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> 	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
> 	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> 	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> 	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:91)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.ClassNotFoundException: LowLevel.FlinkImplementation.FlinkDataTypes$GValue
> 	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> 	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> 	at java.lang.Class.forName0(Native Method)
> 	at java.lang.Class.forName(Class.java:274)
> 	at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
> 	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> 	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> 	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1663)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 	at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
> 	... 25 more
>
>  Can you help me understand what is going wrong?
>
>  thanks
>
>
