flink-user mailing list archives

From Ufuk Celebi <...@apache.org>
Subject RE: Collect() freeze on yarn cluster on strange recover/deserialization error
Date Thu, 08 Dec 2016 09:49:07 GMT
I also don't get why the job is recovering, but the oversized message is very likely the cause
of the freezing collect, because the data set is gathered via Akka.

You can configure the frame size via "akka.framesize", which defaults to 10485760b (10 MB).

Is the collected result larger than that? Could you try to increase the frame size and report
back?
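A minimal sketch of that change, assuming the setting goes in the standard conf/flink-conf.yaml; the 128 MB value below is only an illustration, chosen to exceed the 69074412-byte payload reported in the log:

```yaml
# conf/flink-conf.yaml -- illustrative value only; it must exceed the largest
# collected result (the log below reports a 69074412-byte message)
akka.framesize: 134217728b
```

The JobManager and TaskManagers read this at startup, so the cluster has to be restarted (or the YARN session resubmitted) for the new frame size to take effect.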

– Ufuk

On 7 December 2016 at 17:57:22, LINZ, Arnaud (alinz@bouyguestelecom.fr) wrote:
> Hi,
>  
> Any news? It may be caused by an oversized akka payload
> (many akka.remote.OversizedPayloadException: Discarding oversized payload sent
> to Actor[akka.tcp://flink@172.21.125.20:39449/user/jobmanager#-1264474132]:
> max allowed size 10485760 bytes, actual size of encoded class
> org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage
> was 69074412 bytes in the log)
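Those two numbers from the exception already quantify the mismatch; a quick sanity check (values copied from the log line above):

```shell
# Values copied from the OversizedPayloadException above.
MAX_ALLOWED=10485760   # default akka.framesize (10 MB)
ACTUAL=69074412        # encoded size of the LeaderSessionMessage
# The payload is roughly 6.6x the allowed frame size, so Akka drops it.
echo "over by $((ACTUAL - MAX_ALLOWED)) bytes"   # over by 58588652 bytes
```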
>  
> How do I set akka's maximum-payload-bytes in my flink cluster?
>  
> https://issues.apache.org/jira/browse/FLINK-2373 is not clear about that. I do
> not use ExecutionEnvironment.createRemoteEnvironment() but ExecutionEnvironment.getExecutionEnvironment().
>  
> Do I have to change the way I'm doing things? How?
>  
> Thanks,
> Arnaud
>  
> -----Original Message-----
> From: LINZ, Arnaud
> Sent: Wednesday, 30 November 2016 08:59
> To: user@flink.apache.org
> Subject: RE: Collect() freeze on yarn cluster on strange recover/deserialization error
>  
> Hi,
>  
> Don't think so. I always delete the ZK path before launching the batch (with /usr/bin/zookeeper-client
> -server $FLINK_HA_ZOOKEEPER_SERVERS rmr $FLINK_HA_ZOOKEEPER_PATH_BATCH), and
> the "recovery" log line appears only before the collect() phase, not at the beginning.
>  
> Full log is available here: https://ftpext.bouyguestelecom.fr/?u=JDhCUdcAImsANZQdys86yID6UNq8H2r
>  
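The cleanup step described above can be scripted; a dry-run sketch (it only prints the command, and both FLINK_HA_* values fall back to placeholder defaults, not real cluster settings):

```shell
# Dry-run sketch of the ZooKeeper cleanup described above.
# The two FLINK_HA_* defaults are placeholders; a real cluster would
# export them before launching the batch.
FLINK_HA_ZOOKEEPER_SERVERS=${FLINK_HA_ZOOKEEPER_SERVERS:-zkhost:2181}
FLINK_HA_ZOOKEEPER_PATH_BATCH=${FLINK_HA_ZOOKEEPER_PATH_BATCH:-/flink/batch}
CMD="/usr/bin/zookeeper-client -server $FLINK_HA_ZOOKEEPER_SERVERS rmr $FLINK_HA_ZOOKEEPER_PATH_BATCH"
echo "$CMD"   # drop the echo indirection to actually run it
```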
> Thanks,
> Arnaud
>  
>  
> -----Original Message-----
> From: Ufuk Celebi [mailto:uce@apache.org]
> Sent: Tuesday, 29 November 2016 18:43
> To: LINZ, Arnaud; user@flink.apache.org
> Subject: Re: Collect() freeze on yarn cluster on strange recover/deserialization error
>  
> Hey Arnaud,
>  
> could this be a left over job that is recovered from ZooKeeper? Recovery only happens
> if the configured ZK root contains data.
>  
> A job is removed from ZooKeeper only if it terminates (e.g. finishes, fails terminally
> w/o restarting, or is cancelled). If you just shut down the cluster, this is treated as a failure.
>  
> – Ufuk
>  
> The complete JM logs will be helpful to further check what's happening there.
>  
>  
> On 29 November 2016 at 18:15:16, LINZ, Arnaud (alinz@bouyguestelecom.fr) wrote:
> > Hello,
> >
> > I have a Flink 1.1.3 batch application that makes a simple aggregation
> > but freezes when collect() is called when the app is deployed on a
> > ha-enabled yarn cluster (it works on a local cluster).
> > Just before it hangs, I have the following deserialization error in the logs:
> >
> > (...)
> > 2016-11-29 15:10:10,422 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/4) (10cae0de2f4e7b6d71f21209072f7c96) switched from DEPLOYING to RUNNING
> > 2016-11-29 15:10:13,175 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (2/4) (c098cf691c28364ca47d322c7a76259a) switched from RUNNING to FINISHED
> > 2016-11-29 15:10:17,816 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (1/4) (aa6953c3c3a7c9d06ff714e13d020e38) switched from RUNNING to FINISHED
> > 2016-11-29 15:10:38,060 INFO org.apache.flink.yarn.YarnJobManager - Attempting to recover all jobs.
> > 2016-11-29 15:10:38,167 ERROR org.apache.flink.yarn.YarnJobManager - Fatal error: Failed to recover jobs.
> > java.io.StreamCorruptedException: invalid type code: 00
> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
> >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> >   at java.util.HashMap.readObject(HashMap.java:1184)
> >   at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
> >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >   at java.lang.reflect.Method.invoke(Method.java:606)
> >   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> >   at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58)
> >   at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> >   at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
> >   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:530)
> >   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
> >   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
> >   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> >   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:526)
> >   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
> >   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
> >   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> >   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> >   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> >   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> >   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >
> >
> > Do you have an idea of what can be wrong? I have no problems with
> > other batch applications, just with this one. Why is it trying to
> > recover the jobs in the first place?
> > Thanks,
> > Arnaud
> >
> > ________________________________
> >
> > The integrity of this message cannot be guaranteed on the Internet.
> > The company that sent this message cannot therefore be held liable for
> > its content nor attachments. Any unauthorized use or dissemination is
> > prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.
> >
>  
>  

