flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jeff Henrikson <jehenri...@gmail.com>
Subject Re: Trouble with large state
Date Mon, 22 Jun 2020 18:54:19 GMT
Bhaskar,

I think I am unstuck.  The performance numbers I sent after throttling 
were due to a one character error in business logic.  I think I now have 
something good enough to work with for now.  I will repost if I 
encounter further unexpected issues.

Adding application-level throttling ends up resolving both my symptom of 
slow/failing checkpoints, and also my symptom of crashes during long runs.

Many thanks!


Jeff


On 6/20/20 11:46 AM, Jeff Henrikson wrote:
> Bhaskar,
> 
>  > Glad to know some progress.
> 
> Yeah, some progress.  Yet overnight run didn't look as good as I hoped.
> 
> The throttling required to not crash during snapshots seems to be quite 
> different from the throttling required to crash not during snapshots. So 
> the lowest common denominator is quite a large performance penalty.
> 
> What's worse, the rate of input that makes the snapshot performance go 
> from good to bad seems to change significantly as the state size grows. 
> Here is checkpoint history from an overnight run.
> 
> Parameters:
> 
>      - 30 minutes minimum between snapshots
>      - incremental snapshot mode
>      - inputs throttled to 100 events per sec per input per slot,
>        which is around 1/4 of the unthrottled throughput
> 
> Checkpoint history:
> 
>      ID    Status    Acknowledged    Trigger Time    Latest 
> Acknowledgement    End to End Duration    State Size    Buffered During 
> Alignment
>      12    COMPLETED    304/304    8:52:22    10:37:18    1h 44m 55s
   
> 60.5 GB    0 B
>      11    COMPLETED    304/304    6:47:03    8:22:19    1h 35m 16s
   
> 53.3 GB    0 B
>      10    COMPLETED    304/304    5:01:20    6:17:00    1h 15m 39s
   
> 41.0 GB    0 B
>      9    COMPLETED    304/304    3:47:43    4:31:19    43m 35s   
34.1 
> GB    0 B
>      8    COMPLETED    304/304    2:40:58    3:17:42    36m 43s   
27.8 
> GB    0 B
>      7    COMPLETED    304/304    1:39:15    2:10:57    31m 42s   
23.1 
> GB    0 B
>      6    COMPLETED    304/304    0:58:02    1:09:13    11m 11s   
17.4 
> GB    0 B
>      5    COMPLETED    304/304    0:23:27    0:28:01    4m 33s   
14.3 
> GB    0 B
>      4    COMPLETED    304/304    23:52:29    23:53:26    56s   
12.7 
> GB    0 B
>      3    COMPLETED    304/304    23:20:59    23:22:28    1m 29s   
10.8 
> GB    0 B
>      2    COMPLETED    304/304    22:46:17    22:50:58    4m 40s   
7.40 
> GB    0 B
> 
> As you can see, GB/minute varies drastically.  GB/minute also varies 
> drastically with full checkpoint mode.
> 
> I'm pleased that it hasn't crashed yet.  Yet I'm concerned that with the 
> checkpoint GB/minute getting so slow, it will crash soon.
> 
> I'm really wishing state.backend.async=false worked for 
> RocksDbStateBackend.
> 
> I'm also wondering if my throttler would improve if I just connected to 
> the REST api to ask if any checkpoint is in progress, and then paused 
> inputs accordingly.  Effectively state.backend.async=false via hacked 
> application code.
> 
>  > Where are you updating your state here? I
>  > couldn't find any flink managed state here.
> 
> The only updates to state I make are through the built-in 
> DataStream.cogroup.  A unit test (without RocksDB loaded) of the way I 
> use .cogroup shows exactly two ways that .cogroup calls an 
> implementation of AppendingState.add.  I summarize those below.
> 
> The two AppendingState subclasses invoked are HeapListState and 
> HeapReducingState.  Neither have a support attribute on them, such as 
> MapState's @PublicEvolving.
> 
>  > I suggested updating the flink managed state using onTimer over an
>  > interval equal to the checkpoint interval.
> 
> So the onTimer method, with interval set to the checkpoint interval. 
> Interesting.
> 
> It looks like the closest subclass for my use case use would be either 
> KeyedCoProcessFunction.  Let me see if I understand concretely the idea:
> 
> 1) between checkpoints, read join input and write join output, by 
> loading any state reads from external state, but buffering all state 
> changes in memory in some kind of data structure.
> 
> 2) whenever a checkpoint arrived or the memory consumed by buffered 
> writes gets too big, flush the writes to state.
> 
> Is that the gist of the idea about .onTimer?
> 
> 
> Jeff
> 
> 
> 
> There are two paths from .coGroup to AppendingState.add
> 
>      path 1 of 2: .coGroup to HeapListState
> 
>          add:90, HeapListState {org.apache.flink.runtime.state.heap}
>          processElement:203, EvictingWindowOperator 
> {org.apache.flink.streaming.runtime.operators.windowing}
>          processElement:164, StreamOneInputProcessor 
> {org.apache.flink.streaming.runtime.io}
>          processInput:143, StreamOneInputProcessor 
> {org.apache.flink.streaming.runtime.io}
> 
> 
> org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator#processElement

