flink-user mailing list archives

From Maximilian Michels <...@apache.org>
Subject Re: Combining streams with static data and using REST API as a sink
Date Mon, 23 May 2016 16:56:34 GMT
Hi Josh,

1) Use a RichFunction which has an `open()` method to load data (e.g. from
a database) at runtime before the processing starts.
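
A minimal sketch of that pattern, written without Flink dependencies so it
runs on its own. In a real job the class would extend a rich function such
as RichMapFunction and the loading would happen in its `open()` override.
The `Order` type, the `loadItems()` helper and the sample items are
hypothetical and only stand in for your database or REST lookup.

```java
import java.util.HashMap;
import java.util.Map;

// Dependency-free stand-in for a Flink rich map function (assumption:
// in a real job this would extend RichMapFunction<Order, String>).
class OrderEnricher {
    // Hypothetical order type with only the fields needed for the join.
    static final class Order {
        final long orderId;
        final int itemId;

        Order(long orderId, int itemId) {
            this.orderId = orderId;
            this.itemId = itemId;
        }
    }

    // Lookup table, filled once in open() and then read for every record.
    private Map<Integer, String> itemNames;

    // Mirrors the open() lifecycle hook: runs once, before any record
    // is processed.
    void open() {
        itemNames = loadItems();
    }

    // Hypothetical loader; a real job would query postgres or call the
    // REST endpoint here instead of returning hard-coded rows.
    Map<Integer, String> loadItems() {
        Map<Integer, String> items = new HashMap<>();
        items.put(1, "keyboard");
        items.put(2, "mouse");
        return items;
    }

    // Per-record join against the in-memory table.
    String map(Order order) {
        String name = itemNames.getOrDefault(order.itemId, "unknown item");
        return order.orderId + ":" + name;
    }
}
```

Since the table is loaded only once in this sketch, items added later are
not visible until the job restarts; a periodic reload inside the function
would be one way around that.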

2) No, that's fine. If you want your REST API sink to interplay with
checkpointing (for fault tolerance), it gets a bit tricky, depending on
the guarantees you want to have. Typically, you would have "at least
once" or "exactly once" semantics on the state. Inside Flink, this is easy
to achieve; it's a bit harder for outside systems.

"At Least Once"

For example, if you increment a counter in a database, the count will be
off after your job recovers from a failure, because the records since the
last checkpoint are processed again. You can checkpoint the current value
of the counter and restore this value on a failure (using the Checkpointed
interface). However, your counter might decrease temporarily when you
resume from a checkpoint (until the counter has caught up again).
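
To make the temporary decrease concrete, here is a small dependency-free
sketch. The method names `snapshotState` and `restoreState` are modelled on
the Checkpointed interface, but the class itself is hypothetical and does
not use Flink; the numbers are made up for illustration.

```java
// Hypothetical counter that can be snapshotted and restored, modelled
// loosely on Flink's Checkpointed interface (no Flink dependency here).
class CheckpointedCounter {
    private long count;

    // Called for every incoming record or window result.
    void add(long delta) {
        count += delta;
    }

    // The value that would be written to the external system.
    long current() {
        return count;
    }

    // Taken when a checkpoint is triggered.
    Long snapshotState() {
        return count;
    }

    // Called on recovery: the counter jumps back to the checkpointed value.
    void restoreState(Long state) {
        count = state;
    }
}
```

If the counter is at 11 when the checkpoint is taken and reaches 20 before
the failure, restoring puts it back at 11. It only returns to 20 once the
records after the checkpoint have been replayed.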

"Exactly Once"

If you want "exactly once" semantics on outside systems (e.g. Rest API),
you'll need idempotent updates. An idempotent variant of this would be a
count with a checkpoint id (cid) in your database.

| cid | count |
|-----+-------|
|   0 |     3 |
|   1 |    11 |
|   2 |    20 |
|   3 |   120 |
|   4 |   137 |
|   5 |   158 |

You would then always read the row with the newest cid for presentation.
You would only write to the database once you know the checkpoint has
completed (CheckpointListener). You can still fail while doing that, so
you need to keep the confirmation around in the checkpoint such that you
can confirm again after a restore. It is important that the confirmation
can be done multiple times without affecting the result (idempotent). On
recovery from a checkpoint, you want to delete all rows with a cid higher
than the one you resume from. For example, if you fail after checkpoint 3
has been created, you'll confirm 3 (because you might have failed before
you could confirm) and then delete 4 and 5 before starting the computation
again.
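
The confirm/delete logic can be sketched as follows. This is an in-memory
stand-in for the database table, not Flink code: the class and method names
are hypothetical, and in a real job `confirm` would be triggered from the
CheckpointListener callback and would issue an upsert against your database
or REST API.

```java
import java.util.TreeMap;

// In-memory stand-in for the (cid, count) table; all names are hypothetical.
class CheckpointedCountTable {
    // cid -> count, kept sorted so the newest cid is the last entry.
    private final TreeMap<Long, Long> rows = new TreeMap<>();

    // Idempotent write: confirming the same (cid, count) twice leaves the
    // table unchanged, so it is safe to repeat after a restore.
    void confirm(long cid, long count) {
        rows.put(cid, count);
    }

    // On recovery, drop every row with a cid higher than the restored one.
    void deleteAfter(long restoredCid) {
        rows.tailMap(restoredCid, false).clear();
    }

    // Readers always present the count of the newest cid (0 if empty).
    long latestCount() {
        return rows.isEmpty() ? 0L : rows.lastEntry().getValue();
    }

    int size() {
        return rows.size();
    }
}
```

With the rows from the table above, resuming from checkpoint 3 means
calling `confirm(3, 120)` again and then `deleteAfter(3)`. Readers then see
120 until checkpoint 4 is confirmed anew.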

As you can see, strong consistency guarantees can be a bit tricky. If you
don't need strong guarantees and undercounting is ok for you, implement
simple checkpointing for "at least once" using the Checkpointed interface,
or the KeyValue state if your counter is scoped by key.

Cheers,
Max

On Mon, May 23, 2016 at 3:22 PM, Josh <jofo90@gmail.com> wrote:
> Hi all,
>
> I am new to Flink and have a couple of questions which I've had trouble
> finding answers to online. Any advice would be much appreciated!
>
> 1. What's a typical way of handling the scenario where you want to join
> streaming data with a (relatively) static data source? For example, if I
> have a stream 'orders' where each order has an 'item_id', and I want to
> join this stream with my database of 'items'. The database of items is
> mostly static (with perhaps a few new items added every day). The database
> can be retrieved either directly from a standard SQL database (postgres)
> or via a REST call. I guess one way to handle this would be to distribute
> the database of items with the Flink tasks, and to redeploy the entire job
> if the items database changes. But I think there's probably a better way
> to do it?
>
> 2. I'd like my Flink job to output state to a REST API (i.e. using the
> REST API as a sink). Updates would be incremental, e.g. the job would
> output tumbling window counts which need to be added to some property on a
> REST resource, so I'd probably implement this as a PATCH. I haven't found
> much evidence that anyone else has used a REST API as a Flink sink - is
> there a reason why this might be a bad idea?
>
> Thanks for any advice on these,
>
> Josh
