beam-user mailing list archives

From Kevin Peterson <>
Subject Re: updates on side inputs
Date Sat, 09 Dec 2017 22:29:32 GMT
Hi Carsten,

Similar patterns have come up before; you can see the threads here
and here.

The answer that worked for me was to use a static instance of Guava's
LoadingCache, which all of your ParDos running in the same worker JVM can share.

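public static final LoadingCache<BlobId, TypesCache> CACHE =
        CacheBuilder.newBuilder()
                    .refreshAfterWrite(30, TimeUnit.MINUTES) // reload entries older than 30 minutes on next read
                    .build(new CacheLoader<BlobId, TypesCache>() {
                      @Override public TypesCache load(BlobId id) { return loadTypes(id); } // loadTypes: hypothetical helper
                    });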
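The snippet relies on Guava's `refreshAfterWrite`: once an entry is older than the configured interval, the next read triggers a reload, and because the cache is a `static` field, every DoFn instance in the same worker JVM shares one copy. To make those semantics concrete without Guava, here is a minimal JDK-only sketch; `RefreshingSnapshot` and its constructor parameters are hypothetical names, not Guava API. Unlike Guava, which hands other threads the old value while one thread reloads, this version simply blocks all callers during a reload.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// JDK-only stand-in for Guava's refreshAfterWrite behaviour (hypothetical class, not Guava).
// Held in a static field, one instance is shared by every DoFn instance in the worker JVM.
final class RefreshingSnapshot<V> {
  private final Supplier<V> loader;       // e.g. a call to the rates REST endpoint (assumption)
  private final long refreshAfterMillis;  // how long a loaded value is considered fresh
  private final LongSupplier clock;       // injectable clock; System::currentTimeMillis in production
  private V value;
  private long loadedAt;
  private boolean loaded;

  RefreshingSnapshot(Supplier<V> loader, long refreshAfterMillis, LongSupplier clock) {
    this.loader = loader;
    this.refreshAfterMillis = refreshAfterMillis;
    this.clock = clock;
  }

  // Returns the cached value, loading it first if it is missing or older than the interval.
  // If the loader throws, the previous value is kept and the next call retries.
  synchronized V get() {
    long now = clock.getAsLong();
    if (!loaded || now - loadedAt >= refreshAfterMillis) {
      value = loader.get();
      loadedAt = now;
      loaded = true;
    }
    return value;
  }
}
```

In a DoFn, the rates could live in a `static final RefreshingSnapshot<Map<String, BigDecimal>>` built with a hypothetical `fetchRates()` call, `TimeUnit.MINUTES.toMillis(30)` and `System::currentTimeMillis`, and be read with `get()` inside `@ProcessElement`.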

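Whichever mechanism supplies the rates (a shared cache or a side input), the conversion Carsten describes reduces to a lookup and a multiplication. A minimal sketch, assuming the rates map holds units of the common currency per one unit of each source currency; `CurrencyConverter` and `toCommon` are hypothetical names, and the two-decimal `HALF_EVEN` rounding is an assumption to adjust to your accounting rules.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Map;

// Hypothetical core of a DoCurrencyConversionFn: converts an amount into the common currency.
// Assumes rates maps a currency code to units of the common currency per one unit of that code.
final class CurrencyConverter {
  private CurrencyConverter() {}

  static BigDecimal toCommon(BigDecimal amount, String currency, Map<String, BigDecimal> rates) {
    BigDecimal rate = rates.get(currency);
    if (rate == null) {
      // Fail loudly rather than silently emitting an unconverted amount.
      throw new IllegalArgumentException("No conversion rate for " + currency);
    }
    // Two decimal places with banker's rounding (an assumption, not a requirement).
    return amount.multiply(rate).setScale(2, RoundingMode.HALF_EVEN);
  }
}
```

Using `BigDecimal` rather than `double` keeps amounts such as 0.85 exact, which matters for payment data.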

On Sat, Dec 9, 2017 at 1:26 PM, Carsten Krebs | GameDuell <> wrote:

> Hi,
> I’m currently trying to figure out, what would be the best way to approach
> the following task:
> I have an unbounded stream of events carrying payment information in
> various currencies. For further processing, I need to convert this payment
> information into one common currency.
> For this purpose I have a set of currency conversion rates, which is
> updated periodically by fetching it from an external REST endpoint at
> fairly long intervals. I’m now trying to figure out the best way to:
> 1) schedule updates of the currency rates at regular intervals, and
> 2) provide these currency conversion rates to the ParDo that actually
> does the conversion.
> I mainly considered two different approaches, but they either didn’t
> feel right or didn’t work.
> 1) I was thinking about having a ParDo with a Timer that periodically
> outputs the conversion rates, which are then used as a side input for the
> ParDo that does the conversion. Here I’m struggling with how to update the
> data provided via the PCollectionView side input to the conversion ParDo
> whenever the currency rates are updated.
> My first attempt went in the following direction:
> PCollectionView<Map<String, BigDecimal>> rates = pipeline
>         .apply(Create.of(KV.of("bootstrap", ""))) // dummy event to set up the timer
>                                                   // & get the initial set of conversion rates
>         .apply(ParDo.of(new CurrencyRates())) // fetch conversion rates
>         .apply(View.asSingleton());
> pipeline.apply(testStream)
>         .apply(ParDo.of(new DoCurrencyConversionFn(rates))
>         .withSideInputs(rates));
> This of course doesn’t work; I assume I somehow need to define how the
> “singleton view” is swapped out when a new map of conversion rates is
> generated.
> Instead, I get the following exception:
> java.lang.IllegalArgumentException: Can't add an element past the end of
> time (294247-01-10T04:00:54.775Z), got timestamp 294247-01-10T04:00:54.775Z
> That makes sense to me, as I would consider the PCollectionView to be
> static.
> So my question is: is there any way to make this “singleton view”
> updatable?
> I wondered whether windowing would help in this case, but I wasn’t sure
> how to realise it.
> 2) The other option I considered was to implement it as a stateful ParDo
> using a timer. The timer would periodically trigger fetching currency
> updates and store them in a state cell.
> I decided against this because it didn’t feel right, as I would not have
> a good key to group by.
> Any help or suggestions on this?
> Thanks & Best,
> Carsten
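The timestamp in Carsten's exception, 294247-01-10T04:00:54.775Z, is the "end of time" bound itself, so the element was being emitted at the very last representable instant. Assuming Beam derives that bound from `Long.MAX_VALUE` microseconds truncated to milliseconds (an assumption about Beam internals, checked here only against the printed value), the arithmetic can be reproduced with `java.time`; `EndOfTime` is a hypothetical name.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Reproduces the "end of time" printed in the exception, under the stated assumption
// that the bound is Long.MAX_VALUE microseconds truncated to whole milliseconds.
final class EndOfTime {
  private EndOfTime() {}

  static Instant maxTimestamp() {
    long millis = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE); // integer division by 1000
    return Instant.ofEpochMilli(millis);
  }
}
```

`java.time` prints years beyond 9999 with a leading `+`, which is the only difference from the string in the exception.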

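On Carsten's windowing question: in Beam, a windowed side input holds one value per window, and by default each main-input element reads the side-input window that contains its own window's maximum timestamp. With fixed windows of the same size on both inputs, an event therefore sees the rates fetched during its own window, which is what makes a per-window "singleton" effectively updatable; the main input needs matching windowing for this to line up. The plain-Java model below only illustrates that lookup. It is not Beam API, and `WindowedRates`, `publish` and `ratesFor` are hypothetical names.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Conceptual model (not Beam API): a side input windowed into fixed windows holds
// one rates snapshot per window, so each window effectively has its own singleton.
final class WindowedRates {
  private final long windowSizeMillis;
  private final Map<Long, Map<String, BigDecimal>> snapshotByWindowStart = new HashMap<>();

  WindowedRates(long windowSizeMillis) {
    this.windowSizeMillis = windowSizeMillis;
  }

  // Start of the fixed window (offset 0) containing the given timestamp.
  long windowStart(long timestampMillis) {
    return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
  }

  // A rates fetch stamped with fetchTimestampMillis becomes the snapshot for its window.
  // This model keeps the last one; Beam's View.asSingleton would instead reject a window
  // that contains more than one element.
  void publish(long fetchTimestampMillis, Map<String, BigDecimal> rates) {
    snapshotByWindowStart.put(windowStart(fetchTimestampMillis), rates);
  }

  // An event reads the snapshot of the window containing its timestamp. Returns null if
  // none was published; a streaming runner would wait for the side input instead.
  Map<String, BigDecimal> ratesFor(long eventTimestampMillis) {
    return snapshotByWindowStart.get(windowStart(eventTimestampMillis));
  }
}
```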