flink-user mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: Taskmanagers are quarantined
Date Wed, 29 Nov 2017 13:34:05 GMT
Hi,

you could also try increasing the heartbeat timeout via
`akka.watch.heartbeat.pause`. This might help to bridge the GC pauses.
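
For example, in flink-conf.yaml (the values below are only an
illustration; size the pause so that it comfortably exceeds your
observed GC pauses):

  akka.watch.heartbeat.pause: 120 s
  akka.watch.heartbeat.interval: 10 s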

Cheers,
Till

On Wed, Nov 29, 2017 at 12:41 PM, T Obi <t.obi@geniee.co.jp> wrote:

> Datanode warnings did not appear in every timeout case. They seem to
> be raised only when a timeout happens while snapshotting.
>
> We enabled GC logging on the taskmanagers and found that something
> calls System.gc() every hour.
> So a full GC runs every hour, and in our case it takes about a minute
> or more...
> Whenever a taskmanager times out, a full GC seems to be running on it.
> The full GCs are not triggered only by System.gc(), though; we also
> see "Full GC (Ergonomics)" and "Full GC (Metadata GC Threshold)".
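>
> A common cause of an hourly System.gc() is RMI's distributed GC
> (sun.rmi.dgc.client.gcInterval defaults to 3600000 ms), though we have
> not yet confirmed that this is the caller in our case. If it is, JVM
> options along these lines might help (just a sketch, untested here):
>
>   -XX:+ExplicitGCInvokesConcurrent   # run explicit GCs concurrently
>   -XX:+DisableExplicitGC             # or ignore System.gc() entirely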
>
> Some of our jobs have a large state, which I think is why the full GC
> takes such a long time.
> I will try running several taskmanagers per machine, with the memory
> divided among them.
> I will also tune the JVM memory parameters to reduce the frequency of
> "Full GC (Metadata GC Threshold)".
>
> Best,
> Tetsuya
>
>
> 2017-11-28 16:30 GMT+09:00 T Obi <t.obi@geniee.co.jp>:
> > Hello Chesnay,
> >
> > Thank you for answering my rough question.
> >
> > Not all of the taskmanagers are quarantined at once, but each
> > taskmanager has been quarantined at least once.
> >
> > We are using CDH 5.8, which is based on Hadoop 2.6.
> > We had not paid attention to the datanodes; we will check them.
> > However, we also use this HDFS for MapReduce, and there it seems to
> > work fine.
> >
> > I searched the archives of this mailing list for the keyword
> > "Detected unreachable" and found mails about GC trouble.
> > Although we are using G1GC, we will try outputting a GC log.
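> >
> > For reference, a typical Java 8 set of flags for this (again via
> > env.java.opts; the log path is just an example):
> >
> >   -Xloggc:/var/log/flink/gc-taskmanager.log -XX:+PrintGCDetails
> >   -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime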
> >
> >
> > Best,
> > Tetsuya
> >
> > 2017-11-28 1:15 GMT+09:00 Chesnay Schepler <chesnay@apache.org>:
> >> Are only some taskmanagers quarantined, or all of them?
> >>
> >> Do the quarantined taskmanagers have anything in common?
> >> (are the failing ones always on certain machines; do the stacktraces
> >> reference the same hdfs datanodes)
> >>
> >> Which hadoop version are you using?
> >>
> >> From the stack-trace it appears that multiple hdfs nodes are being
> >> corrupted.
> >> The taskmanagers time out because their connection to zookeeper breaks
> >> down, at which point a taskmanager no longer knows who the leading
> >> jobmanager is and subsequently shuts down.
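> >>
> >> (If long GC pauses turn out to be the cause, you could also raise the
> >> zookeeper session timeout; just a sketch in flink-conf.yaml, adjust
> >> it to your observed pause lengths:
> >>   high-availability.zookeeper.client.session-timeout: 120000)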
> >>
> >>
> >> On 27.11.2017 08:02, T Obi wrote:
> >>>
> >>> Hello all,
> >>>
> >>> We run jobs on a standalone cluster with Flink 1.3.2 and we are facing
> >>> a problem. Suddenly the connection between a taskmanager and the
> >>> jobmanager times out and the taskmanager is "quarantined" by the
> >>> jobmanager.
> >>> Once a taskmanager is quarantined the jobs are of course restarted,
> >>> but then the timeout and quarantine happen to other taskmanagers in
> >>> succession.
> >>>
> >>> When a taskmanager's connection to the jobmanager timed out, its
> >>> connections to zookeeper and to the snapshot HDFS timed out as well.
> >>> So the problem does not seem to be in Flink itself.
> >>> On the other hand, even when a taskmanager running on the same machine
> >>> as the jobmanager times out, the jobmanager itself is fine at that
> >>> moment. So I do not think it is an OS-level problem either.
> >>>
> >>> Could you give us some advice on how to investigate? Thank you.
> >>>
> >>>
> >>>
> >>> Taskmanager command line:
> >>>
> >>> java -XX:+UseG1GC -Xms219136M -Xmx219136M
> >>> -XX:MaxDirectMemorySize=8388607T
> >>> -Dlog.file=/var/log/flink/flink-log-manager-taskmanager-0-flink-jp-2.log
> >>> -Dlog4j.configuration=file:/opt/flink/flink-1.3.2/conf/log4j.properties
> >>> -Dlogback.configurationFile=file:/opt/flink/flink-1.3.2/conf/logback.xml
> >>> -classpath
> >>> /opt/flink/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/opt/flink/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/flink-1.3.2/lib/log4j-1.2.17.jar:/opt/flink/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar:::
> >>> org.apache.flink.runtime.taskmanager.TaskManager --configDir
> >>> /opt/flink/flink-1.3.2/conf
> >>>
> >>>
> >>> Taskmanager (on flink-jp-2) log:
> >>>
> >>> 2017-11-22 14:09:31,595 INFO
> >>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Heap
> >>> backend snapshot (File Stream Factory @
> >>> hdfs://nameservice1/user/log-manager/flink/checkpoints-data/9469db324b834e9dcf5b46428b3ae011,
> >>> synchronous part) in thread
> >>> Thread[TriggerWindow(TumblingProcessingTimeWindows(60000),
> >>> ReducingStateDescriptor{serializer=jp.geniee.reporter.executable.BuyerReporterV2Auction$$anon$12$$anon$7@d2619591,
> >>> reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@72bca894},
> >>> ProcessingTimeTrigger(),
> >>> WindowedStream.reduce(WindowedStream.java:300)) -> Map -> Map
> >>> (9/30),5,Flink Task Threads] took 142 ms.
> >>> 2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
> >>>                       - DFSOutputStream ResponseProcessor exception
> >>> for block BP-390359345-10.5.0.29-1476670682927:blk_1194079870_620518999
> >>> java.io.EOFException: Premature EOF: no length prefix available
> >>>          at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
> >>>          at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
> >>>          at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
> >>> 2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
> >>>                       - DFSOutputStream ResponseProcessor exception
> >>> for block BP-390359345-10.5.0.29-1476670682927:blk_1194080159_621053744
> >>> java.io.EOFException: Premature EOF: no length prefix available
> >>>          at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
> >>>          at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
> >>>          at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
> >>> 2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
> >>>                       - DFSOutputStream ResponseProcessor exception
> >>> for block BP-390359345-10.5.0.29-1476670682927:blk_1194080160_620520092
> >>> java.io.EOFException: Premature EOF: no length prefix available
> >>>          at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
> >>>          at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
> >>>          at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
> >>> 2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
> >>>                       - DFSOutputStream ResponseProcessor exception
> >>> for block BP-390359345-10.5.0.29-1476670682927:blk_1194079071_620517393
> >>> java.io.EOFException: Premature EOF: no length prefix available
> >>>          at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
> >>>          at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
> >>>          at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
> >>> 2017-11-22 14:12:10,041 WARN  org.apache.hadoop.hdfs.DFSClient
> >>>                       - Error Recovery for block
> >>> BP-390359345-10.5.0.29-1476670682927:blk_1194079071_620517393 in
> >>> pipeline 10.5.0.61:50010, 10.5.0.59:50010, 10.5.0.74:50010: bad
> >>> datanode 10.5.0.61:50010
> >>> 2017-11-22 14:12:10,039 WARN  org.apache.hadoop.hdfs.DFSClient
> >>>                       - Error Recovery for block
> >>> BP-390359345-10.5.0.29-1476670682927:blk_1194080160_620520092 in
> >>> pipeline 10.5.0.59:50010, 10.5.0.52:50010, 10.5.0.63:50010: bad
> >>> datanode 10.5.0.59:50010
> >>> 2017-11-22 14:12:10,038 WARN  org.apache.hadoop.hdfs.DFSClient
> >>>                       - Error Recovery for block
> >>> BP-390359345-10.5.0.29-1476670682927:blk_1194080159_621053744 in
> >>> pipeline 10.5.0.52:50010, 10.5.0.78:50010: bad datanode
> >>> 10.5.0.52:50010
> >>> 2017-11-22 14:12:10,029 INFO  org.apache.zookeeper.ClientCnxn
> >>>                       - Client session timed out, have not heard from
> >>> server in 73797ms for sessionid 0x35f5cb4184700a4, closing socket
> >>> connection and attempting reconnect
> >>> 2017-11-22 14:12:10,057 WARN  org.apache.hadoop.hdfs.DFSClient
> >>>                       - Error Recovery for block
> >>> BP-390359345-10.5.0.29-1476670682927:blk_1194079870_620518999 in
> >>> pipeline 10.5.0.69:50010, 10.5.0.59:50010, 10.5.0.74:50010: bad
> >>> datanode 10.5.0.69:50010
> >>> 2017-11-22 14:12:10,113 WARN  akka.remote.RemoteWatcher
> >>>                       - Detected unreachable:
> >>> [akka.tcp://flink@flink-jp-2:43139]
> >>> 2017-11-22 14:12:10,142 INFO
> >>> org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateManager
> >>>   - State change: SUSPENDED
> >>> 2017-11-22 14:12:10,142 WARN
> >>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
> >>>   - Connection to ZooKeeper suspended. Can no longer retrieve the
> >>> leader from ZooKeeper.
> >>> 2017-11-22 14:12:10,157 INFO
> >>> org.apache.flink.runtime.taskmanager.TaskManager              -
> >>> TaskManager akka://flink/user/taskmanager disconnects from JobManager
> >>> akka.tcp://flink@flink-jp-2:43139/user/jobmanager: JobManager is no
> >>> longer reachable
> >>> 2017-11-22 14:12:10,158 INFO
> >>> org.apache.flink.runtime.taskmanager.TaskManager              -
> >>> Cancelling all computations and discarding all cached data.
> >>>
> >>>
> >>>
> >>> Jobmanager command line:
> >>>
> >>> java -Xms8192m -Xmx8192m
> >>> -Dlog.file=/var/log/flink/flink-log-manager-jobmanager-0-flink-jp-2.log
> >>> -Dlog4j.configuration=file:/opt/flink/flink-1.3.2/conf/log4j.properties
> >>> -Dlogback.configurationFile=file:/opt/flink/flink-1.3.2/conf/logback.xml
> >>> -classpath
> >>> /opt/flink/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/opt/flink/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/flink-1.3.2/lib/log4j-1.2.17.jar:/opt/flink/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar:::
> >>> org.apache.flink.runtime.jobmanager.JobManager --configDir
> >>> /opt/flink/flink-1.3.2/conf --executionMode cluster --host flink-jp-2
> >>> --webui-port 8081
> >>>
> >>>
> >>> Jobmanager (on flink-jp-2) log:
> >>>
> >>> 2017-11-22 14:09:32,252 INFO
> >>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
> >>> Completed checkpoint 293 (125180549 bytes in 889 ms).
> >>> 2017-11-22 14:12:02,705 WARN  akka.remote.RemoteWatcher
> >>>                       - Detected unreachable:
> >>> [akka.tcp://flink@flink-jp-2:42609]
> >>> 2017-11-22 14:12:02,705 INFO
> >>> org.apache.flink.runtime.jobmanager.JobManager                - Task
> >>> manager akka.tcp://flink@flink-jp-2:42609/user/taskmanager terminated.
> >>> 2017-11-22 14:12:02,705 INFO
> >>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> >>> Source: lamp-auction-test -> Flat Map -> Map -> Sink:
> >>> 2017-11-22-auc-log (30/30) (a853390bb17f6d58997ad994266d3df2) switched
> >>> from RUNNING to FAILED.
> >>> java.lang.Exception: TaskManager was lost/killed:
> >>> d51c4d252a8c1ff222b728ca50dbe55a @ flink-jp-2 (dataPort=37930)
> >>>          at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
> >>>          at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
> >>>          at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
> >>>          at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
> >>>          at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
> >>>          at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
> >>>          at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1131)
> >>>          at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> >>>          at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
> >>>          at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> >>>          at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> >>>          at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> >>>          at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> >>>          at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> >>>          at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> >>>          at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
> >>>          at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> >>>          at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
> >>>          at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
> >>>          at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
> >>>          at akka.actor.ActorCell.invoke(ActorCell.scala:486)
> >>>          at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> >>>          at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> >>>          at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> >>>          at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >>>          at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >>>          at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >>>          at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >>>
> >>>
> >>>
> >>> Best,
> >>> Tetsuya
> >>>
> >>
>
