Subject: Re: Combining streams with static data and using REST API as a sink
From: Josh
To: user@flink.apache.org
Date: Mon, 23 May 2016 18:36:27 +0100

Hi Max,

Thanks, that's very helpful re the REST API sink. For now I don't need exactly-once guarantees for the sink, so I'll just write a simple HTTP sink implementation. But I may need to move to the idempotent version in future!
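Something like this is what I have in mind for the sink, just as a first pass (the endpoint and JSON encoding are placeholders, and I know HttpURLConnection can't do a real PATCH):

    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    // Fire-and-forget HTTP sink: no retries and no delivery guarantees.
    public class HttpSink implements SinkFunction<String> {

        // Placeholder endpoint; would come from config in a real job.
        private static final String ENDPOINT = "http://example.com/api/counts";

        @Override
        public void invoke(String json) throws Exception {
            HttpURLConnection conn =
                    (HttpURLConnection) new URL(ENDPOINT).openConnection();
            // HttpURLConnection has no PATCH support, so this uses POST;
            // a real PATCH would need an HTTP client library.
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(json.getBytes("UTF-8"));
            }
            if (conn.getResponseCode() >= 300) {
                throw new RuntimeException("Request failed: " + conn.getResponseCode());
            }
            conn.disconnect();
        }
    }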

For 1), that sounds like a simple solution, but how would I handle occasional updates in that case, since I guess the open() function is only called once? Do I need to periodically restart the job, or periodically trigger tasks to restart and refresh their data? Ideally I would want this job to be running constantly.
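One idea I had, to avoid restarting anything: run a refresh thread inside the function itself. Very roughly (just a fragment; ItemsClient is a made-up stand-in for the postgres/REST lookup):

    // Fragment of a RichFunction -- the rest looks like a normal
    // enrichment function. ItemsClient is hypothetical.
    private transient volatile Map<String, Item> items;
    private transient Thread refresher;

    @Override
    public void open(Configuration parameters) throws Exception {
        items = ItemsClient.fetchAll();
        refresher = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(60 * 60 * 1000);     // refresh hourly
                    items = ItemsClient.fetchAll();   // swap in a fresh snapshot
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        refresher.setDaemon(true);
        refresher.start();
    }

    @Override
    public void close() {
        if (refresher != null) {
            refresher.interrupt();
        }
    }

Does that seem like a reasonable approach?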

Josh

On Mon, May 23, 2016 at 5:56 PM, Maximilian Michels <mxm@apache.org> wrote:
Hi Josh,

1) Use a RichFunction which has an `open()` method to load data (e.g. from a database) at runtime before the processing starts.
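A minimal sketch of that pattern (Order/Item and the loadItems() query are placeholders for your own types and lookup):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    import java.util.HashMap;
    import java.util.Map;

    // Enriches each order with its item, loaded once before processing starts.
    public class EnrichOrders extends RichMapFunction<Order, Tuple2<Order, Item>> {

        private transient Map<String, Item> items;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Called once per task instance, before any elements arrive.
            items = loadItems();
        }

        @Override
        public Tuple2<Order, Item> map(Order order) throws Exception {
            return new Tuple2<>(order, items.get(order.getItemId()));
        }

        private Map<String, Item> loadItems() throws Exception {
            Map<String, Item> map = new HashMap<>();
            // placeholder: fill the map with a JDBC query or REST call
            return map;
        }
    }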

2) No, that's fine. If you want your Rest API Sink to interplay with checkpointing (for fault-tolerance), this is a bit tricky though, depending on the guarantees you want to have. Typically, you would have "at least once" or "exactly once" semantics on the state. In Flink, this is easy to achieve; it's a bit harder for outside systems.

"At Least Once&qu= ot;

For example, if you increment a counter in a database, this count will be off if you recover your job in the case of a failure. You can checkpoint the current value of the counter and restore this value on a failure (using the Checkpointed interface). However, your counter might decrease temporarily when you resume from a checkpoint (until the counter has caught up again).
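For example, a counter sink along those lines (a sketch only; updateRemoteCounter() is a placeholder for your REST/DB call):

    import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    // Counter sink with "at least once" semantics: the counter itself is
    // checkpointed, but the external write may be repeated after a failure.
    public class CountingSink extends RichSinkFunction<Long>
            implements Checkpointed<Long> {

        private long count = 0;

        @Override
        public void invoke(Long value) throws Exception {
            count += value;
            // Pushed on every element -- may run again after recovery,
            // which is where the temporary over/under-count comes from.
            updateRemoteCounter(count);
        }

        @Override
        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            return count;
        }

        @Override
        public void restoreState(Long state) {
            count = state;
        }

        private void updateRemoteCounter(long c) {
            // placeholder: e.g. HTTP request or SQL UPDATE
        }
    }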

"Exactly Once"

If you want "exactly once" semantics on outside systems (e.g. Rest API), you'll need idempotent updates. An idempotent variant of this would be a count with a checkpoint id (cid) in your database.

| cid | count |
|-----+-------|
|   0 |     3 |
|   1 |    11 |
|   2 |    20 |
|   3 |   120 |
|   4 |   137 |
|   5 |   158 |

You would then always read the newest cid value for presentation. You would only write to the database once you know you have completed the checkpoint (CheckpointListener). You can still fail while doing that, so you need to keep the confirmation around in the checkpoint such that you can confirm again after restore. It is important that confirmation can be done multiple times without affecting the result (idempotent). On recovery from a checkpoint, you want to delete all rows with a cid higher than the one you resume from. For example, if you fail after checkpoint 3 has been created, you'll confirm 3 (because you might have failed before you could confirm) and then delete 4 and 5 before starting the computation again.
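A rough sketch of such a sink (exec() stands in for real JDBC statements against a table like counts(cid BIGINT PRIMARY KEY, count BIGINT); it also simplifies by assuming checkpoints complete one at a time):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.CheckpointListener;
    import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class IdempotentCountSink extends RichSinkFunction<Long>
            implements Checkpointed<Tuple2<Long, Long>>, CheckpointListener {

        private long count = 0;
        private long countAtSnapshot = 0;

        @Override
        public void invoke(Long value) {
            count += value;   // only touch internal state per element
        }

        @Override
        public Tuple2<Long, Long> snapshotState(long checkpointId, long timestamp) {
            countAtSnapshot = count;
            // The (cid, count) pair travels inside the checkpoint, so we
            // can confirm again after a restore.
            return new Tuple2<>(checkpointId, count);
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            confirm(checkpointId, countAtSnapshot);
        }

        @Override
        public void restoreState(Tuple2<Long, Long> state) {
            count = state.f1;
            // We may have died before confirming, so confirm again
            // (idempotent) ...
            confirm(state.f0, state.f1);
            // ... and drop rows from checkpoints that never completed.
            exec("DELETE FROM counts WHERE cid > " + state.f0);
        }

        private void confirm(long cid, long c) {
            // Idempotent upsert (postgres 9.5+ syntax).
            exec("INSERT INTO counts (cid, count) VALUES (" + cid + ", " + c + ")"
                    + " ON CONFLICT (cid) DO UPDATE SET count = EXCLUDED.count");
        }

        private void exec(String sql) {
            // placeholder for a JDBC statement
        }
    }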

You see that strong consistency guarantees can be a bit tricky. If you don't need strong guarantees and undercounting is ok for you, implement simple checkpointing for "at least once" using the Checkpointed interface, or the KeyValue state if your counter is scoped by key.

Cheers,
Max


On Mon, May 23, 2016 at 3:22 PM, Josh <jofo90@gmail.com> wrote:
> Hi all,
>
> I am new to Flink and have a couple of questions which I've had trouble
> finding answers to online. Any advice would be much appreciated!
>
> What's a typical way of handling the scenario where you want to join
> streaming data with a (relatively) static data source? For example, if I
> have a stream 'orders' where each order has an 'item_id', and I want to join
> this stream with my database of 'items'. The database of items is mostly
> static (with perhaps a few new items added every day). The database can be
> retrieved either directly from a standard SQL database (postgres) or via a
> REST call. I guess one way to handle this would be to distribute the
> database of items with the Flink tasks, and to redeploy the entire job if
> the items database changes. But I think there's probably a better way to do
> it?
>
> I'd like my Flink job to output state to a REST API (i.e. using the REST
> API as a sink). Updates would be incremental, e.g. the job would output
> tumbling window counts which need to be added to some property on a REST
> resource, so I'd probably implement this as a PATCH. I haven't found much
> evidence that anyone else has used a REST API as a Flink sink - is there a
> reason why this might be a bad idea?
>
> Thanks for any advice on these,
>
> Josh

