flink-user mailing list archives

From Shannon Carey <sca...@expedia.com>
Subject Re: State in external db (dynamodb)
Date Thu, 07 Apr 2016 11:12:15 GMT
Thanks very kindly for your response, Stephan!

We will definitely use a custom sink for persistence of idempotent mutations whenever possible.
Exposing state as read-only to external systems is a complication we will try to avoid. Also,
we will definitely only write to the DB upon checkpoint, and the write will be synchronous
and transactional (no possibility of partial success/failure).

However, we do want Flink state to be durable, we want it to be in memory when possible, and
we want to avoid running out of memory due to the size of the state. For example, if you have
a wide window that hasn't gotten an event for a long time, we want to evict that window state
from memory. We're now thinking of using Redis (via AWS Elasticache) which also conveniently
has TTL, instead of DynamoDB.
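
As an illustration of the eviction idea described above, here is a minimal sketch in plain Python. It is not Flink code and not a Redis client: `IdleEvictingState`, its methods, and the dict that stands in for the external store are all hypothetical names chosen for this example. It assumes state is keyed by a string, that the value is an integer, and that something calls `evict_idle()` periodically.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class IdleEvictingState:
    """Keeps recently used state in memory and spills idle keys to a store."""

    def __init__(self, store: Dict[str, int], ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.store = store        # hypothetical stand-in for Redis/DynamoDB
        self.ttl = ttl_seconds
        self.clock = clock
        # key -> (value, time of last access)
        self.hot: Dict[str, Tuple[int, float]] = {}

    def get(self, key: str) -> Optional[int]:
        if key in self.hot:
            value, _ = self.hot[key]
        elif key in self.store:
            value = self.store[key]   # reload evicted state on demand
        else:
            return None
        self.hot[key] = (value, self.clock())
        return value

    def put(self, key: str, value: int) -> None:
        self.hot[key] = (value, self.clock())

    def evict_idle(self) -> int:
        """Persist and drop every key not touched within the TTL."""
        now = self.clock()
        idle = [k for k, (_, seen) in self.hot.items() if now - seen >= self.ttl]
        for key in idle:
            value, _ = self.hot.pop(key)
            self.store[key] = value
        return len(idle)
```

Passing a fake `clock` makes the behaviour easy to try out: put a key, advance the clock past the TTL, call `evict_idle()`, and a later `get()` reloads the value from the store. A real implementation would also have to coordinate these writes with checkpoints, which this sketch ignores.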

I just wanted to check whether eviction of (inactive/quiet) state from memory is something
that I should consider implementing, or whether Flink already has some built-in way of doing
so.

Along the same lines, I am also wondering whether Flink already has means of compacting the
state of a window by applying an aggregation function to the elements so far (e.g. every time
the window is triggered)? For example, if you are only executing a sum on the contents of the
window, the window state doesn't need to store all the individual items in the window, it
only needs to store the sum. Aggregations other than "sum" might have that characteristic
too. I don't know if Flink is already that intelligent or whether I should figure out how
to aggregate window contents myself when possible with something like a window fold? Another
poster (Aljoscha) was talking about adding incremental snapshots, but it sounds like that
would only improve the write throughput not the memory usage.
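
To make the memory argument concrete, here is a small sketch in plain Python comparing a window that buffers every element with one that folds each element into an accumulator as it arrives. The class names `BufferingWindow` and `FoldingWindow` are hypothetical and this is not Flink's API; it only illustrates why a sum needs constant-size state.

```python
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")
A = TypeVar("A")


class BufferingWindow(Generic[T]):
    """Stores every element; state grows with the number of events."""

    def __init__(self) -> None:
        self.elements: List[T] = []

    def add(self, element: T) -> None:
        self.elements.append(element)


class FoldingWindow(Generic[T, A]):
    """Stores only the running accumulator; state size stays constant."""

    def __init__(self, initial: A, fold: Callable[[A, T], A]) -> None:
        self.accumulator = initial
        self.fold = fold

    def add(self, element: T) -> None:
        # Combine the new element into the accumulator and discard it.
        self.accumulator = self.fold(self.accumulator, element)
```

For a sum, `FoldingWindow(0, lambda acc, x: acc + x)` yields the same result as summing `BufferingWindow.elements` when the window fires, while holding a single number. The same shape works for any aggregation that can be expressed as a fold (count, min, max), but not for ones that need all elements at once, such as an exact median.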

Thanks again!
Shannon Carey

From: Stephan Ewen <sewen@apache.org>
Date: Wednesday, April 6, 2016 at 10:37 PM
To: <user@flink.apache.org>
Subject: Re: State in external db (dynamodb)

Hi Shannon!

Welcome to the Flink community!

You are right, sinks need in general to be idempotent if you want "exactly-once" semantics,
because there can be a replay of elements that were already written.

However, what you describe later, overwriting of a key with a new value (or the same value
again) is pretty much sufficient. That means that when a duplicate write happens during replay,
the value for the key is simply overwritten with the same value again.
As long as all computation is purely in Flink and you only write to the key/value store (rather
than read from k/v, modify in Flink, write to k/v), you get the consistency that for example
counts/aggregates never have duplicates.
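
The difference between the two write patterns can be sketched in a few lines of plain Python. The function names and the dict standing in for the key/value store are hypothetical; the point is only that replaying absolute overwrites leaves the store unchanged, while replaying read-modify-write updates double counts.

```python
from typing import Dict, Iterable, Tuple


def write_overwrite(store: Dict[str, int],
                    updates: Iterable[Tuple[str, int]]) -> None:
    """Idempotent: each write sets the key to an absolute value."""
    for key, value in updates:
        store[key] = value


def write_increment(store: Dict[str, int],
                    updates: Iterable[Tuple[str, int]]) -> None:
    """Not idempotent: each write reads the stored value and adds to it."""
    for key, delta in updates:
        store[key] = store.get(key, 0) + delta
```

Applying the same batch twice with `write_overwrite` gives the same store contents as applying it once. Applying it twice with `write_increment` doubles every value, which is the duplicate-count problem a replay would cause.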

If Flink needs to look up state from the database (because it is no longer in Flink), it is
a bit more tricky. I assume that is where you are going with "Subsequently, when an event
is processed, we must be able to quickly load up any evicted state".  In that case, there
are two things you can do:

(1)  Only write to the DB upon a checkpoint, at which point it is known that no replay of
that data will occur any more. Values from partially successful writes will be overwritten
with the correct value. I assume that is what you thought of when referring to the State Backend,
because in some sense, that is what that state backend would do.

I think it is simpler to implement that in a custom sink than to develop a new state backend.
Another Flink committer (Chesnay) has developed some nice tooling for that, to be merged
into Flink soon.

(2) You could attach version numbers to every write, and increment the versions upon each
checkpoint. That allows you to always refer to a consistent previous value, if some writes
were made, but a failure occurred before the checkpoint completed.
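
The two options above can be sketched in plain Python as follows. This is an illustration under assumptions, not Flink's sink or checkpoint API: the class names, the `on_checkpoint_complete` and `on_failure` hooks, and the in-memory dicts standing in for the database are all hypothetical.

```python
from typing import Dict, Optional, Tuple


class CheckpointedSink:
    """Option (1): buffer writes, apply them only when a checkpoint completes."""

    def __init__(self, store: Dict[str, int]) -> None:
        self.store = store
        self.pending: Dict[str, int] = {}

    def write(self, key: str, value: int) -> None:
        self.pending[key] = value          # nothing reaches the store yet

    def on_checkpoint_complete(self) -> None:
        self.store.update(self.pending)    # overwrite, so a retry is harmless
        self.pending.clear()

    def on_failure(self) -> None:
        self.pending.clear()               # replay will regenerate these writes


class VersionedStore:
    """Option (2): tag writes with a version that advances on each checkpoint."""

    def __init__(self) -> None:
        self.rows: Dict[Tuple[str, int], int] = {}   # (key, version) -> value
        self.committed = 0                           # last completed checkpoint

    def write(self, key: str, value: int) -> None:
        self.rows[(key, self.committed + 1)] = value

    def on_checkpoint_complete(self) -> None:
        self.committed += 1

    def on_failure(self) -> None:
        # Discard writes made after the last completed checkpoint.
        self.rows = {kv: v for kv, v in self.rows.items()
                     if kv[1] <= self.committed}

    def read(self, key: str) -> Optional[int]:
        """Return the newest value at or below the committed version."""
        versions = [ver for (k, ver) in self.rows
                    if k == key and ver <= self.committed]
        return self.rows[(key, max(versions))] if versions else None
```

In the first class the store never sees data that could still be replayed. In the second, uncommitted writes do reach the store, but `read` ignores any version above the last completed checkpoint, so a reader always sees a consistent earlier value.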

I hope these answers apply to your case. Let us know if some things are still unclear, or
if I misunderstood your question!


On Wed, Apr 6, 2016 at 8:14 AM, Sanne de Roever <sanne.de.roever@gmail.com> wrote:
FYI Cassandra has a TTL on data: https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_t.html

On Wed, Apr 6, 2016 at 7:55 AM, Shannon Carey <scarey@expedia.com> wrote:
Hi, new Flink user here!

I found a discussion on user@flink.apache.org about using
DynamoDB as a sink. However, as noted, sinks have an at-least-once guarantee so your operations
must be idempotent.

However, another way to go about this (and correct me if I'm wrong) is to write the state
to the external store via a custom State Backend. Since the state participates in checkpointing,
you don't have to worry about idempotency: every time state is checkpointed, overwrite the
value of that key.

We are starting a project with Flink, and we are interested in evicting the state from memory
once a TTL is reached during which no events have come in for that state. Subsequently, when
an event is processed, we must be able to quickly load up any evicted state. Does this sound
reasonable? We are considering using DynamoDB for our state backend because it seems like
all we will need is a key-value store. The only weakness of this is that if state gets older
than, say, 2 years we would like to get rid of it which might not be easy in DynamoDB. I don't
suppose Flink has any behind-the-scenes features that deal with getting rid of old state (either
evicting from memory or TTL/aging out entirely)?

Thanks for your time!
Shannon Carey
