From: Maximilian Michels
To: user@flink.apache.org
Date: Tue, 24 May 2016 14:34:16 +0200
Subject: Re: Combining streams with static data and using REST API as a sink

Hi Josh,

You can trigger an occasional refresh, e.g. on every 100 elements received. Alternatively, you could start a thread that refreshes the data every 100 seconds (with a lock involved to prevent processing while the refresh is in progress).
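
Roughly like this, inside the function that does the join (an untested sketch; Order, Item, EnrichedOrder, and loadItems() are placeholders for your own types and database access):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class ItemEnricher extends RichFlatMapFunction<Order, EnrichedOrder> {

    private transient Map<String, Item> items;
    private transient long sinceRefresh;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initial load before any elements are processed.
        items = loadItems();
    }

    @Override
    public void flatMap(Order order, Collector<EnrichedOrder> out) throws Exception {
        // Occasional refresh, e.g. on every 100 elements received.
        // A timer thread plus a lock around this method would work as well.
        if (++sinceRefresh >= 100) {
            items = loadItems();
            sinceRefresh = 0;
        }
        Item item = items.get(order.itemId);
        if (item != null) {
            out.collect(new EnrichedOrder(order, item));
        }
    }

    // Placeholder: replace with a JDBC query against the items table,
    // or a call to the REST endpoint.
    private Map<String, Item> loadItems() {
        return new HashMap<>();
    }
}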
Cheers,
Max

On Mon, May 23, 2016 at 7:36 PM, Josh wrote:
>
> Hi Max,
>
> Thanks, that's very helpful re the REST API sink. For now I don't need exactly-once guarantees for the sink, so I'll just write a simple HTTP sink implementation. But I may need to move to the idempotent version in future!
>
> For 1), that sounds like a simple/easy solution, but how would I handle occasional updates in that case, since I guess the open() function is only called once? Do I need to periodically restart the job, or periodically trigger tasks to restart and refresh their data? Ideally I would want this job to be running constantly.
>
> Josh
>
> On Mon, May 23, 2016 at 5:56 PM, Maximilian Michels wrote:
>>
>> Hi Josh,
>>
>> 1) Use a RichFunction, which has an open() method, to load data (e.g. from a database) at runtime before the processing starts.
>>
>> 2) No, that's fine. If you want your REST API sink to interplay with checkpointing (for fault tolerance), it is a bit tricky, though, depending on the guarantees you want to have. Typically, you would have "at least once" or "exactly once" semantics on the state. In Flink, this is easy to achieve; it's a bit harder for outside systems.
>>
>> "At Least Once"
>>
>> For example, if you increment a counter in a database, this count will be off if you recover your job in the case of a failure. You can checkpoint the current value of the counter and restore this value on a failure (using the Checkpointed interface). However, your counter might decrease temporarily when you resume from a checkpoint (until the counter has caught up again).
>>
>> "Exactly Once"
>>
>> If you want "exactly once" semantics on outside systems (e.g. a REST API), you'll need idempotent updates. An idempotent variant of this would be a count with a checkpoint id (cid) in your database:
>>
>> | cid | count |
>> |-----+-------|
>> |   0 |     3 |
>> |   1 |    11 |
>> |   2 |    20 |
>> |   3 |   120 |
>> |   4 |   137 |
>> |   5 |   158 |
>>
>> You would then always read the newest cid value for presentation. You would only write to the database once you know you have completed the checkpoint (using the CheckpointListener interface). You can still fail while doing that, so you need to keep the confirmation around in the checkpoint such that you can confirm again after a restore. It is important that the confirmation can be done multiple times without affecting the result (i.e. it is idempotent). On recovery from a checkpoint, you want to delete all rows with a cid higher than the one you resume from. For example, if you fail after checkpoint 3 has been created, you'll confirm 3 (because you might have failed before you could confirm) and then delete 4 and 5 before starting the computation again.
>>
>> You see that strong consistency guarantees can be a bit tricky. If you don't need strong guarantees and undercounting is OK for you, implement simple checkpointing for "at least once" using the Checkpointed interface, or the key/value state if your counter is scoped by key.
>>
>> Cheers,
>> Max
>>
>>
>> On Mon, May 23, 2016 at 3:22 PM, Josh wrote:
>> > Hi all,
>> >
>> > I am new to Flink and have a couple of questions which I've had trouble finding answers to online. Any advice would be much appreciated!
>> >
>> > What's a typical way of handling the scenario where you want to join streaming data with a (relatively) static data source? For example, if I have a stream 'orders' where each order has an 'item_id', and I want to join this stream with my database of 'items'. The database of items is mostly static (with perhaps a few new items added every day). The database can be retrieved either directly from a standard SQL database (postgres) or via a REST call. I guess one way to handle this would be to distribute the database of items with the Flink tasks, and to redeploy the entire job if the items database changes. But I think there's probably a better way to do it?
>> >
>> > I'd also like my Flink job to output state to a REST API (i.e. using the REST API as a sink). Updates would be incremental, e.g. the job would output tumbling window counts which need to be added to some property on a REST resource, so I'd probably implement this as a PATCH. I haven't found much evidence that anyone else has used a REST API as a Flink sink - is there a reason why this might be a bad idea?
>> >
>> > Thanks for any advice on these,
>> >
>> > Josh
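
For reference, the simple "at least once" HTTP PATCH sink Josh describes might look roughly like this (an untested sketch; WindowCount, the endpoint URL, and the JSON payload are placeholders, and Apache HttpClient is used because java.net.HttpURLConnection does not support the PATCH verb):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.client.methods.HttpPatch;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

// WindowCount is a placeholder for the job's window result type,
// assumed to carry an itemId and a count.
public class HttpPatchSink extends RichSinkFunction<WindowCount> {

    private final String baseUrl; // e.g. "https://api.example.com/items/"
    private transient CloseableHttpClient client;

    public HttpPatchSink(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    @Override
    public void open(Configuration parameters) {
        client = HttpClients.createDefault();
    }

    @Override
    public void invoke(WindowCount value) throws Exception {
        HttpPatch patch = new HttpPatch(baseUrl + value.itemId);
        patch.setEntity(new StringEntity(
                "{\"count_delta\": " + value.count + "}",
                ContentType.APPLICATION_JSON));
        // At least once only: after a failure, the same window result may
        // be PATCHed again, so counts can overshoot unless the API
        // deduplicates (e.g. via the checkpoint-id scheme above).
        client.execute(patch).close();
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}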