flink-user mailing list archives

From Ufuk Celebi <...@apache.org>
Subject RE: Collect() freeze on yarn cluster on strange recover/deserialization error
Date Thu, 08 Dec 2016 15:17:50 GMT
Great! :)


On 8 December 2016 at 15:28:05, LINZ, Arnaud (alinz@bouyguestelecom.fr) wrote:
> -yjm works, and suits me better than a global flink-conf.yaml parameter. I had looked for
> a command line parameter like that, but missed it in the docs, my mistake.
> Thanks,
> Arnaud
>  
> -----Original Message-----
> From: Ufuk Celebi [mailto:uce@apache.org]
> Sent: Thursday, 8 December 2016 14:43
> To: LINZ, Arnaud; user@flink.apache.org
> Cc: rmetzger@apache.org
> Subject: RE: Collect() freeze on yarn cluster on strange recover/deserialization error
 
>  
> Good point with the collect() docs. Would you mind opening a JIRA issue for that?
>  
> I'm not sure whether you can specify it via that key for YARN. Can you try to use -yjm 8192
> when submitting the job?
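
[For context, a minimal sketch of the suggested submission. The jar name and the -ytm/-yn values are placeholders for illustration, not from this thread; the flags are the Flink 1.1.x YARN options.]

```shell
# Submit to YARN with an 8 GB JobManager container. -yjm overrides
# jobmanager.heap.mb from flink-conf.yaml for YARN deployments.
# Jar name, -ytm (TaskManager memory) and -yn (TaskManager count)
# are illustrative placeholders.
flink run -m yarn-cluster -yjm 8192 -ytm 4096 -yn 4 ./my-batch-job.jar
```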
>  
> Looping in Robert who knows best whether this config key is picked up or not for YARN.
>  
> – Ufuk
>  
> On 8 December 2016 at 14:05:41, LINZ, Arnaud (alinz@bouyguestelecom.fr) wrote:
> > Hi Ufuk,
> >
> > Yes, I have a large set of data to collect for a data science job that
> > cannot be distributed easily. Increasing the akka.framesize value does
> > get rid of the collect hang (maybe you should highlight this parameter
> > in the collect() documentation, 10 MB is not that big), thanks.
> >
> > However my job manager now fails with an OutOfMemoryError.
> >
> > Despite the fact that I have setup
> > jobmanager.heap.mb: 8192
> >
> > in my flink-conf.yaml, the logs show that it was created with less memory (1374 MiB):
> >
> > 2016-12-08 13:50:13,808 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner - --------------------------------------------------------------------------------
> > 2016-12-08 13:50:13,809 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner - Starting YARN ApplicationMaster / JobManager (Version: 1.1.3, Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC)
> > 2016-12-08 13:50:13,809 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner - Current user: datcrypt
> > 2016-12-08 13:50:13,809 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08
> > 2016-12-08 13:50:13,809 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner - Maximum heap size: 1374 MiBytes
> > 2016-12-08 13:50:13,810 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner - JAVA_HOME: /usr/java/default
> > 2016-12-08 13:50:13,811 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner - Hadoop version: 2.6.3
> > 2016-12-08 13:50:13,811 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner - JVM Options:
> > 2016-12-08 13:50:13,811 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner -    -Xmx1434M
> > 2016-12-08 13:50:13,811 INFO  org.apache.flink.yarn.YarnApplicationMasterRunner -    -Dlog.file=/data/1/hadoop/yarn/log/application_1480512120243_3635/container_e17_1480512120243_3635_01_000001/jobmanager.log
> >
> >
> > Is there a Flink command line option or an environment variable that
> > overrides it, or am I missing something?
> > -- Arnaud
> >
> > -----Original Message-----
> > From: Ufuk Celebi [mailto:uce@apache.org]
> > Sent: Thursday, 8 December 2016 10:49
> > To: LINZ, Arnaud; user@flink.apache.org
> > Subject: RE: Collect() freeze on yarn cluster on strange recover/deserialization error
> >
> > I also don't get why the job is recovering, but the oversized message
> > is very likely the cause of the freezing collect, because the data set
> > is gathered via Akka.
> >
> > You can configure the frame size via "akka.framesize", which defaults
> > to 10485760b (10 MB).
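
[For reference, a minimal flink-conf.yaml sketch of this setting. The 128 MiB value is an illustrative choice, not a recommendation from this thread.]

```yaml
# flink-conf.yaml -- raise Akka's maximum message size so large
# collect() results fit in a single frame. The value takes a byte
# unit suffix ("b"); the default is 10485760b (10 MiB).
akka.framesize: 134217728b
```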
> >
> > Is the collected result larger than that? Could you try to increase
> > the frame size and report back?
> >
> > – Ufuk
> >
> > On 7 December 2016 at 17:57:22, LINZ, Arnaud (alinz@bouyguestelecom.fr) wrote:
> > > Hi,
> > >
> > > Any news? It may be caused by an oversized Akka payload (many
> > > akka.remote.OversizedPayloadException: Discarding oversized payload
> > > sent to Actor[akka.tcp://flink@172.21.125.20:39449/user/jobmanager#-1264474132]:
> > > max allowed size 10485760 bytes, actual size of encoded class
> > > org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage
> > > was 69074412 bytes in the log)
> > >
> > > How do I set akka's maximum-payload-bytes in my flink cluster?
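
[The size mismatch in the exception above can be checked arithmetically; a small sketch. The round-up-to-16-MiB headroom policy is an illustrative assumption, not something proposed in this thread.]

```python
# Sanity check of the numbers in the OversizedPayloadException above.
# The serialized collect() result (69,074,412 bytes) exceeds Akka's
# default frame size (10,485,760 bytes), so the message is discarded
# and collect() never receives its data.
DEFAULT_FRAMESIZE = 10_485_760   # Flink default: akka.framesize = 10485760b
observed_payload = 69_074_412    # "actual size of encoded class ... was 69074412 bytes"

MIB = 1024 * 1024
# Illustrative policy (an assumption): round the payload up to the
# next 16 MiB boundary to leave some headroom.
suggested = ((observed_payload // (16 * MIB)) + 1) * 16 * MIB

print(observed_payload > DEFAULT_FRAMESIZE)   # True -> default is too small
print(suggested)                              # 83886080 -> e.g. akka.framesize: 83886080b
```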
> > >
> > > https://issues.apache.org/jira/browse/FLINK-2373 is not clear about
> > > that. I do not use ExecutionEnvironment.createRemoteEnvironment() but
> > > ExecutionEnvironment.getExecutionEnvironment().
> > >
> > > Do I have to change the way I'm doing things? How?
> > >
> > > Thanks,
> > > Arnaud
> > >
> > > -----Original Message-----
> > > From: LINZ, Arnaud
> > > Sent: Wednesday, 30 November 2016 08:59
> > > To: user@flink.apache.org
> > > Subject: RE: Collect() freeze on yarn cluster on strange recover/deserialization error
> > >
> > > Hi,
> > >
> > > Don't think so. I always delete the ZK path before launching the
> > > batch (with /usr/bin/zookeeper-client -server $FLINK_HA_ZOOKEEPER_SERVERS
> > > rmr $FLINK_HA_ZOOKEEPER_PATH_BATCH), and the "recovery" log line appears
> > > only before the collect() phase, not at the beginning.
> > >
> > > Full log is available here:
> > > https://ftpext.bouyguestelecom.fr/?u=JDhCUdcAImsANZQdys86yID6UNq8H2r
> > >
> > > Thanks,
> > > Arnaud
> > >
> > >
> > > -----Original Message-----
> > > From: Ufuk Celebi [mailto:uce@apache.org]
> > > Sent: Tuesday, 29 November 2016 18:43
> > > To: LINZ, Arnaud; user@flink.apache.org
> > > Subject: Re: Collect() freeze on yarn cluster on strange recover/deserialization error
> > >
> > > Hey Arnaud,
> > >
> > > could this be a left over job that is recovered from ZooKeeper?
> > > Recovery only happens if the configured ZK root contains data.
> > >
> > > A job is removed from ZooKeeper only if it terminates (e.g. finishes,
> > > fails terminally without restarting, or is cancelled). If you just shut
> > > down the cluster, this is treated as a failure.
> > >
> > > – Ufuk
> > >
> > > The complete JM logs will be helpful to further check what's happening there.
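
[Given the recovery semantics described above, one way to keep a leftover job from being recovered into an unrelated run is to give each application its own ZooKeeper root. A flink-conf.yaml sketch using Flink 1.1's recovery keys; the quorum and path values are placeholders.]

```yaml
# flink-conf.yaml (Flink 1.1.x HA keys) -- isolate this application's
# recovery state under its own ZooKeeper path so stale state from
# another run cannot be picked up. Hosts and path are placeholders.
recovery.mode: zookeeper
recovery.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
recovery.zookeeper.path.root: /flink/my-batch-app
```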
> > >
> > >
> > > On 29 November 2016 at 18:15:16, LINZ, Arnaud (alinz@bouyguestelecom.fr) wrote:
 
> > > > Hello,
> > > >
> > > > I have a Flink 1.1.3 batch application that makes a simple
> > > > aggregation but freezes when collect() is called when the app is
> > > > deployed on an HA-enabled YARN cluster (it works on a local cluster).
> > > > Just before it hangs, I have the following deserialization error in the logs:
> > > >
> > > > (...)
> > > > 2016-11-29 15:10:10,422 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/4) (10cae0de2f4e7b6d71f21209072f7c96) switched from DEPLOYING to RUNNING
> > > > 2016-11-29 15:10:13,175 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (2/4) (c098cf691c28364ca47d322c7a76259a) switched from RUNNING to FINISHED
> > > > 2016-11-29 15:10:17,816 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (1/4) (aa6953c3c3a7c9d06ff714e13d020e38) switched from RUNNING to FINISHED
> > > > 2016-11-29 15:10:38,060 INFO  org.apache.flink.yarn.YarnJobManager - Attempting to recover all jobs.
> > > > 2016-11-29 15:10:38,167 ERROR org.apache.flink.yarn.YarnJobManager - Fatal error: Failed to recover jobs.
> > > > java.io.StreamCorruptedException: invalid type code: 00
> > > >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
> > > >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > > >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > > >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > > >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > > >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > > >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > > >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > >   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > > >   at java.util.HashMap.readObject(HashMap.java:1184)
> > > >   at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
> > > >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > >   at java.lang.reflect.Method.invoke(Method.java:606)
> > > >   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> > > >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> > > >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > > >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > > >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > > >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > > >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > > >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > > >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > > >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > > >   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > > >   at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58)
> > > >   at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> > > >   at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
> > > >   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:530)
> > > >   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
> > > >   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
> > > >   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> > > >   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:526)
> > > >   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
> > > >   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
> > > >   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> > > >   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> > > >   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> > > >   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> > > >   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > >   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > > >   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > >   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > >
> > > >
> > > > Do you have an idea of what can be wrong? I have no problems with
> > > > other batch applications, just with this one. Why is it trying to
> > > > recover the jobs in the first place?
> > > > Thanks,
> > > > Arnaud
> > > >
> > > > ________________________________
> > > >
> > > >
> > > > The integrity of this message cannot be guaranteed on the Internet.
> > > > The company that sent this message cannot therefore be held liable
> > > > for its content nor attachments. Any unauthorized use or
> > > > dissemination is prohibited. If you are not the intended recipient
> > > > of this message, then please delete
> > > it and notify the sender.
> > > >
> > >
> > >
> >
> >
>  
>  

