flink-user mailing list archives

From Ashish Pokharel <ashish...@yahoo.com>
Subject Re: IoT Use Case, Problem and Thoughts
Date Thu, 14 Jun 2018 02:18:10 GMT
Hi Fabian,

Thanks for the prompt response, and apologies for my delayed reply.

You wrapped up the bottom line pretty well. If I were to wrap it up, I'd say: “best possible”
recovery on “known” restarts, whether a manual cancel + start or framework-initiated ones
like operator failures, under these constraints:
 - some data loss is ok
 - avoid periodic checkpoints, since the states are really transient (a lifetime of less than
5 seconds, if not milliseconds) and almost all events make it into state. I do understand that
checkpointing performance has improved drastically, and that with the async and RocksDB options
it should technically not add latency to the application. However, I feel that even with those
improvements and local checkpointing (which we are already doing), it is a lot of “unused”
IOPS/resource utilization, especially as we spin up more apps handling similar data sources
with similar requirements. At first blush it feels like those resources are better utilized
in the cluster for apps with stricter SLAs around data loss and recovery.

Basically, I suppose I am thinking of a checkpointing feature that is initiated by certain
actions/events rather than periodically. Let me know if I am off-base here and I should just enable
checkpointing in all of these apps and move on :) 
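For reference, the “just enable it” option would look roughly like this (a minimal sketch;
the interval, backend choice, and HDFS path are placeholders to tune):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointedApp {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoint every 30s; at-least-once is cheaper than exactly-once
            // and fits a "some data loss is ok" posture.
            env.enableCheckpointing(30_000L, CheckpointingMode.AT_LEAST_ONCE);
            // RocksDB state backend; the second argument enables incremental
            // checkpoints, so only changed state is shipped each time.
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
            // ... define sources, windowing, sinks here ...
            env.execute("checkpointed-app");
        }
    }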

I tried savepoints again, and it looks like the issue is caused by the memory-backed states
being large: it throws an error that the state is larger than a certain size. So the solution
to (1) will possibly solve (2) as well. 
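If that error is the MemoryStateBackend size cap (5 MB by default), a filesystem-backed state
backend avoids it; a minimal sketch, assuming an HDFS path as a placeholder:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;

    // Filesystem-backed state snapshots instead of JobManager heap,
    // which avoids the MemoryStateBackend's default 5 MB state cap.
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));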

Thanks again,


> On Jun 7, 2018, at 4:25 PM, Fabian Hueske <fhueske@gmail.com> wrote:
> Hi Ashish,
> Thanks for the great write-up. 
> If I understood you correctly, there are two different issues that are caused by the
disabled checkpointing.
> 1) Recovery from a failure without restarting all operators to preserve the state in
the running tasks
> 2) Planned restarts of an application without losing all state (even with disabled checkpointing).
> Ad 1) The community is constantly working on reducing the time for checkpointing and recovery.
> For 1.5, local task recovery was added, which basically stores a copy of the state on local
disk and reads it in case of a recovery. So tasks are restarted, but they restore their state
from the local disk rather than from distributed storage.
> AFAIK, this can only be used together with remote checkpoints. I think this might be
an interesting option for you if it would be possible to write checkpoints only to local disk
and not remote storage. AFAIK, there are also other efforts to reduce the number of restarted
tasks in case of a failure. I guess, you've played with other features such as RocksDBStateBackend,
incremental and async checkpoints already. 
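> For reference, these knobs map to flink-conf.yaml entries along these lines (a sketch
assuming the Flink 1.5 key names; the local directory is a placeholder):
>
>     state.backend: rocksdb
>     state.backend.incremental: true          # incremental checkpoints
>     state.backend.local-recovery: true       # keep a local copy of state for recovery
>     taskmanager.state.local.root-dirs: /tmp/flink-local-recovery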
> Ad 2) It sounds as if savepoints are exactly the feature you are looking for. It would
be good to know what exactly did not work for you. The MemoryStateBackend is not suitable
for large state sizes because it backs state up into the heap memory of the JobManager. 
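> A likely culprit, assuming default settings: the MemoryStateBackend caps state at 5 MB by
default. The cap can be raised via its constructor, though it remains heap-bound, so a
filesystem backend is the usual fix for state of any real size. A sketch:
>
>     import org.apache.flink.runtime.state.memory.MemoryStateBackend;
>
>     // Default max state size is 5 MB; raising it still keeps every
>     // snapshot on the JobManager heap, so this only goes so far.
>     env.setStateBackend(new MemoryStateBackend(64 * 1024 * 1024));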
> Best, Fabian
> 2018-06-05 21:57 GMT+02:00 ashish pok <ashishpok@yahoo.com>:
> Fabian, Stephan, All,
> I started a discussion a while back around having a form of event-based checkpointing
policy that would help us in some of our high-volume data pipelines. Here is an effort to put
this in front of the community, understand what capabilities can support these types of use
cases, gauge how much others feel the same need, and potentially shape a feature that can make
it to a user story.
> Use Case Summary:
> - Extremely high volume of data (events from consumer devices with customer base of over
> - Multiple events need to be combined in a windowed streaming app, grouped by key
(something like the 5-minute floor of the timestamp plus a unique identifier for the customer
device); see the sketch after this list
> - "Most" events for a group/key arrive within a few seconds, if not milliseconds, but events
can sometimes be delayed or lost in transport (so delayed-event handling and timeouts will
be needed)
> - Extremely low data loss (pretty vague, but hopefully the details below clarify it more)
is acceptable
> - Because of the volume and the transient nature of the source, checkpointing is turned off
(this saves on writes to persistence, as states/sessions are active for only a few seconds
during processing)
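> For concreteness, the windowing shape described above would look roughly like this (a
sketch; DeviceEvent, deviceId, and merge() are hypothetical stand-ins):
>
>     import org.apache.flink.streaming.api.datastream.DataStream;
>     import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
>     import org.apache.flink.streaming.api.windowing.time.Time;
>
>     // DeviceEvent is a hypothetical POJO with a deviceId field and a
>     // merge() method that combines two events for the same device.
>     DataStream<DeviceEvent> combined = events
>         .keyBy(e -> e.deviceId)
>         .window(TumblingEventTimeWindows.of(Time.minutes(5)))
>         .allowedLateness(Time.seconds(30))   // tolerate late arrivals
>         .reduce((a, b) -> a.merge(b));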
> Problem Summary:
> Of course, none of the above is out of the norm for Flink, and as a matter of fact we
already have a Flink app doing this. The issue arises when it comes to graceful shutdowns
and operator failures (e.g. Kafka timeouts). On an operator failure, the entire job graph
restarts, which essentially flushes out in-memory states/sessions. I think there is a feature
in the works (not sure if it made it into 1.5) to perform selective restarts, which will limit
the damage but will still result in data loss. It also doesn't help when application restarts
are needed. We did try the savepoint route for explicit restart needs, but I think the
MemoryStateBackend ran into issues for larger states or something along those lines (not
certain). We obviously cannot recover an operator that actually fails, because its own state
could be unrecoverable. However, it feels like Flink already has a lot of the plumbing to help
with the overall problem of allowing some sort of recoverable state to handle graceful
shutdowns and restarts with minimal data loss.
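> As an aside on the restart behavior: the configured restart strategy controls how often
Flink restarts the graph, but with checkpointing disabled every attempt still begins from
empty operator state. A sketch (the attempt count and delay are placeholders):
>
>     import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>     import org.apache.flink.api.common.time.Time;
>
>     // Restart the whole job graph up to 3 times, 10 seconds apart;
>     // without checkpoints each attempt starts with empty state.
>     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));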
> Solutions:
> Some in the community commented on my last email with decent ideas, like having an event-based
checkpointing trigger (on shutdown, on restart, etc.) or life-cycle hooks (onCancel, onRestart,
etc.) in Functions that can be implemented if this type of behavior is needed. 
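> Purely as a strawman of the hook idea, and not an existing Flink API, such hooks might look
something like this (every name here is hypothetical):
>
>     // Hypothetical -- this interface does not exist in Flink. A user
>     // function could opt in to act on planned shutdowns/restarts.
>     public interface LifecycleAware {
>         void onCancel() throws Exception;   // e.g. trigger a final state snapshot
>         void onRestart() throws Exception;  // e.g. reload that snapshot, if present
>     }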
> I'd appreciate feedback from the community on how useful this might be for others, and
thoughts from the core contributors as well.
> Thanks in advance, Ashish
