From: Lukasz Cwik
Date: Thu, 5 Oct 2017 10:58:53 -0700
Message-ID:
Subject: Re: Best way to inject external per key config into the pipeline
To: user@beam.apache.org
Content-Type: multipart/alternative; boundary="001a1145311c38805a055ad07a2e"
archived-at: Thu, 05 Oct 2017 17:58:59 -0000

--001a1145311c38805a055ad07a2e
Content-Type: text/plain; charset="UTF-8"

Yes, every time you start the pipeline you need to read in all the config
data. You can do this by flattening a bounded source, which reads the
current state of the config data, with a streaming source that gets updates,
and then using the result as a side input.

On the other hand, with a few million keys, using an in-memory cache with
your own refresh policy that contacts your datastore would also likely work
well.

On Thu, Oct 5, 2017 at 10:16 AM, Yihua Fang wrote:

> I thought about streaming the updates and using a side input, but since the
> config data is not persisted in the data pipeline, how would the config be
> populated in the first place? Is the solution to populate it through
> streaming every time the pipeline is rebooted?
>
> 1. The config data can be a few minutes to a few hours late. There is no
>    need to reflect config changes immediately.
> 2. The config data shouldn't change often. It is configured by human
>    users.
> 3. The config data per key should be about 10-20 key-value pairs.
> 4. Ideally the number of keys is in the range of a few million, but a few
>    thousand to begin with.
>
> Thanks
> Eric
>
> On Thu, Oct 5, 2017 at 9:09 AM Lukasz Cwik wrote:
>
>> Can you stream the updates to the keys into the pipeline and then use them
>> as a side input, performing a join against your main stream that needs
>> the config data?
>> You could also use an in-memory cache that periodically refreshes keys
>> from the external source.
>>
>> A better answer depends on:
>> * how stale can the config data be?
>> * how often does the config data change?
>> * how much config data do you expect to have per key?
>> * how many keys do you expect to have?
>>
>>
>> On Wed, Oct 4, 2017 at 5:41 PM, Yihua Fang
>> wrote:
>>
>>> Hi,
>>>
>>> The use case is that I have an external source that stores a
>>> configuration for each key, accessible via RESTful APIs, and the Beam
>>> pipeline should use that config to process each element for each key.
>>> What is the best approach for injecting the latest config into the
>>> pipeline?
>>>
>>> Thanks
>>> Eric
>>>
>>
>>

--001a1145311c38805a055ad07a2e
Content-Type: text/html; charset="UTF-8"
Content-Transfer-Encoding: quoted-printable
--001a1145311c38805a055ad07a2e--
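
The in-memory cache with its own refresh policy, suggested in the thread
above, could look roughly like the sketch below. It is plain Python and
uses no Beam API; the class name RefreshingConfigCache, the fetch callback,
and the ttl_seconds parameter are all hypothetical names chosen for
illustration. The fetch callback stands in for whatever call reads one
key's config from the external RESTful source. An entry is re-fetched the
first time it is requested after it has become older than the TTL, which
matches the stated tolerance of config being minutes to hours stale.

```python
import time


class RefreshingConfigCache:
    """Per-key config cache; an entry is re-fetched once it is older than ttl_seconds.

    All names here are hypothetical. `fetch` is a placeholder for the call
    that reads one key's config from the external store.
    """

    def __init__(self, fetch, ttl_seconds, clock=time.monotonic):
        self._fetch = fetch          # callable: key -> config dict
        self._ttl = ttl_seconds      # maximum acceptable staleness, in seconds
        self._clock = clock          # injectable so tests can control time
        self._entries = {}           # key -> (fetched_at, config)

    def get(self, key):
        now = self._clock()
        entry = self._entries.get(key)
        # Serve the cached config while it is still fresh enough.
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]
        # Missing or stale: contact the external source and remember the result.
        config = self._fetch(key)
        self._entries[key] = (now, config)
        return config


if __name__ == "__main__":
    # Stand-in for the external source; a real pipeline would call its API here.
    def fetch_from_store(key):
        return {"threshold": 10, "owner": key}

    cache = RefreshingConfigCache(fetch_from_store, ttl_seconds=15 * 60)
    print(cache.get("customer-42"))
```

With a few million keys and 10-20 key-value pairs per key, one such cache
per worker process may fit in memory, but that depends on the value sizes
and on how keys are distributed across workers, so treat it as something
to measure rather than assume.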