flink-user mailing list archives

From Ufuk Celebi <...@apache.org>
Subject Re: Collect() freeze on yarn cluster on strange recover/deserialization error
Date Tue, 29 Nov 2016 17:42:37 GMT
Hey Arnaud,

Could this be a leftover job that is recovered from ZooKeeper? Recovery only happens if the
configured ZK root contains data.

A job is removed from ZooKeeper only if it terminates (e.g. it finishes, fails terminally without
restarting, or is cancelled). If you just shut down the cluster, this is treated as a failure.
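The rule above can be pictured with a small in-memory model. This is a hypothetical sketch, not Flink code: `HaRecoveryModel`, its nested `JobManager` class and the `Outcome` enum are invented names, and a plain map stands in for the job graphs stored under the configured ZooKeeper root (the real store in the quoted trace is `ZooKeeperSubmittedJobGraphStore`). It only encodes what is stated here: a terminal outcome removes a job graph, a cluster shutdown does not, and a JobManager recovers whatever is left under the root.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the recovery rule; none of these names are Flink APIs.
public class HaRecoveryModel {

    // The terminal outcomes listed in the mail.
    enum Outcome { FINISHED, FAILED_TERMINALLY, CANCELLED }

    static class JobManager {
        // Stands in for the job graphs kept under the configured ZooKeeper root.
        // It is shared across JobManager instances, just as ZooKeeper outlives a cluster.
        private final Map<String, String> zkRoot;

        JobManager(Map<String, String> zkRoot) {
            this.zkRoot = zkRoot;
        }

        // Submitting a job persists its (here: fake) serialized job graph.
        void submit(String jobId) {
            zkRoot.put(jobId, "serialized JobGraph of " + jobId);
        }

        // Any terminal outcome removes the entry; the outcome value is not inspected.
        void terminate(String jobId, Outcome outcome) {
            zkRoot.remove(jobId);
        }

        // A cluster shutdown is treated like a failure: the stored entries are kept.
        void shutDown() {
        }

        // Recovers every job graph found; an empty root means there is nothing to recover.
        List<String> recoverAllJobs() {
            return new ArrayList<>(zkRoot.keySet());
        }
    }

    public static void main(String[] args) {
        Map<String, String> zkRoot = new LinkedHashMap<>();

        JobManager first = new JobManager(zkRoot);
        first.submit("job-a");
        first.terminate("job-a", Outcome.FINISHED);
        first.submit("job-b");
        first.shutDown(); // job-b never reached a terminal state

        JobManager second = new JobManager(zkRoot);
        System.out.println(second.recoverAllJobs()); // [job-b]
    }
}
```

In this model the second JobManager's recovery attempt is driven entirely by state written before it started, which is the leftover-job scenario suspected here.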

– Ufuk

The complete JobManager (JM) logs would help to check further what's happening there.
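The `java.io.StreamCorruptedException: invalid type code: 00` in the quoted trace is raised by `java.io.ObjectInputStream` when, at a position where it expects a type code (for example 0x73 for an object or 0x74 for a string), it reads the byte 0x00 instead. The sketch below reproduces the message with plain JDK serialization by zeroing the first type code, which follows the 4-byte stream header; `InvalidTypeCodeDemo` and its methods are made-up names for illustration. In the trace the bad byte sits deeper, inside a nested `HashMap.readObject`, but the meaning is the same: the stored bytes are not a valid serialized object at that position. The sketch says nothing about which process wrote those bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;

public class InvalidTypeCodeDemo {

    // Standard Java serialization into a byte array.
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Standard Java deserialization from a byte array.
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // Overwrites the first type code with 0x00 and returns the resulting error message,
    // or null if deserialization unexpectedly succeeds.
    static String readWithZeroedTypeCode(Object value) throws IOException, ClassNotFoundException {
        byte[] data = serialize(value);
        // Bytes 0-3 are the stream header (magic 0xACED, version 0x0005);
        // byte 4 is the type code of the first object written.
        data[4] = 0x00;
        try {
            deserialize(data);
            return null;
        } catch (StreamCorruptedException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(deserialize(serialize("intact job graph")));  // intact job graph
        System.out.println(readWithZeroedTypeCode("corrupted job graph")); // invalid type code: 00
    }
}
```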


On 29 November 2016 at 18:15:16, LINZ, Arnaud (alinz@bouyguestelecom.fr) wrote:
> Hello,
>  
> I have a Flink 1.1.3 batch application that performs a simple aggregation but freezes when
> collect() is called while the app is deployed on an HA-enabled YARN cluster (it works on
> a local cluster).
> Just before it hangs, I get the following deserialization error in the logs:
>  
> (...)
> 2016-11-29 15:10:10,422 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/4) (10cae0de2f4e7b6d71f21209072f7c96) switched from DEPLOYING to RUNNING
> 2016-11-29 15:10:13,175 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (2/4) (c098cf691c28364ca47d322c7a76259a) switched from RUNNING to FINISHED
> 2016-11-29 15:10:17,816 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (1/4) (aa6953c3c3a7c9d06ff714e13d020e38) switched from RUNNING to FINISHED
> 2016-11-29 15:10:38,060 INFO org.apache.flink.yarn.YarnJobManager - Attempting to recover all jobs.
> 2016-11-29 15:10:38,167 ERROR org.apache.flink.yarn.YarnJobManager - Fatal error: Failed to recover jobs.
> java.io.StreamCorruptedException: invalid type code: 00
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)  
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)  
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)  
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)  
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)  
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)  
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at java.util.HashMap.readObject(HashMap.java:1184)
> at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58)
> at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:530)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:526)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  
>  
> Do you have an idea of what could be wrong? I have no problems with other batch applications,
> just with this one. Why is it trying to recover the jobs in the first place?
> Thanks,
> Arnaud
>  
> ________________________________
>  
> The integrity of this message cannot be guaranteed on the Internet. The company that
> sent this message cannot therefore be held liable for its content nor attachments. Any
> unauthorized use or dissemination is prohibited. If you are not the intended recipient
> of this message, then please delete it and notify the sender.
>  

