flink-user mailing list archives

From "LINZ, Arnaud" <AL...@bouyguestelecom.fr>
Subject Collect() freeze on yarn cluster on strange recover/deserialization error
Date Tue, 29 Nov 2016 17:14:57 GMT
Hello,

I have a Flink 1.1.3 batch application that performs a simple aggregation but freezes when collect()
is called once the app is deployed on an HA-enabled YARN cluster (it works on a local cluster).
Just before it hangs, the following deserialization error appears in the logs:

(...)
2016-11-29 15:10:10,422 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/4) (10cae0de2f4e7b6d71f21209072f7c96) switched from DEPLOYING to RUNNING
2016-11-29 15:10:13,175 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (2/4) (c098cf691c28364ca47d322c7a76259a) switched from RUNNING to FINISHED
2016-11-29 15:10:17,816 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (1/4) (aa6953c3c3a7c9d06ff714e13d020e38) switched from RUNNING to FINISHED
2016-11-29 15:10:38,060 INFO  org.apache.flink.yarn.YarnJobManager - Attempting to recover all jobs.
2016-11-29 15:10:38,167 ERROR org.apache.flink.yarn.YarnJobManager - Fatal error: Failed to recover jobs.
java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at java.util.HashMap.readObject(HashMap.java:1184)
        at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58)
        at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
        at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:530)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:526)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
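
For reference, the "invalid type code: 00" message is produced by java.io.ObjectInputStream itself, not by Flink: the reader throws it when it expects a type-code byte (such as TC_OBJECT) at some position in the stream and finds 0x00 instead. The minimal sketch below is plain Java with no Flink involved; the class and method names (InvalidTypeCodeDemo, readFailure, serialize, headerFollowedByZero) are hypothetical. It reproduces the message by appending a zero byte after a valid stream header, and it only illustrates what the exception means, not why the recovered job-graph file was unreadable here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
import java.io.UncheckedIOException;
import java.util.HashMap;

public class InvalidTypeCodeDemo {

    /** Deserializes one object; returns null on success, otherwise the exception message. */
    static String readFailure(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            in.readObject();
            return null;
        } catch (StreamCorruptedException e) {
            return e.getMessage();
        } catch (IOException | ClassNotFoundException e) {
            return e.toString();
        }
    }

    /** A well-formed stream containing a single serialized object. */
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    /** A valid stream header (magic 0xACED, version 5) followed by a single 0x00 byte. */
    static byte[] headerFollowedByZero() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            // The ObjectOutputStream constructor writes the stream header immediately.
            new ObjectOutputStream(bytes).flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The protocol's type codes (TC_NULL, TC_OBJECT, ...) lie in 0x70..0x7E, so 0x00 is invalid.
        bytes.write(0x00);
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        HashMap<String, Integer> map = new HashMap<>();
        map.put("key", 1);
        System.out.println(readFailure(serialize(map)));        // null: the round trip succeeds
        System.out.println(readFailure(headerFollowedByZero())); // invalid type code: 00
    }
}
```

In the log above the same check fails deeper in the stream (inside HashMap.readObject, while FileSerializableStateHandle.getState reads the job graph being recovered from ZooKeeperSubmittedJobGraphStore), which indicates that the stored bytes stopped matching what the reader expected partway through the object graph.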


Do you have any idea what could be wrong? I have no problems with other batch applications,
only with this one. And why is it trying to recover the jobs in the first place?
Thanks,
Arnaud

________________________________

The integrity of this message cannot be guaranteed on the Internet. The company that sent
this message cannot therefore be held liable for its content nor attachments. Any unauthorized
use or dissemination is prohibited. If you are not the intended recipient of this message,
then please delete it and notify the sender.