From: Kevin Peterson
Date: Sat, 9 Dec 2017 14:29:32 -0800
Subject: Re: updates on side inputs
To: user@beam.apache.org

Hi Carsten,

Similar patterns have come up before; you can see the threads here and here.

The answer that worked for me was to use a static instance of Guava's LoadingCache, which all of your ParDos running in the same worker JVM can share.
    // Static, so each worker JVM holds one cache that all DoFn instances on it share.
    public static final LoadingCache<BlobId, TypesCache> CACHE =
        CacheBuilder.newBuilder()
            .refreshAfterWrite(30, TimeUnit.MINUTES)  // refresh lazily on the first read 30+ min after a load
            .build(CacheLoader.from(key -> loadTypes(key)));  // loadTypes: hypothetical fetch helper
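To make the refresh behaviour concrete without Guava, here is a minimal plain-Java sketch of the same refresh-after-write idea applied to a single value, such as the whole rates map. It is not Guava's implementation: the class name RefreshingValue and the injectable clock are hypothetical, and a stale value is reloaded synchronously by the reading thread, whereas Guava keeps returning the previous value to other readers while a refresh is in progress.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper: holds one value (e.g. the rates map) and reloads it once it is older
// than maxAgeNanos. Unlike Guava, the reading thread reloads a stale value synchronously.
final class RefreshingValue<T> {
  private final Supplier<T> loader;   // e.g. a call to the rates REST endpoint (hypothetical)
  private final long maxAgeNanos;     // maximum age before the next read triggers a reload
  private final LongSupplier clock;   // injectable time source, e.g. System::nanoTime
  private T value;
  private long loadedAtNanos;
  private boolean loaded;

  RefreshingValue(Supplier<T> loader, long maxAgeNanos, LongSupplier clock) {
    this.loader = loader;
    this.maxAgeNanos = maxAgeNanos;
    this.clock = clock;
  }

  // Returns the cached value, loading it first if it is missing or too old.
  // If the loader throws, the exception propagates and the old value stays in place.
  synchronized T get() {
    long now = clock.getAsLong();
    if (!loaded || now - loadedAtNanos >= maxAgeNanos) {
      value = loader.get();
      loadedAtNanos = now;
      loaded = true;
    }
    return value;
  }
}
```

Declared as, say, `static final RefreshingValue<Map<String, BigDecimal>> RATES = new RefreshingValue<>(RatesClient::fetch, TimeUnit.MINUTES.toNanos(30), System::nanoTime);` (where `RatesClient::fetch` is a hypothetical REST call), it would give every DoFn instance in a worker JVM the same rates, reloaded at most once per 30 minutes. The synchronized get() keeps the logic simple, at the cost of blocking other readers while a reload is in flight.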
-Kevin

On Sat, Dec 9, 2017 at 1:26 PM, Carsten Krebs | GameDuell <carsten.krebs@gameduell.de> wrote:
Hi,
I'm currently trying to figure out the best way to approach the following task:

I have an unbounded stream of events carrying payment information in various currencies. For further processing, I need to convert this payment information into one common currency.
For this purpose I have a set of currency conversion rates, which is updated periodically by fetching it from an external REST endpoint at relatively long intervals. I'm now trying to figure out the best way to:

1) schedule updates of the currency rates at regular intervals, and
2) provide these currency conversion rates to the ParDo that actually does the conversion.
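Whichever way the rates reach the ParDo, the conversion step itself is small once a rates map is available. A minimal sketch, assuming (hypothetically) that each rate gives the units of the common currency per one unit of the source currency and that amounts are rounded to two decimal places; the class and method names are illustrative only.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Map;

final class CurrencyConverter {
  private CurrencyConverter() {}

  // Converts an amount into the common currency. Assumed rate convention (hypothetical):
  // rates.get("USD") = units of the common currency per 1 USD.
  static BigDecimal toCommonCurrency(BigDecimal amount, String currency,
                                     Map<String, BigDecimal> rates) {
    BigDecimal rate = rates.get(currency);
    if (rate == null) {
      // Fail loudly instead of silently dropping or mis-converting the payment.
      throw new IllegalArgumentException("No conversion rate for " + currency);
    }
    // BigDecimal avoids binary floating-point error; round to 2 decimals with HALF_EVEN.
    return amount.multiply(rate).setScale(2, RoundingMode.HALF_EVEN);
  }
}
```

Inside a conversion DoFn, the map would come from `c.sideInput(rates)` when using a side input, or from a shared static cache with the approach Kevin describes.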

I mainly considered two different ways to solve it, but they either didn't feel right to me or didn't work.

1) I was thinking about a ParDo with a Timer that periodically outputs the conversion rates, which are then used as a side input to the ParDo that does the conversion. Here I'm struggling with how to update the data provided via the PCollectionView as a side input to the conversion ParDo when the currency rates are updated.

My first attempt went in the following direction:

    PCollectionView<Map<String, BigDecimal>> rates = pipeline
        .apply(Create.of(KV.of("bootstrap", "")))  // dummy event to set up the timer & get the initial set of conversion rates
        .apply(ParDo.of(new CurrencyRates()))      // fetch conversion rates
        .apply(View.asSingleton());

    pipeline.apply(testStream)
        .apply(ParDo.of(new DoCurrencyConversionFn(rates))
            .withSideInputs(rates));
This of course doesn't work, since I assume I somehow need to define how the “singleton view” gets swapped out when a new map of conversion rates is generated.
However, what I actually get is the following exception:
java.lang.IllegalArgumentException: Can't add an element past the end of time (294247-01-10T04:00:54.775Z), got timestamp 294247-01-10T04:00:54.775Z
That makes sense to me, since I would consider the PCollectionView static.

My question here: is there any way to make this “singleton view” updatable?
I wondered whether windowing would help in this case, but I wasn't sure how to realise that either.
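One way to picture the windowing idea: assign both the rate snapshots and the payments to fixed windows of the same size, so each payment reads the snapshot for the window containing its timestamp. The following is a plain-Java model of that lookup under those assumptions, not Beam code; WindowedRatesModel and its methods are hypothetical. As far as we understand Beam's default behaviour, a main input in fixed windows reads a side input windowed the same way from the matching window, which is what the map lookup stands in for.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: rate snapshots and payments share fixed windows of one size, and a
// payment reads the snapshot published for the window that contains its timestamp.
final class WindowedRatesModel<V> {
  private final long windowSizeMillis;
  private final Map<Long, V> snapshotsByWindowStart = new HashMap<>();

  WindowedRatesModel(long windowSizeMillis) {
    this.windowSizeMillis = windowSizeMillis;
  }

  // Start of the fixed window containing the timestamp (floorMod keeps negatives correct).
  long windowStart(long timestampMillis) {
    return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
  }

  // Records a rates snapshot as the value for the window containing its timestamp.
  void publish(long timestampMillis, V snapshot) {
    snapshotsByWindowStart.put(windowStart(timestampMillis), snapshot);
  }

  // Returns the snapshot for the element's window, or null if none was published yet.
  V lookup(long timestampMillis) {
    return snapshotsByWindowStart.get(windowStart(timestampMillis));
  }
}
```

A null lookup corresponds to a window whose rates have not been produced yet, a case any real pipeline built this way has to handle deliberately.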

2) The other option I considered was to implement it as a stateful ParDo using a timer.
The timer would periodically trigger fetching currency updates and store them in a state cell.
I decided against this because it didn't feel right: I wouldn't have a good key to group by.
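The stateful variant might feel less awkward with a natural key. One possible choice, which is our assumption and not something from the thread, is the currency code: if rate updates and payments are both keyed by currency, each key's state only needs that currency's latest rate. The plain-Java model below mimics per-key value and bag state with maps; KeyedRateState and its methods are hypothetical, and a real stateful DoFn would additionally need the timer-driven fetch to emit one update per currency.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of per-key state, with the currency code as the key.
final class KeyedRateState {
  private final Map<String, BigDecimal> latestRate = new HashMap<>();      // like value state
  private final Map<String, List<BigDecimal>> pending = new HashMap<>();  // like bag state
  final List<BigDecimal> output = new ArrayList<>();                       // converted amounts

  // Stores the new rate for this key and flushes payments that were waiting for it.
  void onRateUpdate(String currency, BigDecimal rate) {
    latestRate.put(currency, rate);
    List<BigDecimal> waiting = pending.remove(currency);
    if (waiting != null) {
      for (BigDecimal amount : waiting) {
        output.add(amount.multiply(rate));
      }
    }
  }

  // Converts immediately if a rate is known for this key, otherwise buffers the payment.
  void onPayment(String currency, BigDecimal amount) {
    BigDecimal rate = latestRate.get(currency);
    if (rate == null) {
      pending.computeIfAbsent(currency, k -> new ArrayList<>()).add(amount);
    } else {
      output.add(amount.multiply(rate));
    }
  }
}
```

Keying by currency also spreads the state across keys, whereas a single constant key would funnel every payment through one state cell.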



Any help or suggestions on this?

Thanks & Best,
Carsten



