From: Sand Stone
Date: Fri, 19 May 2017 10:03:17 -0700
Subject: Re: Best practices to maintain reference data for Flink Jobs
To: Fabian Hueske
Cc: "Tzu-Li (Gordon) Tai", user

Thanks Gordon and Fabian.

The enriching data is really reference data, e.g. a reverse-IP database. It is hard to key it the same way as the main data stream, because the "ip address" in an event is not the primary key of the main data stream.

QueryableState is close, but as far as I can tell it does not support range scans. A remote datastore has clean semantics (a single logical copy, plus range-scan support), but making RPCs to another cluster is not ideal.

I assume this is quite a common stream-processing pattern for Flink-based services.

On Fri, May 19, 2017 at 2:08 AM, Fabian Hueske wrote:
> +1 to what Gordon said.
>
> Queryable state is meant as an external interface to streaming jobs
> rather than for lookups within jobs.
> Accessing co-located state should give you better performance and is
> probably easier to implement and maintain.
>
> Cheers,
> Fabian
>
> 2017-05-19 7:43 GMT+02:00 Tzu-Li (Gordon) Tai:
>>
>> Hi,
>>
>> Can the enriching data be keyed? Or is it something that has to be
>> broadcast to each operator?
>> Either way, I think Side Inputs (an upcoming feature) are the
>> best fit for this. You can take a look at
>> https://issues.apache.org/jira/browse/FLINK-6131.
>>
>> Regarding the 3 options you listed:
>>
>> By using QueryableState in option B, what you mean is that you want to
>> feed the enriching data stream to a separate job, let that job allow
>> queryable state, and query that state from the actual application job
>> operators, correct? If so, I think options A and B would mean the same
>> thing; i.e., they require accessing data external to the job.
>>
>> If the enriching data can somehow be keyed with the stream that requires
>> it, I would go for option C using connected streams, with the enriching data
>> as one input and the actual data as the other. Instead of just “caching the
>> enriching data in memory”, you should register it as managed Flink state
>> for the CoMapFunction / CoFlatMapFunction. The actual input stream records
>> can then access that registered state locally.
>>
>> Cheers,
>> Gordon
>>
>>
>> On 19 May 2017 at 7:11:07 AM, Sand Stone (sand.m.stone@gmail.com) wrote:
>>
>> Hi. Say I have a few reference data sets that need to be used by a
>> streaming job. Their sizes range from 10 MB to 10 GB. The data is not
>> static; it is refreshed at intervals of minutes and/or days.
>>
>> With the new advancements in Flink, it seems there are quite a few
>> options:
>> A. Store all the data in an external (KV) database cluster and use
>>    async I/O calls.
>>    * Data refresh can be done in a few different ways.
>> B. Use the new Queryable State feature.
>>    * It seems there is no "easy" API to discover the queryable state at
>>      the moment; one needs to use the REST API to figure out the job ID.
>> C. Ingest the reference data into the job and cache it in memory.
>>
>> Any other options?
>>
>> On paper, option B with Queryable State seems to be the cleanest
>> solution.
>>
>> Any comments or suggestions are greatly appreciated, in particular in
>> terms of robustness and consistent recovery.
>>
>> Thanks much!
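Gordon's option C (connect the reference stream with the event stream, key both by the same field, and keep the reference data in managed state of the CoMapFunction / CoFlatMapFunction) can be illustrated with the framework-free Python model below. It is a minimal sketch under stated assumptions, not Flink code: it assumes the reference data and the events share the same key, and `KeyedEnricher`, `on_reference`, `on_event`, `route` and `run` are hypothetical names. The two methods play the roles of the `flatMap1` / `flatMap2` callbacks of a Flink `CoFlatMapFunction`, a per-instance dict stands in for keyed managed state, and a CRC32 hash stands in for `keyBy()`.

```python
import zlib
from typing import Any, Dict, Iterable, List, Optional, Tuple

# Output record: (key, event, reference value or None).
Enriched = Tuple[str, Any, Optional[Any]]


class KeyedEnricher:
    """One parallel instance of a hypothetical keyed two-input operator.

    `state` stands in for per-key managed state: it only ever holds
    reference entries whose key was routed to this instance.
    """

    def __init__(self) -> None:
        self.state: Dict[str, Any] = {}

    def on_reference(self, key: str, value: Any) -> List[Enriched]:
        # Input 1 (like flatMap1): a reference update replaces the previous
        # value for this key and emits nothing.
        self.state[key] = value
        return []

    def on_event(self, key: str, event: Any) -> List[Enriched]:
        # Input 2 (like flatMap2): enrich the event with the co-located
        # reference value; None means none has arrived for this key yet.
        return [(key, event, self.state.get(key))]


def route(key: str, parallelism: int) -> int:
    # Deterministic stand-in for keyBy(): equal keys always reach the same
    # instance, so a key's reference data and its events meet locally.
    return zlib.crc32(key.encode("utf-8")) % parallelism


def run(records: Iterable[Tuple[str, str, Any]], parallelism: int = 4) -> List[Enriched]:
    """Feed interleaved ("ref" | "event", key, payload) records through the operator."""
    instances = [KeyedEnricher() for _ in range(parallelism)]
    out: List[Enriched] = []
    for kind, key, payload in records:
        op = instances[route(key, parallelism)]
        if kind == "ref":
            out.extend(op.on_reference(key, payload))
        elif kind == "event":
            out.extend(op.on_event(key, payload))
        else:
            raise ValueError(f"unknown record kind: {kind!r}")
    return out


if __name__ == "__main__":
    demo = [
        ("ref", "user-1", {"tier": "gold"}),
        ("event", "user-1", "click"),
        ("event", "user-2", "click"),         # no reference data for user-2 yet
        ("ref", "user-1", {"tier": "silver"}),
        ("event", "user-1", "purchase"),      # sees the refreshed value
    ]
    for row in run(demo):
        print(row)
```

Because each reference value lives in the same instance as the events that need it, every lookup is a local read instead of an RPC, which is the co-location benefit Fabian describes. What the dict does not provide is what the original question asks about (robustness and consistent recovery): in a real job that comes from registering the data as Flink managed state so it is checkpointed and restored, and this sketch models none of that. Events that arrive before their key's reference record see `None` here; a production job might buffer them instead.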
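The reverse-IP example in the reply shows why plain keying is awkward: such a database is typically a table of non-overlapping address ranges, so a lookup means "find the range whose start is the largest value not above this address", a floor-style range scan rather than an exact-key get. The sketch below assumes a sorted, non-overlapping IPv4 range table held in memory by every operator instance (the broadcast case Gordon raises); `IpRangeTable`, its `lookup` method and the sample ranges are hypothetical, and refreshing the table is not shown. It uses only Python's standard `ipaddress` and `bisect` modules.

```python
import bisect
import ipaddress
from typing import List, Optional, Tuple


class IpRangeTable:
    """In-memory table of non-overlapping IPv4 ranges, each mapped to a label."""

    def __init__(self, ranges: List[Tuple[str, str, str]]) -> None:
        # Convert (first, last, label) to integer bounds and sort by range start.
        rows = sorted(
            (int(ipaddress.IPv4Address(first)), int(ipaddress.IPv4Address(last)), label)
            for first, last, label in ranges
        )
        for start, end, label in rows:
            if end < start:
                raise ValueError(f"range for {label!r} ends before it starts")
        # The floor search below is only correct if ranges do not overlap.
        for (_, end, _), (next_start, _, label) in zip(rows, rows[1:]):
            if next_start <= end:
                raise ValueError(f"range for {label!r} overlaps its predecessor")
        self._rows = rows
        self._starts = [start for start, _, _ in rows]

    def lookup(self, ip: str) -> Optional[str]:
        """Return the label of the range containing `ip`, or None."""
        addr = int(ipaddress.IPv4Address(ip))
        # Position of the last range whose start is <= addr (floor search).
        i = bisect.bisect_right(self._starts, addr) - 1
        if i < 0:
            return None  # addr is below the first range
        _, end, label = self._rows[i]
        return label if addr <= end else None  # None: addr falls in a gap


if __name__ == "__main__":
    table = IpRangeTable([
        ("10.0.0.0", "10.0.0.255", "office-a"),
        ("10.0.1.0", "10.0.1.127", "office-b"),
        ("192.168.0.0", "192.168.255.255", "lab"),
    ])
    for ip in ["10.0.1.5", "10.0.1.200", "192.168.3.4"]:
        print(ip, table.lookup(ip))
```

Each lookup is a binary search, O(log n) in the number of ranges, with no RPC involved. The trade-off is memory: with reference sets of up to 10 GB, as in the original question, replicating the table on every instance may be too expensive, which is the keyed-versus-broadcast question Gordon asks. Since the table is never mutated after construction, a refresh could build a new `IpRangeTable` and swap the reference.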