flink-user mailing list archives

From Stefan Richter <s.rich...@data-artisans.com>
Subject Re: Flink job hangs using rocksDb as backend
Date Thu, 12 Jul 2018 14:10:57 GMT
Hi,

Did you check the metrics for the garbage collector? A job that is stuck with high CPU consumption
and has lots of timers sounds like garbage collection could be the problem, because timers are
currently on-heap objects, but we are working on RocksDB-based timers right now.
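
For reference, a minimal sketch of how GC pressure can be measured from inside a JVM. The class and method names are made up for illustration, and the sketch only uses the standard java.lang.management API. It reads the JVM it runs in, so for a TaskManager the same counters would have to be read inside that process, over JMX, or from the JVM garbage-collector metrics that Flink reports.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper, not part of Flink.
final class GcSnapshot {

    /** Sums the accumulated collection time (ms) over all collectors of this JVM. */
    static long totalGcTimeMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long millis = gc.getCollectionTime(); // -1 when the collector does not report it
            if (millis > 0) {
                total += millis;
            }
        }
        return total;
    }

    /** Fraction of a wall-clock interval spent in GC, given two readings of the total. */
    static double gcFraction(long gcBeforeMillis, long gcAfterMillis, long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        return (double) (gcAfterMillis - gcBeforeMillis) / intervalMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        long before = totalGcTimeMillis();
        long start = System.nanoTime();
        Thread.sleep(1000); // sampling interval
        long elapsedMillis = Math.max(1, (System.nanoTime() - start) / 1_000_000);
        long after = totalGcTimeMillis();
        System.out.printf("GC share of the last %d ms: %.1f%%%n",
                elapsedMillis, 100.0 * gcFraction(before, after, elapsedMillis));
    }
}
```

A GC share that stays close to 100% while the job makes no progress would point towards heap pressure, for example from millions of on-heap timers.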

Best,
Stefan

> On 12.07.2018 at 14:54, shishal singh <shishal.a@gmail.com> wrote:
> 
> Thanks Stefan/Stephan/Nico,
> 
> Indeed there are two problems. For the second problem, I am almost certain that the explanation
> given by Stephan is the right one in my case, as the number of timers is in the millions. (Each
> is for a different key, so I guess coalescing is not an option for me.)
> 
> If I simplify my problem: each day I receive millions of events (10-20M), and I have to
> schedule a timer for 8 AM the next day to check whether matching events are there; if not, I have to
> send it to the Elastic sink as an alert. I suspected that having so many timers fire at the same time
> could cause my job to hang, so I am now scheduling timers randomly between 8 AM and 10 AM.
> But my job still hangs after some time. One more thing I noticed is that when my job
> hangs, CPU utilization shoots up to almost 100%.
> I tried to isolate the problem by removing the ES sink and just doing stream.print(), and yet the
> problem persists.
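
A minimal sketch of the spreading idea described above, using a deterministic per-key offset instead of a random one, so that re-processing the same event computes the same fire time. The class name, method name, and example key are assumptions for illustration, not code from the job in this thread.

```java
import java.time.Duration;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical helper, not part of Flink.
final class TimerSpread {

    /** Maps a key to a fire time in [windowStartMillis, windowStartMillis + windowLengthMillis). */
    static long spreadFireTime(long windowStartMillis, long windowLengthMillis, String key) {
        if (windowLengthMillis <= 0) {
            throw new IllegalArgumentException("windowLengthMillis must be positive");
        }
        // floorMod keeps the offset non-negative even for negative hash codes.
        long offset = Math.floorMod((long) key.hashCode(), windowLengthMillis);
        return windowStartMillis + offset;
    }

    public static void main(String[] args) {
        // 8 AM on an example day, interpreted as UTC for simplicity.
        long eightAm = LocalDate.of(2018, 7, 13).atTime(8, 0)
                .toInstant(ZoneOffset.UTC).toEpochMilli();
        long twoHours = Duration.ofHours(2).toMillis();
        System.out.println(spreadFireTime(eightAm, twoHours, "example-key"));
    }
}
```

Spreading only helps while the watermark moves through the window gradually. When a backlog is replayed and the watermark jumps past the whole window in one step, all of these timers still become due together.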
> 
> In my current setup, I am running a standalone cluster of 3 machines (all three servers
> have a Task Manager, Job Manager and Hadoop on them). So I am not using EBS for RocksDB.
> 
> Also, I verified that when the job hangs, even timers are not being called, as I have a
> debug statement in the timers, and the only logs I see at that time are the following:
> 
> 2018-07-12 14:35:30,423 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x2648355f7c6010f after 11ms
> 2018-07-12 14:35:31,957 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:35:36,946 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:35:41,963 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:35:43,775 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x2648355f7c6010f after 10ms
> 2018-07-12 14:35:46,946 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:35:51,954 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:35:56,967 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:35:57,127 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x2648355f7c6010f after 8ms
> 2018-07-12 14:36:01,944 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:36:06,955 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 2018-07-12 14:36:08,287 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Receiver TriggerCheckpoint 155@1531398968248 for d9af2f1da87b7268cc03e152a6179eae.
> 2018-07-12 14:36:08,287 DEBUG org.apache.flink.runtime.taskmanager.Task - Invoking async call Checkpoint Trigger for Source: Event Source -> filter (1/1) (d9af2f1da87b7268cc03e152a6179eae). on task Source: Event Source -> filter (1/1)
> 2018-07-12 14:36:10,476 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x2648355f7c6010f after 10ms
> 2018-07-12 14:36:11,957 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
> 
> As I expected, checkpoints also start to fail during this time.
> 
> My job graph is pretty simple: Source<Kafka> --> filter --> <process with timers> --> Sink
> 
> 
> Regards,
> Shishal
> 
> 
> On Thu, Jul 12, 2018 at 9:54 AM Stefan Richter <s.richter@data-artisans.com> wrote:
> Hi,
> 
> adding to what has already been said, I think that there can be two orthogonal problems
> here: i) why is your job slowing down/getting stuck? and ii) why is cancellation blocked?
> As for ii), I think Stephan already gave the right reason: shutdown could take longer, and
> that is what gets the TM killed.
> 
> A more interesting question could still be i): why is your job slowing down until shutdown
> in the first place? I have two questions here. First, are you running RocksDB on EBS volumes?
> If so, please have a look at this thread [1], because there can be some performance pitfalls.
> Second, how many timers are you expecting, and how are they firing? For example, if you have
> a huge number of timers and the watermark makes a big jump, it can take a while until the job
> makes progress, because it has to handle so many timer callbacks first. Metrics for event
> throughput and from your I/O subsystem could be helpful to see whether something is
> stuck/underperforming or whether there is just a lot of timer processing going on.
> 
> Best,
> Stefan 
> 
> [1] https://mail-archives.apache.org/mod_mbox/flink-user/201803.mbox/%3CCAKhqdDzAMDqEWiZ5B1QNdqv4+-mTvEfHbHEWrpxftLU7dV9FKw@mail.gmail.com%3E
> 
>> On 11.07.2018 at 19:31, Nico Kruber <nico@data-artisans.com> wrote:
>> 
>> If this is about too many timers and your application allows it, you may
>> also try to reduce the timer resolution and thus frequency by coalescing
>> them [1].
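
A minimal sketch of the rounding behind timer coalescing, assuming a resolution of one second. The class and method names are hypothetical. In a process function, the rounded value would be the timestamp passed to the timer registration call, and since Flink keeps only one timer per key and timestamp, several registrations for the same key that round to the same value collapse into a single timer.

```java
// Hypothetical helper, not part of Flink.
final class TimerCoalescing {

    /** Rounds a target fire time down to a multiple of the given resolution (both in ms). */
    static long coalesce(long targetMillis, long resolutionMillis) {
        if (resolutionMillis <= 0) {
            throw new IllegalArgumentException("resolutionMillis must be positive");
        }
        return Math.floorDiv(targetMillis, resolutionMillis) * resolutionMillis;
    }

    public static void main(String[] args) {
        long resolution = 1000L; // one second
        long[] targets = {1531400000123L, 1531400000456L, 1531400000999L};
        for (long target : targets) {
            // All three targets fall into the same second and share one timestamp.
            System.out.println(target + " -> " + coalesce(target, resolution));
        }
    }
}
```

Rounding down means a timer can fire up to one resolution step earlier than requested. Coalescing only reduces the number of timers per key, so it does not help when every timer belongs to a different key.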
>> 
>> 
>> Nico
>> 
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html#timer-coalescing
>> 
>> On 11/07/18 18:27, Stephan Ewen wrote:
>>> Hi shishal!
>>> 
>>> I think there is an issue with cancellation when many timers fire at the
>>> same time. These timers have to finish before shutdown happens, and this
>>> seems to take a while in your case.
>>> 
>>> Did the TM process actually kill itself in the end (and get restarted)?
>>> 
>>> 
>>> 
>>> On Wed, Jul 11, 2018 at 9:29 AM, shishal <shishal.a@gmail.com> wrote:
>>> 
>>>    Hi,
>>> 
>>>    I am using Flink 1.4.2 with RocksDB as the state backend. I am using a
>>>    process function with timers on event time. For checkpointing I am
>>>    using HDFS.
>>> 
>>>    I am load testing, so I am reading Kafka from the beginning (approx.
>>>    7 days of data with 50M events).
>>> 
>>>    My job gets stuck after approx. 20 min with no error. Thereafter the
>>>    watermark does not progress and all checkpoints fail.
>>> 
>>>    Also, when I try to cancel my job (using the web UI), it takes several
>>>    minutes to finally get cancelled. It also brings the Task Manager down.
>>> 
>>>    There are no logs while my job is hung, but while cancelling I get the
>>>    following error.
>>> 
>>>    /
>>> 
>>>    2018-07-11 09:10:39,385 ERROR
>>>    org.apache.flink.runtime.taskmanager.TaskManager              -
>>>    ==============================================================
>>>    ======================      FATAL      =======================
>>>    ==============================================================
>>> 
>>>    A fatal error occurred, forcing the TaskManager to shut down: Task
>>>    'process
>>>    (3/6)' did not react to cancelling signal in the last 30 seconds, but is
>>>    stuck in method:
>>>     org.rocksdb.RocksDB.get(Native Method)
>>>    org.rocksdb.RocksDB.get(RocksDB.java:810)
>>>    org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
>>>    org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>>    nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
>>>    org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
>>>    org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>>>    org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>>>    org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>>>    org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>>>    org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>>>    org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>>>    org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>>>    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>>    org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>    java.lang.Thread.run(Thread.java:748)
>>> 
>>>    2018-07-11 09:10:39,390 DEBUG
>>>    org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy
>>> 
>>>    - Actor was killed. Stopping it now.
>>>    akka.actor.ActorKilledException: Kill
>>>    2018-07-11 09:10:39,407 INFO
>>>    org.apache.flink.runtime.taskmanager.TaskManager              - Stopping
>>>    TaskManager akka://flink/user/taskmanager#-1231617791.
>>>    2018-07-11 09:10:39,408 INFO
>>>    org.apache.flink.runtime.taskmanager.TaskManager              -
>>>    Cancelling
>>>    all computations and discarding all cached data.
>>>    2018-07-11 09:10:39,409 INFO 
>>>    org.apache.flink.runtime.taskmanager.Task                   
>>>    - Attempting to fail task externally process (3/6)
>>>    (432fd129f3eea363334521f8c8de5198).
>>>    2018-07-11 09:10:39,409 INFO 
>>>    org.apache.flink.runtime.taskmanager.Task                   
>>>    - Task process (3/6) is already in state CANCELING
>>>    2018-07-11 09:10:39,409 INFO 
>>>    org.apache.flink.runtime.taskmanager.Task                   
>>>    - Attempting to fail task externally process (4/6)
>>>    (7c6b96c9f32b067bdf8fa7c283eca2e0).
>>>    2018-07-11 09:10:39,409 INFO 
>>>    org.apache.flink.runtime.taskmanager.Task                   
>>>    - Task process (4/6) is already in state CANCELING
>>>    2018-07-11 09:10:39,409 INFO 
>>>    org.apache.flink.runtime.taskmanager.Task                   
>>>    - Attempting to fail task externally process (2/6)
>>>    (a4f731797a7ea210fd0b512b0263bcd9).
>>>    2018-07-11 09:10:39,409 INFO 
>>>    org.apache.flink.runtime.taskmanager.Task                   
>>>    - Task process (2/6) is already in state CANCELING
>>>    2018-07-11 09:10:39,409 INFO 
>>>    org.apache.flink.runtime.taskmanager.Task                   
>>>    - Attempting to fail task externally process (1/6)
>>>    (cd8a113779a4c00a051d78ad63bc7963).
>>>    2018-07-11 09:10:39,409 INFO 
>>>    org.apache.flink.runtime.taskmanager.Task                   
>>>    - Task process (1/6) is already in state CANCELING
>>>    2018-07-11 09:10:39,409 INFO
>>>    org.apache.flink.runtime.taskmanager.TaskManager              -
>>>    Disassociating from JobManager
>>>    2018-07-11 09:10:39,412 INFO
>>>    org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting
>>>    down BLOB cache
>>>    2018-07-11 09:10:39,431 INFO
>>>    org.apache.flink.runtime.blob.TransientBlobCache              - Shutting
>>>    down BLOB cache
>>>    2018-07-11 09:10:39,444 INFO
>>>    org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService 
>>>    -
>>>    Stopping ZooKeeperLeaderRetrievalService.
>>>    2018-07-11 09:10:39,444 DEBUG
>>>    org.apache.flink.runtime.io.disk.iomanager.IOManager - Shutting
>>>    down I/O manager.
>>>    2018-07-11 09:10:39,451 INFO
>>>    org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager
>>>    removed spill file directory
>>>    /tmp/flink-io-989505e5-ac33-4d56-add5-04b2ad3067b4
>>>    2018-07-11 09:10:39,461 INFO
>>>    org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting
>>>    down the network environment and its components.
>>>    2018-07-11 09:10:39,461 DEBUG
>>>    org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting
>>>    down network connection manager
>>>    2018-07-11 09:10:39,462 INFO
>>>    org.apache.flink.runtime.io.network.netty.NettyClient - Successful
>>>    shutdown (took 1 ms).
>>>    2018-07-11 09:10:39,472 INFO
>>>    org.apache.flink.runtime.io.network.netty.NettyServer - Successful
>>>    shutdown (took 10 ms).
>>>    2018-07-11 09:10:39,472 DEBUG
>>>    org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting
>>>    down intermediate result partition manager
>>>    2018-07-11 09:10:39,473 DEBUG
>>>    org.apache.flink.runtime.io.network.partition.ResultPartitionManager -
>>>    Releasing 0 partitions because of shutdown.
>>>    2018-07-11 09:10:39,474 DEBUG
>>>    org.apache.flink.runtime.io.network.partition.ResultPartitionManager -
>>>    Successful shutdown.
>>>    2018-07-11 09:10:39,498 INFO
>>>    org.apache.flink.runtime.taskmanager.TaskManager              - Task
>>>    manager
>>>    akka://flink/user/taskmanager is completely shut down.
>>>    2018-07-11 09:10:39,504 ERROR
>>>    org.apache.flink.runtime.taskmanager.TaskManager              - Actor
>>>    akka://flink/user/taskmanager#-1231617791 terminated, stopping
>>>    process...
>>>    2018-07-11 09:10:39,563 INFO 
>>>    org.apache.flink.runtime.taskmanager.Task                   
>>>    - Notifying TaskManager about fatal error. Task 'process (2/6)' did not
>>>    react to cancelling signal in the last 30 seconds, but is stuck in
>>>    method:
>>>     org.rocksdb.RocksDB.get(Native Method)
>>>    org.rocksdb.RocksDB.get(RocksDB.java:810)
>>>    org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
>>>    org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>>    nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
>>>    org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
>>>    org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>>>    org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>>>    org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>>>    org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>>>    org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>>>    org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>>>    org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>>>    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>>    org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>    java.lang.Thread.run(Thread.java:748)
>>>    .
>>>    2018-07-11 09:10:39,575 INFO 
>>>    org.apache.flink.runtime.taskmanager.Task                   
>>>    - Notifying TaskManager about fatal error. Task 'process (1/6)' did not
>>>    react to cancelling signal in the last 30 seconds, but is stuck in
>>>    method:
>>> 
>>>    sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>    java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>    java.lang.Class.newInstance(Class.java:442)
>>>    org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:196)
>>>    org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:399)
>>>    org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:304)
>>>    org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:104)
>>>    org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>>>    nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
>>>    org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
>>>    org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>>>    org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>>>    org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>>>    org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>>>    org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>>>    org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>>>    org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>>>    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>>    org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>    java.lang.Thread.run(Thread.java:748)
>>>    /
>>> 
>>> 
>>> 
>>>    --
>>>    Sent from:
>>>    http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>> 
>>> 
>> 
>> -- 
>> Nico Kruber | Software Engineer
>> data Artisans
>> 
>> 
> 

