From: Fabian Hueske
Date: Fri, 19 May 2017 11:08:57 +0200
Subject: Re: Best practices to maintain reference data for Flink Jobs
To: "Tzu-Li (Gordon) Tai"
Cc: user
Mailing list: user@flink.apache.org

+1 to what Gordon said.

Queryable state is meant as an external interface to streaming jobs rather than for lookups within jobs.
Accessing co-located state should give you better performance and is probably easier to implement and maintain.
Cheers,
Fabian
2017-05-19 7:43 GMT+02:00 Tzu-Li (Gordon) Tai <tzulitai@apache.org>:
Hi,

Can the enriching data be keyed? Or does it have to be broadcast to each operator?
Either way, I think Side Inputs (an upcoming feature) are the best fit for this. You can take a look at https://issues.apache.org/jira/browse/FLINK-6131.
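
Whether the data can be keyed matters mostly for memory. The following is a minimal Python model (not Flink code) of where reference entries end up with four parallel operator instances. The parallelism, the `sku-*` keys, and the use of `crc32` as a stand-in for Flink's own key-group hashing are all assumptions made for illustration.

```python
import zlib
from typing import Dict, List

PARALLELISM = 4  # hypothetical number of parallel operator instances


def partition_for(key: str, parallelism: int = PARALLELISM) -> int:
    """Deterministic stand-in for assigning a key to one parallel instance.

    Flink uses its own key-group hashing; crc32 is only an illustration.
    """
    return zlib.crc32(key.encode("utf-8")) % parallelism


def place_keyed(reference: Dict[str, str]) -> List[Dict[str, str]]:
    """Keyed: each instance holds only the entries whose key it owns."""
    instances: List[Dict[str, str]] = [{} for _ in range(PARALLELISM)]
    for key, value in reference.items():
        instances[partition_for(key)][key] = value
    return instances


def place_broadcast(reference: Dict[str, str]) -> List[Dict[str, str]]:
    """Broadcast: every instance holds a full copy of the reference data."""
    return [dict(reference) for _ in range(PARALLELISM)]


if __name__ == "__main__":
    reference = {f"sku-{i}": f"product-{i}" for i in range(1000)}
    print("keyed:    ", [len(p) for p in place_keyed(reference)])
    print("broadcast:", [len(p) for p in place_broadcast(reference)])
```

The keyed counts add up to 1000 entries in total, while every broadcast instance holds all 1000. For a 10GB reference set and four instances, broadcasting means roughly 40GB held in total, versus about 10GB when the data is keyed.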

Regarding the 3 options you listed:

By using QueryableState in option B, you mean that you want to feed the enriching data stream to a separate job, have that job expose queryable state, and query that state from the operators of the actual application job, correct? If so, I think options A and B amount to the same thing, i.e., both require accessing data external to the job.
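
What options A and B have in common is that every record triggers a remote lookup. The asyncio sketch below models that cost. `FakeKVStore` and `enrich_all` are hypothetical names, the in-memory dict stands in for the external store (or the other job's queryable state), and the sketch is a model of the access pattern, not Flink's Async I/O API.

```python
import asyncio
from typing import Dict, List, Optional, Tuple


class FakeKVStore:
    """Hypothetical stand-in for an external key-value store (option A)
    or for queryable state hosted by another job (option B)."""

    def __init__(self, data: Dict[str, str], latency_s: float = 0.01):
        self._data = data
        self._latency_s = latency_s
        self.requests = 0  # counts remote round trips

    async def get(self, key: str) -> Optional[str]:
        self.requests += 1
        await asyncio.sleep(self._latency_s)  # simulated network latency
        return self._data.get(key)


async def enrich_all(
    events: List[Tuple[str, int]], store: FakeKVStore, max_in_flight: int = 8
) -> List[Tuple[str, int, Optional[str]]]:
    """Enrich every (key, payload) event with one remote lookup, keeping at
    most `max_in_flight` requests outstanding at a time."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def enrich(event: Tuple[str, int]) -> Tuple[str, int, Optional[str]]:
        key, payload = event
        async with semaphore:
            ref = await store.get(key)
        return (key, payload, ref)

    # gather() returns results in the same order as the input events
    return list(await asyncio.gather(*(enrich(e) for e in events)))


if __name__ == "__main__":
    store = FakeKVStore({"a": "ref-a", "b": "ref-b"})
    print(asyncio.run(enrich_all([("a", 1), ("b", 2), ("c", 3)], store)))
    print("remote requests:", store.requests)
```

Every record pays one round trip to a system outside the job, and a missing key simply comes back as `None`; that per-record external dependency is what makes A and B equivalent in this respect.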

If the enriching data can somehow be keyed the same way as the stream that requires it, I would go for option C using connected streams, with the enriching data as one input and the actual data as the other. Instead of just "caching the enriching data in memory", you should register it as managed Flink state in the CoMapFunction / CoFlatMapFunction. The actual input stream records can then access that registered state locally.
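
The connected-streams approach can be sketched as a plain-Python model of a keyed CoFlatMapFunction: `flat_map1` and `flat_map2` only mirror the two callbacks `flatMap1`/`flatMap2`, the per-key dicts stand in for managed keyed state, and the policy of buffering records that arrive before their reference data is an assumption, not something from the thread.

```python
from collections import defaultdict
from typing import Any, Dict, Iterator, List, Tuple

Enriched = Tuple[str, Any, Any]


class KeyedEnrichment:
    """Plain-Python model of a keyed CoFlatMapFunction.

    flat_map1 receives data records and flat_map2 receives reference updates.
    Both dicts are indexed by key to model keyed state; in Flink, keying both
    inputs the same way routes a record and its reference data to the same
    instance, and this state would be registered as managed keyed state
    rather than kept in plain fields.
    """

    def __init__(self) -> None:
        self.reference: Dict[str, Any] = {}  # latest reference value per key
        self.pending: Dict[str, List[Any]] = defaultdict(list)  # early records

    def flat_map1(self, key: str, record: Any) -> Iterator[Enriched]:
        ref = self.reference.get(key)
        if ref is None:
            # Assumed policy: buffer records whose reference data has not arrived.
            self.pending[key].append(record)
            return
        yield (key, record, ref)

    def flat_map2(self, key: str, ref: Any) -> Iterator[Enriched]:
        self.reference[key] = ref  # a refresh simply overwrites the old value
        for record in self.pending.pop(key, []):
            yield (key, record, ref)


def run(inputs: List[Tuple[str, str, Any]]) -> List[Enriched]:
    """Feed interleaved ("data" | "ref", key, value) tuples to one instance."""
    op = KeyedEnrichment()
    out: List[Enriched] = []
    for side, key, value in inputs:
        handler = op.flat_map1 if side == "data" else op.flat_map2
        out.extend(handler(key, value))
    return out


if __name__ == "__main__":
    print(run([
        ("ref", "u1", "gold"), ("data", "u1", "click-1"),
        ("data", "u2", "click-2"), ("ref", "u2", "silver"),
        ("ref", "u1", "platinum"), ("data", "u1", "click-3"),
    ]))
```

In this model a refresh of the reference data is just another record on the second input, and lookups never leave the operator. Buffering early records is only one choice (dropping them or emitting them unenriched are others); in a real job such a buffer would also have to live in managed state.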

Cheers,
Gordon


On 19 May 2017 at 7:11:07 AM, Sand Stone (sand.m.stone@gmail.com) wrote:

Hi. Say I have a few reference data sets that need to be used by a
streaming job. Their sizes range from 10M to 10GB. The data is not
static; it will be refreshed at intervals of minutes and/or days.

With the new advancements in Flink, it seems there are quite a few options.
A. Store all the data in an external key-value database cluster and use
async I/O calls.
  * Data refresh can be done in a few different ways.
B. Use the new Queryable State feature.
  * It seems there is no "easy" API to discover the
    queryable state at the moment; one needs to use the REST API to figure
    out the job ID.
C. Ingest the reference data into the job and cache it in memory.
Are there any other options?

On paper, option B with Queryable State seems to be the cleanest solution.

Any comments or suggestions are greatly appreciated, in particular in terms
of robustness and consistent recovery.
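
On consistent recovery, the difference between managed state and a hand-rolled in-memory cache can be shown with a toy model: a checkpoint pairs the input position with a snapshot of managed state, so after a failure the restored state and the replayed input line up, while a plain cache comes back missing everything before the checkpoint. This is a deliberate simplification of Flink's checkpointing, not its implementation; it assumes the reference data comes from a replayable source (for example a Kafka topic), and `ToyOperator`, `checkpoint`, and `restore` are hypothetical names.

```python
import copy
from typing import Dict, List, Tuple


class ToyOperator:
    """Keeps each reference update twice: in 'managed' state, which is
    included in checkpoints, and in a plain cache, which is not."""

    def __init__(self) -> None:
        self.managed: Dict[str, str] = {}
        self.cache: Dict[str, str] = {}

    def on_reference(self, key: str, value: str) -> None:
        self.managed[key] = value
        self.cache[key] = value


def process(op: ToyOperator, log: List[Tuple[str, str]], start: int, end: int) -> None:
    """Apply the reference updates log[start:end] to the operator."""
    for key, value in log[start:end]:
        op.on_reference(key, value)


def checkpoint(op: ToyOperator, offset: int) -> dict:
    """A checkpoint pairs the input position with a copy of managed state only."""
    return {"offset": offset, "managed": copy.deepcopy(op.managed)}


def restore(snapshot: dict) -> Tuple[ToyOperator, int]:
    """Rebuild an operator after a failure; the plain cache starts empty."""
    op = ToyOperator()
    op.managed = copy.deepcopy(snapshot["managed"])
    return op, snapshot["offset"]


if __name__ == "__main__":
    log = [("a", "1"), ("b", "2"), ("c", "3")]
    op = ToyOperator()
    process(op, log, 0, 2)
    snap = checkpoint(op, offset=2)
    process(op, log, 2, 3)  # ...then the job fails
    restored, offset = restore(snap)
    process(restored, log, offset, len(log))  # replay from the checkpointed offset
    print(restored.managed)  # {'a': '1', 'b': '2', 'c': '3'}
    print(restored.cache)    # {'c': '3'}
```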

Thanks much!
