flink-user mailing list archives

From Maximilian Michels <...@apache.org>
Subject Re: Combining streams with static data and using REST API as a sink
Date Tue, 24 May 2016 12:34:16 GMT
Hi Josh,

You can trigger an occasional refresh, e.g. on every 100 elements
received. Or, you could start a thread that does that every 100
seconds (possibly with a lock involved to prevent processing in the
meantime).
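Both refresh strategies can be sketched in plain Python, independent of Flink's API. `RefreshingLookup` and the `loader` callable (a stand-in for the Postgres or REST query) are hypothetical names; in a real job this logic would sit inside a rich function, with the initial load in `open()`. In this sketch the lock only guards swapping in the new data, so the background thread does not block lookups during a slow reload; holding the lock for the whole reload would instead pause processing while a refresh runs.

```python
import threading
from typing import Callable, Dict, Optional


class RefreshingLookup:
    """Keeps an in-memory copy of slowly changing reference data.

    `loader` is a hypothetical callable standing in for a database or REST query.
    """

    def __init__(self, loader: Callable[[], Dict[str, str]], refresh_every: int = 100):
        self._loader = loader
        self._refresh_every = refresh_every
        self._seen = 0
        self._lock = threading.Lock()  # guards _data against the background thread
        self._data: Dict[str, str] = loader()  # initial load, like open()

    def refresh(self) -> None:
        fresh = self._loader()  # do the slow load outside the lock
        with self._lock:
            self._data = fresh  # swap in the new snapshot

    def lookup(self, key: str) -> Optional[str]:
        # Count-based strategy: reload on every `refresh_every`-th element.
        self._seen += 1
        if self._seen % self._refresh_every == 0:
            self.refresh()
        with self._lock:
            return self._data.get(key)

    def start_background_refresh(self, interval_s: float) -> threading.Event:
        # Time-based strategy: reload every `interval_s` seconds on a daemon
        # thread until the returned event is set.
        stop = threading.Event()

        def loop() -> None:
            while not stop.wait(interval_s):
                self.refresh()

        threading.Thread(target=loop, daemon=True).start()
        return stop
```

With `refresh_every=100` the data is reloaded on the 100th, 200th, ... lookup; `start_background_refresh(100.0)` gives the "every 100 seconds" variant, and setting the returned event stops it.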


On Mon, May 23, 2016 at 7:36 PM, Josh <jofo90@gmail.com> wrote:
> Hi Max,
> Thanks, that's very helpful re the REST API sink. For now I don't need exactly once guarantees
for the sink, so I'll just write a simple HTTP sink implementation. But may need to move to
the idempotent version in future!
> For 1), that sounds like a simple/easy solution, but how would I handle occasional updates
in that case, since I guess the open() function is only called once? Do I need to periodically
restart the job, or periodically trigger tasks to restart and refresh their data? Ideally
I would want this job to be running constantly.
> Josh
> On Mon, May 23, 2016 at 5:56 PM, Maximilian Michels <mxm@apache.org> wrote:
>> Hi Josh,
>> 1) Use a RichFunction which has an `open()` method to load data (e.g. from a database)
at runtime before the processing starts.
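Point 1) can be sketched without any Flink dependency. The Python class below only mirrors the `open()`/`map()` lifecycle of a rich map function (in Flink's Java API, a `RichMapFunction` overriding `open(Configuration)`); `EnrichOrders`, `Order`, and the `load_items` callable, which stands in for the query against the items table, are hypothetical names used for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class Order:
    order_id: str
    item_id: str


@dataclass
class EnrichedOrder:
    order_id: str
    item_id: str
    item_name: Optional[str]  # None if the item is not in the loaded table


class EnrichOrders:
    """Mirrors a rich map function: load static data in open(), use it in map()."""

    def __init__(self, load_items: Callable[[], Dict[str, str]]):
        self._load_items = load_items  # hypothetical loader, e.g. a SELECT on 'items'
        self._items: Dict[str, str] = {}

    def open(self) -> None:
        # Runs once per parallel task instance, before any element is processed.
        self._items = self._load_items()

    def map(self, order: Order) -> EnrichedOrder:
        # Local in-memory lookup: no database round trip per element.
        return EnrichedOrder(order.order_id, order.item_id, self._items.get(order.item_id))
```

Each parallel task instance loads its own copy, so this approach assumes the items table fits comfortably in memory.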
>> 2) No, that's fine. If you want your REST API sink to interplay with checkpointing
(for fault tolerance), this is a bit tricky, though, depending on the guarantees you want to
have. Typically, you would have "at least once" or "exactly once" semantics on the state.
In Flink, this is easy to achieve; it's a bit harder for outside systems.
>> "At Least Once"
>> For example, if you increment a counter in a database, this count will be off if
you recover your job in the case of a failure. You can checkpoint the current value of the
counter and restore this value on a failure (using the Checkpointed interface). However, your
counter might decrease temporarily when you resume from a checkpoint (until the counter has
caught up again).
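A minimal sketch of the "at least once" variant, assuming the external store accepts an absolute value (an overwrite) rather than an increment. `CountingSink` and its methods are hypothetical; `snapshot_state`/`restore_state` only play the roles of the `Checkpointed` interface's snapshot and restore methods, and a dict stands in for the database.

```python
from typing import Dict


class CountingSink:
    """At-least-once counter: the running total lives in operator state,
    is snapshotted on each checkpoint, and is written to the external
    store as an absolute value (an overwrite, not an increment)."""

    def __init__(self, store: Dict[str, int], key: str):
        self.store = store  # stands in for the external database
        self.key = key
        self.count = 0

    def invoke(self, n: int) -> None:
        self.count += n
        self.store[self.key] = self.count  # overwrite with the current total

    def snapshot_state(self) -> int:
        # Plays the role of the Checkpointed snapshot method.
        return self.count

    def restore_state(self, state: int) -> None:
        # Plays the role of the Checkpointed restore method.
        self.count = state
```

For example, if the sink has written 5, then 8, then 12 and the job restores from a checkpoint taken at 5, replaying the elements 3 and 4 makes the stored value drop from 12 to 8 before it catches up to 12 again: the temporary decrease described above.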
>> "Exactly Once"
>> If you want "exactly once" semantics on outside systems (e.g. Rest API), you'll need
idempotent updates. An idempotent variant of this would be a count with a checkpoint id (cid)
in your database.
>> | cid | count |
>> |-----+-------|
>> |   0 |     3 |
>> |   1 |    11 |
>> |   2 |    20 |
>> |   3 |   120 |
>> |   4 |   137 |
>> |   5 |   158 |
>> You would then always read the newest cid value for presentation. You would only
write to the database once you know you have completed the checkpoint (CheckpointListener).
You can still fail while doing that, so you need to keep the confirmation around in the checkpoint
such that you can confirm again after restore. It is important that confirmation can be done
multiple times without affecting the result (idempotent). On recovery from a checkpoint, you
want to delete all rows with a cid higher than the one you resume from. For example,
if you fail after checkpoint 3 has been created, you'll confirm 3 (because you might have
failed before you could confirm) and then delete 4 and 5 before starting the computation again.
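The cid scheme can be sketched with a dict standing in for the database table (cid -> count). `CidTableSink`, `latest`, and the method names are hypothetical; `snapshot_state` and `notify_checkpoint_complete` only play the roles of Flink's checkpoint snapshot and `CheckpointListener.notifyCheckpointComplete` callbacks.

```python
from typing import Dict, List, Tuple

State = Tuple[int, List[Tuple[int, int]]]  # (count, unconfirmed (cid, count) rows)


class CidTableSink:
    """Publishes one row per completed checkpoint; rows are only written on confirm."""

    def __init__(self, table: Dict[int, int]):
        self.table = table  # stands in for the external table: cid -> count
        self.count = 0
        self.pending: List[Tuple[int, int]] = []  # snapshotted, not yet confirmed

    def invoke(self, n: int) -> None:
        self.count += n  # kept in operator state; nothing is written yet

    def snapshot_state(self, cid: int) -> State:
        # The pending rows are part of the checkpoint, so they survive a failure.
        self.pending.append((cid, self.count))
        return self.count, list(self.pending)

    def notify_checkpoint_complete(self, cid: int) -> None:
        # Confirm: write every pending row up to cid. Writing the same
        # (cid, count) pair twice does not change the table (idempotent).
        for pcid, pcount in self.pending:
            if pcid <= cid:
                self.table[pcid] = pcount
        self.pending = [(p, c) for p, c in self.pending if p > cid]

    def restore_state(self, cid: int, state: State) -> None:
        self.count, self.pending = state[0], list(state[1])
        # Confirm again: the job may have failed before the confirmation landed.
        self.notify_checkpoint_complete(cid)
        # Delete rows from checkpoints newer than the one we resume from.
        for stale in [c for c in self.table if c > cid]:
            del self.table[stale]


def latest(table: Dict[int, int]) -> int:
    """Readers always present the row with the newest cid."""
    return table[max(table)] if table else 0
```

For example, if rows for checkpoints 1 to 3 have been written and the job resumes from checkpoint 2, `restore_state` rewrites row 2 (harmless if it already exists) and deletes row 3, so `latest` reports the count as of checkpoint 2 until processing catches up.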
>> As you can see, strong consistency guarantees can be a bit tricky. If you don't need
strong guarantees and temporary undercounting is OK for you, implement simple checkpointing for "at
least once" using the Checkpointed interface, or the key/value state if your counter is scoped
by key.
>> Cheers,
>> Max
>> On Mon, May 23, 2016 at 3:22 PM, Josh <jofo90@gmail.com> wrote:
>> > Hi all,
>> >
>> > I am new to Flink and have a couple of questions which I've had trouble
>> > finding answers to online. Any advice would be much appreciated!
>> >
>> > What's a typical way of handling the scenario where you want to join
>> > streaming data with a (relatively) static data source? For example, if I
>> > have a stream 'orders' where each order has an 'item_id', and I want to join
>> > this stream with my database of 'items'. The database of items is mostly
>> > static (with perhaps a few new items added every day). The database can be
>> > retrieved either directly from a standard SQL database (postgres) or via a
>> > REST call. I guess one way to handle this would be to distribute the
>> > database of items with the Flink tasks, and to redeploy the entire job if
>> > the items database changes. But I think there's probably a better way to do
>> > it?
>> > I'd like my Flink job to output state to a REST API (i.e. using the REST
>> > API as a sink). Updates would be incremental, e.g. the job would output
>> > tumbling window counts which need to be added to some property on a REST
>> > resource, so I'd probably implement this as a PATCH. I haven't found much
>> > evidence that anyone else has used a REST API as a Flink sink - is there a
>> > reason why this might be a bad idea?
>> >
>> > Thanks for any advice on these,
>> >
>> > Josh
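The simple PATCH-based HTTP sink discussed in this thread could look roughly like this with Python's standard library. The URL layout (`/counters/<id>`) and the `{"increment": n}` body are invented for illustration, and `send` is injectable so the request can be inspected without a server; in a real job this would be the body of a sink function's `invoke`. Because each PATCH adds to the stored value, a retry or a replay after recovery can double-count, which is the "count will be off" problem Max describes for incrementing a counter in a database.

```python
import json
import urllib.request
from typing import Callable, Optional


def build_patch(base_url: str, resource_id: str, window_count: int) -> urllib.request.Request:
    """Build a PATCH request that adds one window's count to a REST resource.
    The path and JSON body shape are hypothetical."""
    body = json.dumps({"increment": window_count}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/counters/{resource_id}",
        data=body,
        method="PATCH",
        headers={"Content-Type": "application/json"},
    )


class RestPatchSink:
    """Sends one PATCH per tumbling-window result; no retries, no guarantees."""

    def __init__(self, base_url: str,
                 send: Optional[Callable[[urllib.request.Request], object]] = None,
                 timeout_s: float = 5.0):
        self._base_url = base_url.rstrip("/")
        self._timeout_s = timeout_s
        self._send = send if send is not None else self._http_send

    def _http_send(self, req: urllib.request.Request) -> int:
        # Real network call; urlopen raises urllib.error.HTTPError on 4xx/5xx.
        with urllib.request.urlopen(req, timeout=self._timeout_s) as resp:
            return resp.status

    def invoke(self, resource_id: str, window_count: int) -> None:
        self._send(build_patch(self._base_url, resource_id, window_count))
```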
