beam-user mailing list archives

From Kevin Peterson <kevi...@google.com>
Subject Re: updates on side inputs
Date Sat, 09 Dec 2017 22:29:32 GMT
Hi Carsten,

Similar patterns have come up before; you can see the threads here
<https://lists.apache.org/thread.html/a5d804685a5810594a7860709fbcd6d3a22ead6e871fc3073a65ef1e@%3Cuser.beam.apache.org%3E>
and here
<https://lists.apache.org/thread.html/681de1ae372951988a00b9affa7480f3117d3cae6dae9ee2c69baba4@%3Cuser.beam.apache.org%3E>.

The answer that worked for me was to use a static instance of Guava's
LoadingCache, which all of your ParDos can share.

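public static final LoadingCache<BlobId, TypesCache> CACHE =
        CacheBuilder.newBuilder()
                    // an entry becomes eligible for reload 30 minutes after it was written
                    .refreshAfterWrite(30, TimeUnit.MINUTES)
                    // CacheLoader is abstract: pass your own subclass implementing load(BlobId)
                    .build(new CacheLoader());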
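
To make the idea concrete for the currency-rate case, here is a minimal sketch that uses only the JDK as a stand-in for Guava's LoadingCache; it is not the code I run. The class name RefreshingRates, the loader supplier (which would wrap the REST call) and the injectable clock are all hypothetical names chosen for illustration. Unlike refreshAfterWrite, this version reloads synchronously on the first stale access and does not serve the old value while the reload is in progress.

```java
import java.math.BigDecimal;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Holds one shared snapshot of conversion rates and reloads it once it is stale. */
final class RefreshingRates {
    // Hypothetical loader: in a real pipeline this would call the REST endpoint.
    private final Supplier<Map<String, BigDecimal>> loader;
    private final long maxAgeMillis;
    // The clock is injectable so the refresh behaviour can be exercised without waiting.
    private final LongSupplier clock;
    private Map<String, BigDecimal> snapshot;
    private long loadedAtMillis;

    RefreshingRates(Supplier<Map<String, BigDecimal>> loader,
                    long maxAgeMillis,
                    LongSupplier clock) {
        this.loader = loader;
        this.maxAgeMillis = maxAgeMillis;
        this.clock = clock;
    }

    /** Returns the snapshot, reloading first if it is missing or at least maxAgeMillis old. */
    synchronized Map<String, BigDecimal> get() {
        long now = clock.getAsLong();
        if (snapshot == null || now - loadedAtMillis >= maxAgeMillis) {
            snapshot = Collections.unmodifiableMap(new HashMap<>(loader.get()));
            loadedAtMillis = now;
        }
        return snapshot;
    }

    /** Multiplies the amount by the rate stored for the given currency code. */
    BigDecimal convert(String currency, BigDecimal amount) {
        BigDecimal rate = get().get(currency);
        if (rate == null) {
            throw new IllegalArgumentException("no rate for " + currency);
        }
        return amount.multiply(rate);
    }

    public static void main(String[] args) {
        // Made-up rate, refreshed at most every 30 minutes.
        RefreshingRates rates = new RefreshingRates(
                () -> Collections.singletonMap("USD", new BigDecimal("0.85")),
                30 * 60 * 1000L,
                System::currentTimeMillis);
        System.out.println(rates.convert("USD", new BigDecimal("10")));
    }
}
```

In a pipeline you would keep one such instance in a static final field and call convert from the conversion DoFn's processing method. Since static fields are per JVM, each worker ends up with its own copy and fetches the rates independently.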

-Kevin

On Sat, Dec 9, 2017 at 1:26 PM, Carsten Krebs | GameDuell <
carsten.krebs@gameduell.de> wrote:

> Hi,
>
> I’m currently trying to figure out the best way to approach the following
> task:
>
> I have an unbounded stream of events carrying payment information in
> varying currencies. For further processing I need to convert this payment
> information into one common currency.
> For this purpose I have a set of currency conversion rates, which is
> updated regularly by fetching it from an external REST endpoint at larger
> time intervals. I’m now trying to figure out the best way to ….
>
> 1) schedule updates of the currency rates at regular intervals, and
> 2) provide these currency conversion rates to the ParDo that actually
> does the conversion.
>
> I was mainly thinking about two different ways to solve it, but they
> either didn’t feel right to me or didn’t work.
>
> 1) I was thinking about having a ParDo with a Timer that periodically
> outputs the conversion rates, which are then used as a side input for the
> ParDo that does the conversion. Here I’m struggling with how to update the
> data provided via the PCollectionView as side input to the conversion
> ParDo when the currency rates change.
>
> My first try went into the following direction:
>
> PCollectionView<Map<String, BigDecimal>> rates = pipeline
>         // dummy event to set up timer event & get initial set of conversion rates
>         .apply(Create.of(KV.of("bootstrap", "")))
>         .apply(ParDo.of(new CurrencyRates())) // fetch conversion rates
>         .apply(View.asSingleton());
>
> pipeline.apply(testStream)
>         .apply(ParDo.of(new DoCurrencyConversionFn(rates))
>         .withSideInputs(rates));
>
> This of course does not work, as I assume I need to somehow define how
> the “singleton view” will be swapped out when a new map of conversion
> rates is generated.
> However, I’m getting the following exception:
> java.lang.IllegalArgumentException: Can't add an element past the end of
> time (294247-01-10T04:00:54.775Z), got timestamp 294247-01-10T04:00:54.775Z
> This makes sense to me, as I would consider the PCollectionView to be
> static.
>
> A question regarding this: is there any way to make this “singleton view”
> updatable?
> I was wondering whether windowing would help in this case, but I was not
> sure how to realise this.
>
> 2) The other option I considered was to implement it as a stateful ParDo
> using a timer: the timer would periodically trigger fetching the currency
> updates and store them in a state cell.
> I decided against this because it didn’t feel right, as I would not have
> a good key to group by.
>
>
>
> Any help or suggestions on this?
>
> Thanks & Best,
>
> Carsten
