flink-user mailing list archives

From "LINZ, Arnaud" <AL...@bouyguestelecom.fr>
Subject RE: Collect() freeze on yarn cluster on strange recover/deserialization error
Date Wed, 07 Dec 2016 16:56:59 GMT
Hi,

Any news? The freeze may be caused by an oversized Akka payload. The log contains many entries like this one:

akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@172.21.125.20:39449/user/jobmanager#-1264474132]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage was 69074412 bytes

How do I set Akka's maximum-payload-bytes in my Flink cluster?

https://issues.apache.org/jira/browse/FLINK-2373 is not clear about that. I do not use ExecutionEnvironment.createRemoteEnvironment()
but ExecutionEnvironment.getExecutionEnvironment().

Do I have to change the way I'm doing things? If so, how?
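
For reference, a sketch of one possible answer, not a confirmed fix: Flink exposes Akka's maximum message size through the akka.framesize option in flink-conf.yaml, and its default of 10485760b matches the "max allowed size" in the exception above. The value below (100 MB) is an assumption, chosen only because it exceeds the 69074412-byte message seen in the log. Whether raising it resolves the collect() freeze in this setup is untested.

```yaml
# flink-conf.yaml (fragment)
# Maximum size of messages exchanged between JobManager and TaskManagers.
# Default is 10485760b (10 MB); the size-unit suffix "b" is required.
# 104857600b (100 MB) is an illustrative value, not a recommendation.
akka.framesize: 104857600b
```

Because the job obtains its environment through ExecutionEnvironment.getExecutionEnvironment(), the setting would presumably come from the cluster configuration rather than from code, so the YARN session likely needs to be restarted after the change. Note that a larger frame size also means the JobManager has to hold larger collect() results in memory.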

Thanks,
Arnaud

-----Original message-----
From: LINZ, Arnaud
Sent: Wednesday, 30 November 2016 08:59
To: user@flink.apache.org
Subject: RE: Collect() freeze on yarn cluster on strange recover/deserialization error

Hi, 

I don't think so. I always delete the ZK path before launching the batch (with /usr/bin/zookeeper-client
-server $FLINK_HA_ZOOKEEPER_SERVERS rmr $FLINK_HA_ZOOKEEPER_PATH_BATCH), and the "recovery"
log line appears only before the collect() phase, not at the beginning.

The full log is available here: https://ftpext.bouyguestelecom.fr/?u=JDhCUdcAImsANZQdys86yID6UNq8H2r


Thanks,
Arnaud


-----Original message-----
From: Ufuk Celebi [mailto:uce@apache.org]
Sent: Tuesday, 29 November 2016 18:43
To: LINZ, Arnaud <ALINZ@bouyguestelecom.fr>; user@flink.apache.org
Subject: Re: Collect() freeze on yarn cluster on strange recover/deserialization error

Hey Arnaud,

could this be a leftover job that is recovered from ZooKeeper? Recovery only happens if the
configured ZK root contains data.

A job is removed from ZooKeeper only if it terminates (e.g. finishes, fails terminally without
restarting, or is cancelled). If you just shut down the cluster, this is treated as a failure.

The complete JM logs would be helpful to further check what's happening there.

– Ufuk


On 29 November 2016 at 18:15:16, LINZ, Arnaud (alinz@bouyguestelecom.fr) wrote:
> Hello,
>
> I have a Flink 1.1.3 batch application that performs a simple aggregation
> but freezes when collect() is called if the app is deployed on an HA-enabled
> YARN cluster (it works on a local cluster).
> Just before it hangs, the following deserialization error appears in the logs:
>  
> (...)
> 2016-11-29 15:10:10,422 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/4) (10cae0de2f4e7b6d71f21209072f7c96) switched from DEPLOYING to RUNNING
> 2016-11-29 15:10:13,175 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (2/4) (c098cf691c28364ca47d322c7a76259a) switched from RUNNING to FINISHED
> 2016-11-29 15:10:17,816 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (1/4) (aa6953c3c3a7c9d06ff714e13d020e38) switched from RUNNING to FINISHED
> 2016-11-29 15:10:38,060 INFO org.apache.flink.yarn.YarnJobManager - Attempting to recover all jobs.
> 2016-11-29 15:10:38,167 ERROR org.apache.flink.yarn.YarnJobManager - Fatal error: Failed to recover jobs.
> java.io.StreamCorruptedException: invalid type code: 00
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at java.util.HashMap.readObject(HashMap.java:1184)
>     at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58)
>     at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
>     at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:530)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:526)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  
>  
> Do you have any idea what could be wrong? I have no problems with
> other batch applications, just with this one. Why is it trying to recover the jobs
> in the first place?
> Thanks,
> Arnaud