> 
> 
>                (windowAssigner is an instance of GlobalWindows)
> 
>                  @Override
>                  public void processElement(StreamRecord<IN> element)

> throws Exception {
>                      final Collection<W> elementWindows = 
> windowAssigner.assignWindows(
>                              element.getValue(), element.getTimestamp(),

> windowAssignerContext);
> 
>                      //if element is handled by none of assigned 
> elementWindows
>                      boolean isSkippedElement = true;
> 
>                      final K key = 
> this.<K>getKeyedStateBackend().getCurrentKey();
> 
>                      if (windowAssigner instanceof MergingWindowAssigner)
{
>                  . . .
>                      } else {
>                          for (W window : elementWindows) {
> 
>                              // check if the window is already
inactive
>                              if (isWindowLate(window)) {
>                                  continue;
>                              }
>                              isSkippedElement = false;
> 
>                              
> evictingWindowState.setCurrentNamespace(window);
>                              evictingWindowState.add(element);
> 
>          =>
> 
>              org.apache.flink.runtime.state.heap.HeapListState#add:
>                      @Override
>                      public void add(V value) {
>                          Preconditions.checkNotNull(value, "You
cannot 
> add null to a ListState.");
> 
>                          final N namespace = currentNamespace;
> 
>                          final StateTable<K, N, List<V>>
map = stateTable;
>                          List<V> list = map.get(namespace);
> 
>                          if (list == null) {
>                              list = new ArrayList<>();
>                              map.put(namespace, list);
>                          }
>                          list.add(value);
>                      }
> 
>      path 2 of 2: .coGroup to HeapReducingState
> 
>              add:95, HeapReducingState 
> {org.apache.flink.runtime.state.heap}
>              onElement:49, CountTrigger 
> {org.apache.flink.streaming.api.windowing.triggers}
>              onElement:898, WindowOperator$Context 
> {org.apache.flink.streaming.runtime.operators.windowing}
>              processElement:210, EvictingWindowOperator 
> {org.apache.flink.streaming.runtime.operators.windowing}
>              processElement:164, StreamOneInputProcessor 
> {org.apache.flink.streaming.runtime.io}
>              processInput:143, StreamOneInputProcessor 
> {org.apache.flink.streaming.runtime.io}
> 
>              @Override
>              public void processElement(StreamRecord<IN> element) throws

> Exception {
>                  final Collection<W> elementWindows = 
> windowAssigner.assignWindows(
>                          element.getValue(), element.getTimestamp(),

> windowAssignerContext);
> 
>                  //if element is handled by none of assigned elementWindows
>                  boolean isSkippedElement = true;
> 
>                  final K key = 
> this.<K>getKeyedStateBackend().getCurrentKey();
> 
>                  if (windowAssigner instanceof MergingWindowAssigner)
{
>              . . .
>                  } else {
>                      for (W window : elementWindows) {
> 
>                          // check if the window is already inactive
>                          if (isWindowLate(window)) {
>                              continue;
>                          }
>                          isSkippedElement = false;
> 
>                          evictingWindowState.setCurrentNamespace(window);
>                          evictingWindowState.add(element);
> 
>                          triggerContext.key = key;
>                          triggerContext.window = window;
>                          evictorContext.key = key;
>                          evictorContext.window = window;
> 
>                          TriggerResult triggerResult = 
> triggerContext.onElement(element);
> 
>          =>
>                  public TriggerResult onElement(StreamRecord<IN>

> element) throws Exception {
>                      return trigger.onElement(element.getValue(),

> element.getTimestamp(), window, this);
> 
>          =>
> 
>              @Override
>              public TriggerResult onElement(Object element, long 
> timestamp, W window, TriggerContext ctx) throws Exception {
>                  ReducingState<Long> count = 
> ctx.getPartitionedState(stateDesc);
>                  count.add(1L);
> 
>          =>
> 
>              org.apache.flink.runtime.state.heap.HeapReducingState#add
>                    @Override
>                    public void add(V value) throws IOException {
> 
>                        if (value == null) {
> 
> 
> 
> On 6/19/20 8:22 PM, Vijay Bhaskar wrote:
>> Glad to know some progress. Where are you updating your state here? I 
>> couldn't find any flink managed state here.
>> I suggested updating the flink managed state using onTimer over an 
>> interval equal to the checkpoint interval.
>>
>> In your case since you do throttling, it helped to maintain the fixed 
>> rate per slot. Before the rate was sporadic.
>> It's definitely an IO bottleneck.
>>
>> So now you can think of decoupling stateless scanning and stateful joins.
>> For example you can keep a stateless scan as separate flink job and 
>> keep its output in some Kafka kind of store.
>>
>>  From there you start your stateful joins. This would help focussing 
>> on your stateful job in much better fashion
>>
>> Regards
>> Bhaskar
>>
>>
>>
>>
>> On Sat, Jun 20, 2020 at 4:49 AM Jeff Henrikson <jehenrik27@gmail.com 
>> <mailto:jehenrik27@gmail.com>> wrote:
>>
>>     Bhaskar,
>>
>>     Based on your idea of limiting input to get better checkpoint 
>> behavior,
>>     I made a ProcessFunction that constraints to a number of events per
>>     second per slot per input.  I do need to do some stateless input
>>     scanning before joins.  The stateless part needs to be fast and 
>> does no
>>     impact snapshots.  So I inserted the throttling after the input
>>     preprocessing but before the stateful transformations.  There is a
>>     significant difference of snapshot throughput (often 5x or larger) 
>> when
>>     I change the throttle between 200 and 300 events per second (per slot
>>     per input).
>>
>>     Hope the throttling keeps being effective as I keep the job running
>>     longer.
>>
>>     Odd.  But likely a very effective way out of my problem.
>>
>>     I wonder what drives it . . .  Thread contention?  IOPS contention?
>>
>>     See ProcessFunction code below.
>>
>>     Many thanks!
>>
>>
>>     Jeff
>>
>>
>>
>>     import org.apache.flink.streaming.api.functions.ProcessFunction
>>     import org.apache.flink.util.Collector
>>
>>     // Set eventsPerSecMax to -1 to disable the throttle
>>     // TODO: Actual number of events can be slightly larger
>>     // TODO: Remove pause correlation with system clock
>>
>>     case class Throttler[T](eventsPerSecMax : Double) extends
>>     ProcessFunction[T,T] {
>>         var minutePrev = 0
>>         var numEvents = 0
>>         def minutes() = {
>>           val ms = System.currentTimeMillis()
>>           (ms / 1000 / 60).toInt
>>         }
>>         def increment() = {
>>           val m = minutes()
>>           if(m != minutePrev) {
>>             numEvents = 0
>>           }
>>           numEvents += 1
>>         }
>>         def eps() = {
>>           numEvents/60.0
>>         }
>>         override def processElement(x: T, ctx: ProcessFunction[T,
>>     T]#Context,
>>     out: Collector[T]): Unit = {
>>           increment()
>>           if(eventsPerSecMax > 0 && eps() > eventsPerSecMax)
{
>>             Thread.sleep(1000L)
>>           }
>>           out.collect(x)
>>         }
>>     }
>>
>>     On 6/19/20 9:16 AM, Jeff Henrikson wrote:
>>      > Bhaskar,
>>      >
>>      > Thank you for your thoughtful points.
>>      >
>>      >  > I want to discuss more on points (1) and (2)
>>      >  > If we take care of them  rest will be good
>>      >  >
>>      >  > Coming to (1)
>>      >  >
>>      >  > Please try to give reasonable checkpoint interval time for
>>     every job.
>>      >  > Minum checkpoint interval recommended by flink community is
3
>>     minutes
>>      >  > I thin you should give minimum 3 minutes checkpoint interval
>>     for all
>>      >
>>      > I have spent very little time testing with checkpoint intervals
>>     of under
>>      > 3 minutes.  I frequently test with intervals of 5 minutes and 
>> of 30
>>      > minutes.  I also test with checkpoint intervals such as 60
>>     minutes, and
>>      > never (manual only).  In terms of which exceptions get thrown, I
>>     don't
>>      > see much difference between 5/30/60, I don't see a lot of 
>> difference.
>>      >
>>      > Infinity (no checkpoint internal) seems to be an interesting 
>> value,
>>      > because before crashing, it seems to process around twice as much
>>     state
>>      > as with any finite checkpoint interval.  The largest savepoints I
>>     have
>>      > captured have been manually triggered using the /job/:jobid/stop
>>     REST
>>      > API.  I think it helps for the snapshot to be synchronous.
>>      >
>>      > One curiosity about the /job/:jobid/stop command is that from
>>     time of
>>      > the command, it often takes many minutes for the internal
>>     processing to
>>      > stop.
>>      >
>>      > Another curiosity about /job/:jobid/stop command is that sometimes
>>      > following a completed savepoint, the cluster goes back to running!
>>      >
>>      >  > Coming to (2)
>>      >  >
>>      >  > What's your input data rate?
>>      >
>>      > My application involves what I will call "main" events that are
>>     enriched
>>      > by "secondary" events.  While the secondary events have several
>>      > different input streams, data types, and join keys, I will
>>     estimate the
>>      > secondary events all together.  My estimate for input rate is as
>>     follows:
>>      >
>>      >      50M "main" events
>>      >      50 secondary events for each main event, for a
>>      >          total of around 2.5B input events
>>      >      8 nodes
>>      >      20 hours
>>      >
>>      > Combining these figures, we can estimate:
>>      >
>>      >      50000000*50/8/20/3600 = 4340 events/second/node
>>      >
>>      > I don't see how to act on your advice for (2).  Maybe your idea
>>     is that
>>      > during backfill/bootstrap, I artificially throttle the inputs 
>> to my
>>      > application?
>>      >
>>      > 100% of my application state is due to .cogroup, which manages a
>>      > HeapListState on its own.  I cannot think of any controls for
>>     changing
>>      > how .cogroup handles internal state per se.  I will paste below 
>> the
>>      > Flink code path that .cogroup uses to update its internal state
>>     when it
>>      > runs my application.
>>      >
>>      > The only control I can think of with .cogroup that indirectly
>>     impacts
>>      > internal state is delayed triggering.
>>      >
>>      > Currently I use a trigger on every event, which I understand
>>     creates a
>>      > suboptimal number of events.  I previously experimented with 
>> delayed
>>      > triggering, but I did not get good results.
>>      >
>>      > Just now I tried again ContinuousProcessingTimeTrigger of 30
>>     seconds,
>>      > with rocksdb.timer-service.factory: heap, and a 5 minute 
>> checkpoint
>>      > interval.  The first checkpoint failed, which has been rare when
>>     I use
>>      > all the same parameters except for triggering on every event.  
>> So it
>>      > looks worse not better.
>>      >
>>      > Thanks again,
>>      >
>>      >
>>      > Jeff Henrikson
>>      >
>>      >
>>      >
>>      >
>>      > On 6/18/20 11:21 PM, Vijay Bhaskar wrote:
>>      >> Thanks for the reply.
>>      >> I want to discuss more on points (1) and (2)
>>      >> If we take care of them  rest will be good
>>      >>
>>      >> Coming to (1)
>>      >>
>>      >> Please try to give reasonable checkpoint interval time for every
>>     job.
>>      >> Minum checkpoint interval recommended by flink community is 3
>>     minutes
>>      >> I thin you should give minimum 3 minutes checkpoint interval 
>> for all
>>      >>
>>      >> Coming to (2)
>>      >>
>>      >> What's your input data rate?
>>      >> For example you are seeing data at 100 msg/sec, For each 
>> message if
>>      >> there is state changing and you are updating the state with
>>     RocksDB,
>>      >> it's going to
>>      >> create 100 rows in 1 second at RocksDb end, On the average if 50
>>      >> records have changed each second, even if you are using RocksDB
>>      >> differentialstate = true,
>>      >> there is no use. Because everytime 50% is new rows getting
>>     added. So
>>      >> the best bet is to update records with RocksDB only once in your
>>      >> checkpoint interval.
>>      >> Suppose your checkpoint interval is 5 minutes. If you update
>>     RocksDB
>>      >> state once in 5 minutes, then the rate at which new records
>>     added to
>>      >> RocksDB  will be 1 record/5min.
>>      >> Whereas in your original scenario, 30000 records added to
>>     rocksDB in 5
>>      >> min. You can save 1:30000 ratio of records in addition to 
>> RocksDB.
>>      >> Which will save a huge
>>      >> redundant size addition to RocksDB. Ultimately your  state is
>>     driven
>>      >> by your checkpoint interval. From the input source you will go
>>     back 5
>>      >> min back and read the state, similarly from RocksDB side
>>      >> also you can have a state update once in 5 min should work.
>>     Otherwise
>>      >> even if you add state there is no use.
>>      >>
>>      >> Regards
>>      >> Bhaskar
>>      >>
>>      >> Try to update your RocksDB state in an interval equal to the
>>      >> checkpoint interval. Otherwise in my case many times what's
>>     observed is
>>      >> state size grows unnecessarily.
>>      >>
>>      >> On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson
>>     <jehenrik27@gmail.com <mailto:jehenrik27@gmail.com>
>>      >> <mailto:jehenrik27@gmail.com <mailto:jehenrik27@gmail.com>>>

>> wrote:
>>      >>
>>      >>     Vijay,
>>      >>
>>      >>     Thanks for your thoughts.  Below are answers to your 
>> questions.
>>      >>
>>      >>       > 1. What's your checkpoint interval?
>>      >>
>>      >>     I have used many different checkpoint intervals, ranging

>> from 5
>>      >> minutes
>>      >>     to never.  I usually setMinPasueBetweenCheckpoints to the

>> same
>>      >> value as
>>      >>     the checkpoint interval.
>>      >>
>>      >>       > 2. How frequently are you updating the state into

>> RocksDB?
>>      >>
>>      >>     My understanding is that for .cogroup:
>>      >>
>>      >>         - Triggers control communication outside the operator
>>      >>         - Evictors control cleanup of internal state
>>      >>         - Configurations like write buffer size control the
>>     frequency of
>>      >>     state change at the storage layer
>>      >>         - There is no control for how frequently the window
state
>>      >>     updates at
>>      >>     the layer of the RocksDB api layer.
>>      >>
>>      >>     Thus, the state update whenever data is ingested.
>>      >>
>>      >>       > 3. How many task managers are you using?
>>      >>
>>      >>     Usually I have been running with one slot per taskmanager.

>>     28GB of
>>      >>     usable ram on each node.
>>      >>
>>      >>       > 4. How much data each task manager handles while

>> taking the
>>      >>     checkpoint?
>>      >>
>>      >>     Funny you should ask.  I would be okay with zero.
>>      >>
>>      >>     The application I am replacing has a latency of 36-48 hours,
>>     so if I
>>      >>     had
>>      >>     to fully stop processing to take every snapshot
>>     synchronously, it
>>      >> might
>>      >>     be seen as totally acceptable, especially for initial
>>     bootstrap.
>>      >> Also,
>>      >>     the velocity of running this backfill is approximately 
>> 115x real
>>      >>     time on
>>      >>     8 nodes, so the steady-state run may not exhibit the failure
>>     mode in
>>      >>     question at all.
>>      >>
>>      >>     It has come as some frustration to me that, in the case
of
>>      >>     RocksDBStateBackend, the configuration key 
>> state.backend.async
>>      >>     effectively has no meaningful way to be false.
>>      >>
>>      >>     The only way I have found in the existing code to get a
>>     behavior like
>>      >>     synchronous snapshot is to POST to /jobs/<jobID>/stop
with
>>      >> drain=false
>>      >>     and a URL.  This method of failing fast is the way that
I
>>     discovered
>>      >>     that I needed to increase transfer threads from the default.
>>      >>
>>      >>     The reason I don't just run the whole backfill and then

>> take one
>>      >>     snapshot is that even in the absence of checkpoints, a very
>>     similar
>>      >>     congestion seems to take the cluster down when I am say
>>     20-30% of the
>>      >>     way through my backfill.
>>      >>
>>      >>     Reloading from my largest feasible snapshot makes it
>>     possible to make
>>      >>     another snapshot a bit larger before crash, but not by much.
>>      >>
>>      >>     On first glance, the code change to allow
>>     RocksDBStateBackend into a
>>      >>     synchronous snapshots mode looks pretty easy.  Nevertheless,
>>     I was
>>      >>     hoping to do the initial launch of my application without
>>     needing to
>>      >>     modify the framework.
>>      >>
>>      >>     Regards,
>>      >>
>>      >>
>>      >>     Jeff Henrikson
>>      >>
>>      >>
>>      >>     On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
>>      >>      > For me this seems to be an IO bottleneck at your
task
>>     manager.
>>      >>      > I have a couple of queries:
>>      >>      > 1. What's your checkpoint interval?
>>      >>      > 2. How frequently are you updating the state into
RocksDB?
>>      >>      > 3. How many task managers are you using?
>>      >>      > 4. How much data each task manager handles while
taking 
>> the
>>      >>     checkpoint?
>>      >>      >
>>      >>      > For points (3) and (4) , you should be very careful.
I
>>     feel you
>>      >> are
>>      >>      > stuck at this.
>>      >>      > You try to scale vertically by increasing more CPU
and
>>     memory for
>>      >>     each
>>      >>      > task manager.
>>      >>      > If not, try to scale horizontally so that each task
>>     manager IO
>>      >>     gets reduces
>>      >>      > Apart from that check is there any bottleneck with
the 
>> file
>>      >> system.
>>      >>      >
>>      >>      > Regards
>>      >>      > Bhaskar
>>      >>      >
>>      >>      >
>>      >>      >
>>      >>      >
>>      >>      >
>>      >>      > On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor
>>     <victtim@gmail.com <mailto:victtim@gmail.com>
>>      >>     <mailto:victtim@gmail.com <mailto:victtim@gmail.com>>
>>      >>      > <mailto:victtim@gmail.com <mailto:victtim@gmail.com>
>>     <mailto:victtim@gmail.com <mailto:victtim@gmail.com>>>>
wrote:
>>      >>      >
>>      >>      >     I had a similar problem.   I ended up solving
by not
>>      >> relying on
>>      >>      >     checkpoints for recovery and instead re-read
my input
>>     sources
>>      >>     (in my
>>      >>      >     case a kafka topic) from the earliest offset
and
>>     rebuilding
>>      >>     only the
>>      >>      >     state I need.  I only need to care about
the past 1 
>> to 2
>>      >> days of
>>      >>      >     state so can afford to drop anything older. 
 My 
>> recovery
>>      >>     time went
>>      >>      >     from over an hour for just the first checkpoint
to
>>     under 10
>>      >>     minutes.
>>      >>      >
>>      >>      >     Tim
>>      >>      >
>>      >>      >     On Wed, Jun 17, 2020, 11:52 PM Yun Tang
>>     <myasuka@live.com <mailto:myasuka@live.com>
>>      >>     <mailto:myasuka@live.com <mailto:myasuka@live.com>>
>>      >>      >     <mailto:myasuka@live.com <mailto:myasuka@live.com>
>>     <mailto:myasuka@live.com <mailto:myasuka@live.com>>>> wrote:
>>      >>      >
>>      >>      >         Hi Jeff
>>      >>      >
>>      >>      >          1. "after around 50GB of state, I stop
being 
>> able to
>>      >>     reliably
>>      >>      >             take checkpoints or savepoints.
"
>>      >>      >             What is the exact reason that
job cannot 
>> complete
>>      >>      >             checkpoint? Expired before completing
or
>>     decline by
>>      >> some
>>      >>      >             tasks? The former one is manly
caused by high
>>      >>     back-pressure
>>      >>      >             and the later one is mainly due
to some 
>> internal
>>      >> error.
>>      >>      >          2. Have you checked what reason the
remote task
>>     manager
>>      >>     is lost?
>>      >>      >             If the remote task manager is
not crashed, it
>>     might
>>      >>     be due
>>      >>      >             to GC impact, I think you might
need to check
>>      >>     task-manager
>>      >>      >             logs and GC logs.
>>      >>      >
>>      >>      >         Best
>>      >>      >         Yun Tang
>>      >>      >
>>      >>
>>      
>>  ------------------------------------------------------------------------
>>      >>      >         *From:* Jeff Henrikson <jehenrik27@gmail.com
>>     <mailto:jehenrik27@gmail.com>
>>      >>     <mailto:jehenrik27@gmail.com <mailto:jehenrik27@gmail.com>>
>>      >>      >         <mailto:jehenrik27@gmail.com
>>     <mailto:jehenrik27@gmail.com>
>>      >> <mailto:jehenrik27@gmail.com <mailto:jehenrik27@gmail.com>>>>
>>      >>      >         *Sent:* Thursday, June 18, 2020 1:46
>>      >>      >         *To:* user <user@flink.apache.org
>>     <mailto:user@flink.apache.org>
>>      >>     <mailto:user@flink.apache.org
>>     <mailto:user@flink.apache.org>> <mailto:user@flink.apache.org
>>     <mailto:user@flink.apache.org>
>>      >>     <mailto:user@flink.apache.org 
>> <mailto:user@flink.apache.org>>>>
>>      >>      >         *Subject:* Trouble with large state
>>      >>      >         Hello Flink users,
>>      >>      >
>>      >>      >         I have an application of around 10 enrichment
>>     joins.  All
>>      >>     events
>>      >>      >         are
>>      >>      >         read from kafka and have event timestamps. 
The
>>     joins are
>>      >>     built
>>      >>      >         using
>>      >>      >         .cogroup, with a global window, triggering
on 
>> every 1
>>      >>     event, plus a
>>      >>      >         custom evictor that drops records once
a newer
>>     record
>>      >> for the
>>      >>      >         same ID
>>      >>      >         has been processed.  Deletes are represented
by 
>> empty
>>      >>     events with
>>      >>      >         timestamp and ID (tombstones). That
way, we can 
>> drop
>>      >>     records when
>>      >>      >         business logic dictates, as opposed
to when a 
>> maximum
>>      >>     retention
>>      >>      >         has been
>>      >>      >         attained.  The application runs
>>     RocksDBStateBackend, on
>>      >>      >         Kubernetes on
>>      >>      >         AWS with local SSDs.
>>      >>      >
>>      >>      >         Unit tests show that the joins produce
expected
>>      >> results.     On an
>>      >>      >         8 node
>>      >>      >         cluster, watermark output progress seems
to
>>     indicate I
>>      >>     should be
>>      >>      >         able to
>>      >>      >         bootstrap my state of around 500GB in
around 1
>>     day.  I am
>>      >>     able
>>      >>      >         to save
>>      >>      >         and restore savepoints for the first
half an hour
>>     of run
>>      >>     time.
>>      >>      >
>>      >>      >         My current trouble is that after around
50GB of
>>     state,
>>      >> I stop
>>      >>      >         being able
>>      >>      >         to reliably take checkpoints or savepoints. 
Some
>>     time
>>      >> after
>>      >>      >         that, I
>>      >>      >         start getting a variety of failures
where the 
>> first
>>      >>     suspicious
>>      >>      >         log event
>>      >>      >         is a generic cluster connectivity error,
such as:
>>      >>      >
>>      >>      >               1) java.io.IOException: Connecting
the 
>> channel
>>      >> failed:
>>      >>      >         Connecting
>>      >>      >               to remote task manager +
>>     '/10.67.7.101:38955 <http://10.67.7.101:38955>
>>      >>     <http://10.67.7.101:38955>
>>      >>      >         <http://10.67.7.101:38955>' has
failed. This
>>      >>      >               might indicate that the remote
task 
>> manager has
>>      >>     been lost.
>>      >>      >
>>      >>      >               2) org.apache.flink.runtime.io
>>     <http://org.apache.flink.runtime.io>
>>      >>     <http://org.apache.flink.runtime.io>.network.netty.exception
>>      >>      >               .RemoteTransportException:
Connection
>>     unexpectedly
>>      >>     closed
>>      >>      >         by remote
>>      >>      >               task manager 'null'. This
might indicate
>>     that the
>>      >>     remote task
>>      >>      >               manager was lost.
>>      >>      >
>>      >>      >               3) Association with remote
system
>>      >>      >               [akka.tcp://flink@10.67.6.66:34987
>>     <http://flink@10.67.6.66:34987>
>>      >>     <http://flink@10.67.6.66:34987>
>>      >>      >         <http://flink@10.67.6.66:34987>]
has failed,
>>     address is
>>      >> now
>>      >>      >               gated for [50] ms. Reason:
[Association
>>     failed with
>>      >>      >               [akka.tcp://flink@10.67.6.66:34987
>>     <http://flink@10.67.6.66:34987>
>>      >>     <http://flink@10.67.6.66:34987>
>>      >>      >         <http://flink@10.67.6.66:34987>]]
Caused by:
>>      >>      >               [java.net <http://java.net>
>>     <http://java.net>.NoRouteToHostException:
>>      >>     No route to host]
>>      >>      >
>>      >>      >         I don't see any obvious out of memory
errors on 
>> the
>>      >>     TaskManager UI.
>>      >>      >
>>      >>      >         Adding nodes to the cluster does not
seem to
>>     increase the
>>      >>     maximum
>>      >>      >         savable state size.
>>      >>      >
>>      >>      >         I could enable HA, but for the time
being I 
>> have been
>>      >>     leaving it
>>      >>      >         out to
>>      >>      >         avoid the possibility of masking deterministic
>>     faults.
>>      >>      >
>>      >>      >         Below are my configurations.
>>      >>      >
>>      >>      >         Thanks in advance for any advice.
>>      >>      >
>>      >>      >         Regards,
>>      >>      >
>>      >>      >
>>      >>      >         Jeff Henrikson
>>      >>      >
>>      >>      >
>>      >>      >
>>      >>      >         Flink version: 1.10
>>      >>      >
>>      >>      >         Configuration set via code:
>>      >>      >               parallelism=8
>>      >>      >               maxParallelism=64
>>      >>      >     
>> setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>      >>      >
>>      >> setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>      >>      >               setTolerableCheckpointFailureNumber(1000)
>>      >>      >               setMaxConcurrentCheckpoints(1)
>>      >>      >
>>      >>      >
>>      >>
>>      
>>  enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

>>
>>      >>
>>      >>      >               RocksDBStateBackend
>>      >>      >     
>> setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>>      >>      >               setNumberOfTransferThreads(25)
>>      >>      >               setDbStoragePath points to
a local nvme SSD
>>      >>      >
>>      >>      >         Configuration in flink-conf.yaml:
>>      >>      >
>>      >>      >               jobmanager.rpc.address: localhost
>>      >>      >               jobmanager.rpc.port: 6123
>>      >>      >               jobmanager.heap.size: 28000m
>>      >>      >               taskmanager.memory.process.size:
28000m
>>      >>      >               taskmanager.memory.jvm-metaspace.size:
512m
>>      >>      >               taskmanager.numberOfTaskSlots:
1
>>      >>      >               parallelism.default: 1
>>      >>      >               jobmanager.execution.failover-strategy:
full
>>      >>      >
>>      >>      >               cluster.evenly-spread-out-slots:
false
>>      >>      >
>>      >>      >               taskmanager.memory.network.fraction:
>>     0.2           #
>>      >>      >         default 0.1
>>      >>      >               
>> taskmanager.memory.framework.off-heap.size: 2GB
>>      >>      >               taskmanager.memory.task.off-heap.size:
2GB
>>      >>      >     taskmanager.network.memory.buffers-per-channel:
32
>>      >>     # default 2
>>      >>      >               taskmanager.memory.managed.fraction:
0.4 
>>     #
>>      >> docs say
>>      >>      >         default 0.1, but something seems to
set 0.4
>>      >>      >               taskmanager.memory.task.off-heap.size:
>>     2048MB      #
>>      >>      >         default 128M
>>      >>      >
>>      >>      >               state.backend.fs.memory-threshold:
1048576
>>      >>      >               state.backend.fs.write-buffer-size:
10240000
>>      >>      >               state.backend.local-recovery:
true
>>      >>      >               state.backend.rocksdb.writebuffer.size:
64MB
>>      >>      >               state.backend.rocksdb.writebuffer.count:
8
>>      >>      >     state.backend.rocksdb.writebuffer.number-to-merge:
4
>>      >>      >     state.backend.rocksdb.timer-service.factory:
heap
>>      >>      >               state.backend.rocksdb.block.cache-size:
>>     64000000 #
>>      >>     default 8MB
>>      >>      >               state.backend.rocksdb.write-batch-size:
>>     16000000 #
>>      >>     default 2MB
>>      >>      >
>>      >>      >               web.checkpoints.history: 250
>>      >>      >
>>      >>
>>


Mime
View raw message