flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Operational concerns with state (was Re: Window limitations on groupBy)
Date Thu, 19 Jan 2017 20:36:32 GMT
Hi Raman,

Checkpoints are used to recover from task or process failures. They are
usually taken automatically at periodic intervals once checkpointing is
configured. A checkpoint is usually removed when a more recent checkpoint
completes (the exact policy can be configured).

Savepoints are used to restart a job that was previously shut down, to
migrate a job to another cluster (e.g., when upgrading Flink), to update the
job itself, etc. So they are more for planned maintenance.
Nonetheless, they can also be used for more coarse-grained fault tolerance,
and it is common practice to trigger a savepoint periodically.

These blog posts might be helpful to understand the potential of savepoints
[1] [2].

Best, Fabian

[1] http://data-artisans.com/turning-back-time-savepoints/
[2] http://data-artisans.com/savepoints-part-2-updating-applications/

2017-01-19 19:02 GMT+01:00 Raman Gupta <rocketraman@gmail.com>:

> I was able to get it working well with the original approach you
> described. Thanks! Note that the documentation on how to do this with the
> Java API is... sparse, to say the least. I was able to look at the
> implementation of the Scala flatMapWithState function as a starting point.
>
> Now I'm trying to understand all the operational concerns related to the
> stored state. My checkpoints are in RocksDB, configured via the job
> definition.
>
> It seems that the checkpointed state of the streaming job is lost when I
> stop and restart Flink normally, or when Flink terminates abnormally and is
> restarted. I was able to take an explicit savepoint and then restart the
> job with it.
>
> Is the correct approach as of now to take savepoints periodically via
> cron, and use those to re-run jobs in case of Flink failure or restart?
>
> Regards,
> Raman
>
> On 19/01/17 05:43 AM, Fabian Hueske wrote:
>
> Hi Raman,
>
> I think you would need a sliding count window of size 2 with slide 1.
> This is basically a GlobalWindow with a special trigger.
>
> However, you would need to modify the custom trigger to be able to
> - identify a terminal event (if there is such a thing) or to
> - close the window after a certain period of inactivity to clean up the
> state.
>
> Best, Fabian
>
> 2017-01-19 1:43 GMT+01:00 Raman Gupta <rocketraman@gmail.com>:
>
>> Thank you for your reply.
>>
>> If I were to use a keyed stream with a count-based window of 2, would
>> Flink keep the last state persistently until the next state is
>> received? Would this be another way of having Flink keep this
>> information persistently without having to implement it manually?
>>
>> Thanks,
>> Raman
>>
>> On 18/01/17 11:22 AM, Fabian Hueske wrote:
>> > Hi Raman,
>> >
>> > I would approach this issue as follows.
>> >
>> > You key the input stream on the sourceId and apply a stateful
>> > FlatMapFunction.
>> > The FlatMapFunction has a key-partitioned state and stores for each key
>> > (sourceId) the latest event as state.
>> > When a new event arrives, you can compute the time spent in the last
>> > state by comparing the event stored in the state with the newly received
>> > event.
>> > Then you put the new event in the state.
>> >
>> > This solution works well if you have a finite number of sources or if
>> > you have a terminal event that signals that no more events will
>> > arrive for a key.
>> > Otherwise, the number of events stored in the state will grow
>> > without bound and eventually become a problem.
>> >
>> > If the number of sources increases, you need to evict data at some
>> > point in time. A ProcessFunction can help here, because you can
>> > register a timer which you can use to evict old state.
>> >
>> > Hope this helps,
>> > Fabian
>> >
>> > 2017-01-18 15:39 GMT+01:00 Raman Gupta <rocketraman@gmail.com>:
>> >
>> >     I am investigating Flink. I am considering a relatively simple use
>> >     case -- I want to ingest streams of events that are essentially
>> >     timestamped state changes. These events may look something like:
>> >
>> >     {
>> >       sourceId: 111,
>> >       state: OPEN,
>> >       timestamp: <date/time>
>> >     }
>> >
>> >     I want to apply various processing to these state change events, the
>> >     output of which can be used for analytics. For example:
>> >
>> >     1. average time spent in state, by state
>> >     2. sources with longest (or shortest) time spent in OPEN state
>> >
>> >     The time spent in each state may be days or even weeks.
>> >
>> >     All the examples I have seen of similar logic involve windows on the
>> >     order of 15 minutes. Since time spent in each state may far exceed
>> >     these window sizes, I'm wondering what the best approach will be.
>> >
>> >     One thought from reading the docs is to use `every` to operate on the
>> >     entire stream. But it seems like this will take longer and longer to
>> >     run as the event stream grows, so this is not an ideal solution. Or
>> >     does Flink apply some clever optimizations to avoid the potential
>> >     performance issue?
>> >
>> >     Another thought was to split the event stream into multiple streams by
>> >     source, each of which will have a small (and limited) amount of data.
>> >     This will make processing each stream simpler, but since there can be
>> >     thousands of sources, it will result in a lot of streams to handle and
>> >     persist (probably in Kafka). This does not seem ideal either.
>> >
>> >     It seems like this should be simple, but I'm struggling with
>> >     understanding how to solve it elegantly.
>> >
>> >     Regards,
>> >     Raman
>> >
>> >
>>
>
>
>
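
The keyed-state approach suggested earlier in this thread (keep the latest
event per sourceId, compute the time spent in the previous state when the
next event arrives, and evict state on a terminal event or after a period of
inactivity) can be illustrated with a rough plain-Java sketch. It does not
use any Flink classes: the HashMap only stands in for key-partitioned state,
and evictIdle() stands in for what a timer would do. The class and method
names, the "CLOSED" terminal state, and the assumption that events for a key
arrive in timestamp order are all hypothetical and not taken from the thread.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Plain-Java stand-in for a keyed, stateful flatMap; no Flink classes are used. */
final class StateDurationTracker {

    /** One timestamped state change, mirroring the event shape shown in the thread. */
    static final class Event {
        final int sourceId;
        final String state;
        final long timestampMillis;

        Event(int sourceId, String state, long timestampMillis) {
            this.sourceId = sourceId;
            this.state = state;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Result produced when a source leaves a state. */
    static final class TimeInState {
        final int sourceId;
        final String state;
        final long millis;

        TimeInState(int sourceId, String state, long millis) {
            this.sourceId = sourceId;
            this.state = state;
            this.millis = millis;
        }
    }

    // Hypothetical terminal state; the thread does not say whether one exists.
    static final String TERMINAL_STATE = "CLOSED";

    // Per-key state: the latest event seen for each sourceId.
    private final Map<Integer, Event> lastEventBySource = new HashMap<>();

    /**
     * Handles one event. Returns the time spent in the previous state,
     * or null if this is the first event seen for the key.
     * Assumes events for a key arrive in timestamp order.
     */
    TimeInState onEvent(Event event) {
        Event previous = lastEventBySource.get(event.sourceId);
        if (TERMINAL_STATE.equals(event.state)) {
            // No more events are expected for this key, so drop its state.
            lastEventBySource.remove(event.sourceId);
        } else {
            lastEventBySource.put(event.sourceId, event);
        }
        if (previous == null) {
            return null;
        }
        return new TimeInState(previous.sourceId, previous.state,
                event.timestampMillis - previous.timestampMillis);
    }

    /** Drops keys whose latest event is older than maxIdleMillis; returns how many were dropped. */
    int evictIdle(long nowMillis, long maxIdleMillis) {
        int evicted = 0;
        Iterator<Event> it = lastEventBySource.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().timestampMillis > maxIdleMillis) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    /** Number of keys that currently hold state. */
    int trackedKeys() {
        return lastEventBySource.size();
    }
}
```

In an actual job the map would be replaced by Flink's keyed state, so that it
is included in checkpoints and savepoints, and the idle-key cleanup would be
driven by a registered timer rather than by an explicit evictIdle() call.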
