flink-user mailing list archives

From: Jeff Henrikson <jehenri...@gmail.com>
Subject: Re: Trouble with large state
Date: Fri, 19 Jun 2020 23:19:06 GMT
Bhaskar,

Based on your idea of limiting input to get better checkpoint behavior, 
I made a ProcessFunction that constrains each input to a maximum number 
of events per second per slot.  I do need to do some stateless input 
scanning before the joins.  The stateless part needs to be fast and does 
not impact snapshots, so I inserted the throttling after the input 
preprocessing but before the stateful transformations.  There is a 
significant difference in snapshot throughput (often 5x or larger) when 
I change the throttle between 200 and 300 events per second (per slot 
per input).

I hope the throttling remains effective as I keep the job running longer.

Odd.  But likely a very effective way out of my problem.

I wonder what drives it . . .  Thread contention?  IOPS contention?

See ProcessFunction code below.

Many thanks!


Jeff



import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Set eventsPerSecMax to -1 to disable the throttle
// TODO: Actual number of events can be slightly larger
// TODO: Remove pause correlation with system clock

case class Throttler[T](eventsPerSecMax: Double) extends ProcessFunction[T, T] {
   // Wall-clock minute of the most recent event, and the count of events
   // seen so far within that minute.
   var minutePrev = 0
   var numEvents = 0

   def minutes(): Int = {
     val ms = System.currentTimeMillis()
     (ms / 1000 / 60).toInt
   }

   def increment(): Unit = {
     val m = minutes()
     if (m != minutePrev) {
       // New minute: remember it and reset the counter.  (Without updating
       // minutePrev, the counter would reset on every event and the
       // throttle would never engage.)
       minutePrev = m
       numEvents = 0
     }
     numEvents += 1
   }

   // Events per second, averaged over the current minute.
   def eps(): Double = numEvents / 60.0

   override def processElement(x: T, ctx: ProcessFunction[T, T]#Context,
                               out: Collector[T]): Unit = {
     increment()
     if (eventsPerSecMax > 0 && eps() > eventsPerSecMax) {
       Thread.sleep(1000L)
     }
     out.collect(x)
   }
}
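
For context, here is roughly how I wire the throttler in.  The names 
below (preprocessed, MyEvent) are placeholders for my actual 
post-preprocessing stream and event type, and 250 events/sec is just an 
illustrative limit:

// Assumes: import org.apache.flink.streaming.api.scala._
// `preprocessed` is the DataStream[MyEvent] after the stateless input scanning.
val throttled: DataStream[MyEvent] =
  preprocessed.process(Throttler[MyEvent](eventsPerSecMax = 250.0))
// The stateful .cogroup joins are then built on `throttled`.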

On 6/19/20 9:16 AM, Jeff Henrikson wrote:
> Bhaskar,
> 
> Thank you for your thoughtful points.
> 
>  > I want to discuss more on points (1) and (2)
>  > If we take care of them, the rest will be good
>  >
>  > Coming to (1)
>  >
>  > Please try to give a reasonable checkpoint interval for every job.
>  > The minimum checkpoint interval recommended by the Flink community is 3 minutes.
>  > I think you should give a minimum 3 minute checkpoint interval for all jobs.
> 
> I have spent very little time testing with checkpoint intervals of under 
> 3 minutes.  I frequently test with intervals of 5 minutes and of 30 
> minutes.  I also test with checkpoint intervals such as 60 minutes, and 
> never (manual only).  In terms of which exceptions get thrown, I don't 
> see much difference between 5/30/60.
> 
> Infinity (no checkpoint interval) seems to be an interesting value, 
> because before crashing, it seems to process around twice as much state 
> as with any finite checkpoint interval.  The largest savepoints I have 
> captured have been manually triggered using the /job/:jobid/stop REST 
> API.  I think it helps for the snapshot to be synchronous.
> 
> One curiosity about the /job/:jobid/stop command is that from the time of 
> the command, it often takes many minutes for the internal processing to 
> stop.
> 
> Another curiosity about the /job/:jobid/stop command is that sometimes, 
> following a completed savepoint, the cluster goes back to running!
> 
>  > Coming to (2)
>  >
>  > What's your input data rate?
> 
> My application involves what I will call "main" events that are enriched 
> by "secondary" events.  While the secondary events have several 
> different input streams, data types, and join keys, I will estimate the 
> secondary events all together.  My estimate for input rate is as follows:
> 
>      50M "main" events
>      50 secondary events for each main event, for a
>          total of around 2.5B input events
>      8 nodes
>      20 hours
> 
> Combining these figures, we can estimate:
> 
>      50000000*50/8/20/3600 = 4340 events/second/node
> 
> I don't see how to act on your advice for (2).  Maybe your idea is that 
> during backfill/bootstrap, I artificially throttle the inputs to my 
> application?
> 
> 100% of my application state is due to .cogroup, which manages a 
> HeapListState on its own.  I cannot think of any controls for changing 
> how .cogroup handles internal state per se.  I will paste below the 
> Flink code path that .cogroup uses to update its internal state when it 
> runs my application.
> 
> The only control I can think of with .cogroup that indirectly impacts 
> internal state is delayed triggering.
> 
> Currently I use a trigger on every event, which I understand creates a 
> suboptimal number of events.  I previously experimented with delayed 
> triggering, but I did not get good results.
> 
> Just now I tried again with a ContinuousProcessingTimeTrigger of 30 
> seconds, with rocksdb.timer-service.factory: heap, and a 5 minute 
> checkpoint interval.  The first checkpoint failed, which has been rare 
> when I use all the same parameters except triggering on every event.  So 
> it looks worse, not better.
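> 
> For reference, that delayed-trigger variant looks roughly like the sketch 
> below.  Stream, key, and type names are placeholders for my actual job, 
> and myEvictor stands in for the custom evictor described earlier; only 
> the trigger line differs from the trigger-on-every-event version.
> 
>     import org.apache.flink.streaming.api.scala._
>     import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
>     import org.apache.flink.streaming.api.windowing.time.Time
>     import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
>     import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
> 
>     // Placeholders: main, secondary, their event types, and myEvictor.
>     val joined = main.coGroup(secondary)
>       .where(_.id)
>       .equalTo(_.id)
>       .window(GlobalWindows.create())
>       .trigger(ContinuousProcessingTimeTrigger.of[GlobalWindow](Time.seconds(30)))
>       .evictor(myEvictor)   // custom keep-newest-per-ID evictor
>       .apply { (mains, secs) =>
>         // Placeholder combine; the real job produces enriched events here.
>         (mains.toSeq, secs.toSeq)
>       }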
> 
> Thanks again,
> 
> 
> Jeff Henrikson
> 
> 
> 
> 
> On 6/18/20 11:21 PM, Vijay Bhaskar wrote:
>> Thanks for the reply.
>> I want to discuss more on points (1) and (2)
>> If we take care of them, the rest will be good
>>
>> Coming to (1)
>>
>> Please try to give a reasonable checkpoint interval for every job.
>> The minimum checkpoint interval recommended by the Flink community is 3 minutes.
>> I think you should give a minimum 3 minute checkpoint interval for all jobs.
>>
>> Coming to (2)
>>
>> What's your input data rate?
>> For example, suppose you are seeing data at 100 msg/sec. If each message 
>> changes state and you update the state in RocksDB for every message, that 
>> creates 100 rows per second on the RocksDB side. If on average 50 records 
>> have changed each second, then even if you are using RocksDB differential 
>> state = true there is little benefit, because every time 50% of the rows 
>> being added are new. So the best bet is to update records in RocksDB only 
>> once per checkpoint interval.
>> Suppose your checkpoint interval is 5 minutes. If you update the RocksDB 
>> state once every 5 minutes, then the rate at which new records are added 
>> to RocksDB will be 1 record per 5 minutes.
>> Whereas in your original scenario, 30000 records are added to RocksDB in 
>> 5 minutes. That is a 1:30000 ratio of records you can avoid adding to 
>> RocksDB, which saves a huge amount of redundant growth. Ultimately your 
>> state size is driven by your checkpoint interval. On recovery you go back 
>> 5 minutes in the input source and re-read, so a state update on the 
>> RocksDB side once every 5 minutes should similarly work. Otherwise, the 
>> extra state updates bring no benefit.
>>
>> Regards
>> Bhaskar
>>
>> Try to update your RocksDB state at an interval equal to the 
>> checkpoint interval. Otherwise, what I have often observed is that the 
>> state size grows unnecessarily.
>>
>> On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson <jehenrik27@gmail.com> wrote:
>>
>>     Vijay,
>>
>>     Thanks for your thoughts.  Below are answers to your questions.
>>
>>       > 1. What's your checkpoint interval?
>>
>>     I have used many different checkpoint intervals, ranging from 5
>>     minutes to never.  I usually call setMinPauseBetweenCheckpoints with
>>     the same value as the checkpoint interval.
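>>
>>     For concreteness, a minimal sketch of that checkpoint setup in code
>>     (5 minutes is just one of the intervals I have tried; env is the
>>     StreamExecutionEnvironment):
>>
>>         import org.apache.flink.streaming.api.CheckpointingMode
>>
>>         val checkpointIntervalMs = 5 * 60 * 1000L
>>         env.enableCheckpointing(checkpointIntervalMs,
>>           CheckpointingMode.AT_LEAST_ONCE)
>>         env.getCheckpointConfig
>>           .setMinPauseBetweenCheckpoints(checkpointIntervalMs)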
>>
>>       > 2. How frequently are you updating the state into RocksDB?
>>
>>     My understanding is that for .cogroup:
>>
>>         - Triggers control communication outside the operator
>>         - Evictors control cleanup of internal state (see the simplified
>>           sketch below)
>>         - Configurations like write buffer size control the frequency of
>>           state change at the storage layer
>>         - There is no control over how frequently the window state
>>           updates at the level of the RocksDB API
>>
>>     Thus, the state updates whenever data is ingested.
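>>
>>     To make the evictor point concrete, here is a simplified sketch of
>>     the keep-newest-per-ID idea for a plain keyed stream.  Event, id, and
>>     ts are placeholder names; the evictor in my real job operates on the
>>     cogroup's TaggedUnion elements, so this illustrates the Evictor
>>     interface rather than my production code:
>>
>>         import org.apache.flink.streaming.api.windowing.evictors.Evictor
>>         import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
>>         import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
>>         import scala.collection.JavaConverters._
>>
>>         // Placeholder event type for the sketch.
>>         case class Event(id: String, ts: Long)
>>
>>         class KeepNewestPerId extends Evictor[Event, GlobalWindow] {
>>
>>           override def evictBefore(
>>               elements: java.lang.Iterable[TimestampedValue[Event]],
>>               size: Int,
>>               window: GlobalWindow,
>>               ctx: Evictor.EvictorContext): Unit = {
>>             // Newest timestamp seen per ID in this window.
>>             val newest = elements.asScala.foldLeft(Map.empty[String, Long]) {
>>               (m, tv) =>
>>                 val e = tv.getValue
>>                 m.updated(e.id, math.max(e.ts, m.getOrElse(e.id, Long.MinValue)))
>>             }
>>             // Drop every element superseded by a newer one for its ID.
>>             val it = elements.iterator()
>>             while (it.hasNext) {
>>               val e = it.next().getValue
>>               if (e.ts < newest(e.id)) it.remove()
>>             }
>>           }
>>
>>           override def evictAfter(
>>               elements: java.lang.Iterable[TimestampedValue[Event]],
>>               size: Int,
>>               window: GlobalWindow,
>>               ctx: Evictor.EvictorContext): Unit = ()
>>         }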
>>
>>       > 3. How many task managers are you using?
>>
>>     Usually I have been running with one slot per taskmanager, with 28GB
>>     of usable RAM on each node.
>>
>>       > 4. How much data each task manager handles while taking the
>>     checkpoint?
>>
>>     Funny you should ask.  I would be okay with zero.
>>
>>     The application I am replacing has a latency of 36-48 hours, so if I
>>     had to fully stop processing to take every snapshot synchronously, it
>>     might be seen as totally acceptable, especially for initial bootstrap.
>>     Also, the velocity of running this backfill is approximately 115x real
>>     time on 8 nodes, so the steady-state run may not exhibit the failure
>>     mode in question at all.
>>
>>     It has come as some frustration to me that, in the case of
>>     RocksDBStateBackend, the configuration key state.backend.async
>>     effectively has no meaningful way to be false.
>>
>>     The only way I have found in the existing code to get a behavior like
>>     a synchronous snapshot is to POST to /jobs/<jobID>/stop with
>>     drain=false and a URL.  This method of failing fast is how I
>>     discovered that I needed to increase transfer threads from the default.
>>
>>     The reason I don't just run the whole backfill and then take one
>>     snapshot is that even in the absence of checkpoints, a very similar
>>     congestion seems to take the cluster down when I am, say, 20-30% of the
>>     way through my backfill.
>>
>>     Reloading from my largest feasible snapshot makes it possible to make
>>     another snapshot a bit larger before crash, but not by much.
>>
>>     At first glance, the code change to allow RocksDBStateBackend to run
>>     in a synchronous snapshot mode looks pretty easy.  Nevertheless, I was
>>     hoping to do the initial launch of my application without needing to
>>     modify the framework.
>>
>>     Regards,
>>
>>
>>     Jeff Henrikson
>>
>>
>>     On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
>>      > For me this seems to be an IO bottleneck at your task manager.
>>      > I have a couple of queries:
>>      > 1. What's your checkpoint interval?
>>      > 2. How frequently are you updating the state into RocksDB?
>>      > 3. How many task managers are you using?
>>      > 4. How much data each task manager handles while taking the
>>     checkpoint?
>>      >
>>      > For points (3) and (4), you should be very careful. I feel you are
>>      > stuck at this.
>>      > Try to scale vertically by adding more CPU and memory for each
>>      > task manager.
>>      > If not, try to scale horizontally so that the IO of each task
>>      > manager is reduced.
>>      > Apart from that, check whether there is any bottleneck with the
>>      > file system.
>>      >
>>      > Regards
>>      > Bhaskar
>>      >
>>      >
>>      >
>>      >
>>      >
>>      > On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor <victtim@gmail.com>
>>      > wrote:
>>      >
>>      >     I had a similar problem.  I ended up solving it by not relying
>>      >     on checkpoints for recovery and instead re-reading my input
>>      >     sources (in my case a kafka topic) from the earliest offset and
>>      >     rebuilding only the state I need.  I only need to care about
>>      >     the past 1 to 2 days of state, so I can afford to drop anything
>>      >     older.  My recovery time went from over an hour for just the
>>      >     first checkpoint to under 10 minutes.
>>      >
>>      >     Tim
>>      >
>>      >     On Wed, Jun 17, 2020, 11:52 PM Yun Tang <myasuka@live.com>
>>      >     wrote:
>>      >
>>      >         Hi Jeff
>>      >
>>      >          1. "after around 50GB of state, I stop being able to
>>      >             reliably take checkpoints or savepoints."
>>      >             What is the exact reason that the job cannot complete
>>      >             the checkpoint? Expired before completing, or declined
>>      >             by some tasks? The former is mainly caused by high
>>      >             back-pressure and the latter is mainly due to some
>>      >             internal error.
>>      >          2. Have you checked why the remote task manager was lost?
>>      >             If the remote task manager did not crash, it might be
>>      >             due to GC impact; I think you might need to check the
>>      >             task-manager logs and GC logs.
>>      >
>>      >         Best
>>      >         Yun Tang
>>      >         ------------------------------------------------------------------------
>>      >         *From:* Jeff Henrikson <jehenrik27@gmail.com>
>>      >         *Sent:* Thursday, June 18, 2020 1:46
>>      >         *To:* user <user@flink.apache.org>
>>      >         *Subject:* Trouble with large state
>>      >         Hello Flink users,
>>      >
>>      >         I have an application of around 10 enrichment joins.  All
>>      >         events are read from kafka and have event timestamps.  The
>>      >         joins are built using .cogroup, with a global window,
>>      >         triggering on every 1 event, plus a custom evictor that
>>      >         drops records once a newer record for the same ID has been
>>      >         processed.  Deletes are represented by empty events with
>>      >         timestamp and ID (tombstones).  That way, we can drop
>>      >         records when business logic dictates, as opposed to when a
>>      >         maximum retention has been attained.  The application runs
>>      >         RocksDBStateBackend, on Kubernetes on AWS with local SSDs.
>>      >
>>      >         Unit tests show that the joins produce expected results.
>>      >         On an 8 node cluster, watermark output progress seems to
>>      >         indicate I should be able to bootstrap my state of around
>>      >         500GB in around 1 day.  I am able to save and restore
>>      >         savepoints for the first half an hour of run time.
>>      >
>>      >         My current trouble is that after around 50GB of state, I
>>      >         stop being able to reliably take checkpoints or savepoints.
>>      >         Some time after that, I start getting a variety of failures
>>      >         where the first suspicious log event is a generic cluster
>>      >         connectivity error, such as:
>>      >
>>      >               1) java.io.IOException: Connecting the channel failed:
>>      >               Connecting to remote task manager + '/10.67.7.101:38955'
>>      >               has failed. This might indicate that the remote task
>>      >               manager has been lost.
>>      >
>>      >               2) org.apache.flink.runtime.io.network.netty.exception
>>      >               .RemoteTransportException: Connection unexpectedly
>>      >               closed by remote task manager 'null'. This might
>>      >               indicate that the remote task manager was lost.
>>      >
>>      >               3) Association with remote system
>>      >               [akka.tcp://flink@10.67.6.66:34987] has failed, address
>>      >               is now gated for [50] ms. Reason: [Association failed
>>      >               with [akka.tcp://flink@10.67.6.66:34987]] Caused by:
>>      >               [java.net.NoRouteToHostException: No route to host]
>>      >
>>      >         I don't see any obvious out of memory errors on the
>>      >         TaskManager UI.
>>      >
>>      >         Adding nodes to the cluster does not seem to increase the
>>      >         maximum savable state size.
>>      >
>>      >         I could enable HA, but for the time being I have been
>>      >         leaving it out to avoid the possibility of masking
>>      >         deterministic faults.
>>      >
>>      >         Below are my configurations.
>>      >
>>      >         Thanks in advance for any advice.
>>      >
>>      >         Regards,
>>      >
>>      >
>>      >         Jeff Henrikson
>>      >
>>      >
>>      >
>>      >         Flink version: 1.10
>>      >
>>      >         Configuration set via code:
>>      >               parallelism=8
>>      >               maxParallelism=64
>>      >               setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>      >               setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>      >               setTolerableCheckpointFailureNumber(1000)
>>      >               setMaxConcurrentCheckpoints(1)
>>      >               enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>      >
>>      >               RocksDBStateBackend
>>      >               setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>>      >               setNumberOfTransferThreads(25)
>>      >               setDbStoragePath points to a local nvme SSD
>>      >
>>      >         Configuration in flink-conf.yaml:
>>      >
>>      >               jobmanager.rpc.address: localhost
>>      >               jobmanager.rpc.port: 6123
>>      >               jobmanager.heap.size: 28000m
>>      >               taskmanager.memory.process.size: 28000m
>>      >               taskmanager.memory.jvm-metaspace.size: 512m
>>      >               taskmanager.numberOfTaskSlots: 1
>>      >               parallelism.default: 1
>>      >               jobmanager.execution.failover-strategy: full
>>      >
>>      >               cluster.evenly-spread-out-slots: false
>>      >
>>      >               taskmanager.memory.network.fraction: 0.2           # default 0.1
>>      >               taskmanager.memory.framework.off-heap.size: 2GB
>>      >               taskmanager.memory.task.off-heap.size: 2GB
>>      >               taskmanager.network.memory.buffers-per-channel: 32  # default 2
>>      >               taskmanager.memory.managed.fraction: 0.4            # docs say default 0.1, but something seems to set 0.4
>>      >               taskmanager.memory.task.off-heap.size: 2048MB       # default 128M
>>      >
>>      >               state.backend.fs.memory-threshold: 1048576
>>      >               state.backend.fs.write-buffer-size: 10240000
>>      >               state.backend.local-recovery: true
>>      >               state.backend.rocksdb.writebuffer.size: 64MB
>>      >               state.backend.rocksdb.writebuffer.count: 8
>>      >               state.backend.rocksdb.writebuffer.number-to-merge: 4
>>      >               state.backend.rocksdb.timer-service.factory: heap
>>      >               state.backend.rocksdb.block.cache-size: 64000000    # default 8MB
>>      >               state.backend.rocksdb.write-batch-size: 16000000    # default 2MB
>>      >
>>      >               web.checkpoints.history: 250
>>      >
>>
