beam-user mailing list archives

From Raghu Angadi <rang...@google.com>
Subject Re: Create SideInputs for PTranform using lookups for each window of data
Date Wed, 16 May 2018 01:06:14 GMT
You are applying windowing to 'billingDataPairs' in the example above, so
the side input derived from it is windowed the same way. A side input pairs
with every main input window that exactly matches, or falls completely
within, the side input window. A common use case is a side input defined in
the default global window, which matches all the main input windows.
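
As a rough illustration of the matching rule described above, here is a minimal plain-Java sketch (not Beam code). It treats windows as half-open millisecond intervals; the class and method names are made up for this example, and Beam itself decides the pairing through the side input's WindowFn rather than through a check like this.

```java
// Hypothetical helper, for illustration only: models windows as [start, end) in millis.
final class SideInputWindowSketch {

    /** True if the main window equals, or lies completely inside, the side input window. */
    public static boolean pairsWith(long mainStart, long mainEnd, long sideStart, long sideEnd) {
        return sideStart <= mainStart && mainEnd <= sideEnd;
    }

    public static void main(String[] args) {
        // Main input: a 30-second fixed window starting at t = 60s.
        long mainStart = 60_000L;
        long mainEnd = 90_000L;

        // Side input windowed identically: exact match.
        System.out.println(pairsWith(mainStart, mainEnd, 60_000L, 90_000L)); // true

        // Side input in a single all-encompassing window (stand-in for the global window).
        System.out.println(pairsWith(mainStart, mainEnd, Long.MIN_VALUE, Long.MAX_VALUE)); // true

        // Side input in the neighbouring 30-second window: no match.
        System.out.println(pairsWith(mainStart, mainEnd, 90_000L, 120_000L)); // false
    }
}
```

In the pipeline from this thread, the side input is built from the already-windowed 'billingDataPairs', which corresponds to the first case: each 30-second main window sees only the map built for that same window.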



On Tue, May 15, 2018 at 4:40 PM Harshvardhan Agrawal <
harshvardhan.agr93@gmail.com> wrote:

> Got it.
>
> Since I am not applying any windowing strategy to the side input, does
> Beam automatically pick up the windowing strategy for the side input from
> the main input? By that I mean the scope of the side input would be per
> window, and it would be different for every window. Is that correct?
>
> Regards,
> Harsh
>
> On Tue, May 15, 2018 at 17:54 Lukasz Cwik <lcwik@google.com> wrote:
>
>> Using deduplicate + side inputs will give you a consistent view of the
>> account information for the entire window, which can be nice since it
>> gives consistent processing semantics. However, using a simple in-memory
>> cache to reduce the number of lookups will likely be much easier to debug
>> and simpler to implement and maintain.
>>
>> On Tue, May 15, 2018 at 2:31 PM Harshvardhan Agrawal <
>> harshvardhan.agr93@gmail.com> wrote:
>>
>>> Thanks Raghu!
>>>
>>> Lukasz,
>>>
>>> Do you think lookups would be a better option than side inputs in my
>>> case?
>>>
>>>
>>> On Tue, May 15, 2018 at 16:33 Raghu Angadi <rangadi@google.com> wrote:
>>>
>>>> It should work. I think you need to apply Distinct before looking up
>>>> account info:
>>>> billingDataPairs.apply(Keys.create()).apply(Distinct.create()).apply("LookupAccounts",
>>>> ...).
>>>> Note that all of the accounts are stored in a single in-memory map, so
>>>> the set of accounts should be small enough to fit.
>>>>
>>>> On Tue, May 15, 2018 at 1:15 PM Harshvardhan Agrawal <
>>>> harshvardhan.agr93@gmail.com> wrote:
>>>>
>>>>> Well, I actually simplified the example a little. In the actual use
>>>>> case I have multiple reference datasets. Say I have a tuple of Account
>>>>> and Product as the key. The reason we don’t do the lookup in the DoFn
>>>>> directly is that we don’t want to look up the data for the same account
>>>>> or same product multiple times across workers in a window.
>>>>>
>>>>> What I was thinking was that it might be better to perform the lookup
>>>>> only once for each account and product in a window and then supply them
>>>>> as side inputs to the main input.
>>>>>
>>>>> On Tue, May 15, 2018 at 16:03 Lukasz Cwik <lcwik@google.com> wrote:
>>>>>
>>>>>> Is there a reason you don't want to read the accounting information
>>>>>> within the DoFn directly from the datastore? It seems like that would
>>>>>> be your simplest approach.
>>>>>>
>>>>>> On Tue, May 15, 2018 at 12:43 PM Harshvardhan Agrawal <
>>>>>> harshvardhan.agr93@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> No, we don’t receive any such information from Kafka.
>>>>>>>
>>>>>>> The account information in the external store does change. Every
>>>>>>> time the account information changes, we will have to recompute
>>>>>>> all the billing info. Our source systems will make sure that they
>>>>>>> publish messages for those accounts again.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, May 15, 2018 at 15:11 Lukasz Cwik <lcwik@google.com> wrote:
>>>>>>>
>>>>>>>> For each BillingModel you receive over Kafka, how "fresh" should
>>>>>>>> the account information be?
>>>>>>>> Does the account information in the external store change?
>>>>>>>>
>>>>>>>> On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal <
>>>>>>>> harshvardhan.agr93@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> We have billing data that arrives from Kafka. The billing data
>>>>>>>>> is in JSON and contains an account ID. To generate the final
>>>>>>>>> report, we need some account data that is associated with the
>>>>>>>>> account ID and stored in an external database.
>>>>>>>>>
>>>>>>>>> It is possible that we get multiple billing info messages for the
>>>>>>>>> same account. We want to be able to look up the account
>>>>>>>>> information for the messages in a window and then supply that as
>>>>>>>>> a side input to the next PTransform.
>>>>>>>>>
>>>>>>>>> Is it possible to achieve that in Beam?
>>>>>>>>>
>>>>>>>>> Here is my attempt:
>>>>>>>>>
>>>>>>>>>     // Read billing messages from Kafka into 30-second fixed windows.
>>>>>>>>>     PCollection<KV<Integer, BillingModel>> billingDataPairs =
>>>>>>>>>         p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
>>>>>>>>>                 .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
>>>>>>>>>                 .withTopic(KAFKA_TOPIC)
>>>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>>>>>             .apply("Window",
>>>>>>>>>                 Window.into(FixedWindows.of(Duration.standardSeconds(30))))
>>>>>>>>>             .apply("ProcessKafkaMessages", new KafkaProcessor());
>>>>>>>>>
>>>>>>>>>     // Group the billing records by account ID within each window.
>>>>>>>>>     PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
>>>>>>>>>         billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());
>>>>>>>>>
>>>>>>>>>     // Look up the accounts and expose them as a map-valued side input.
>>>>>>>>>     PCollectionView<Map<Integer, Account>> accountData =
>>>>>>>>>         billingDataPairs.apply("LookupAccounts", new AccountLookupClient())
>>>>>>>>>             .apply(View.<Integer, Account>asMap());
>>>>>>>>>
>>>>>>>>>     // Process the grouped data; the side input has to be registered
>>>>>>>>>     // with withSideInputs before it can be read in the DoFn.
>>>>>>>>>     billingData.apply(ParDo.of(
>>>>>>>>>         new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
>>>>>>>>>           @ProcessElement
>>>>>>>>>           public void processElement(ProcessContext ctx) {
>>>>>>>>>             Integer accountId = ctx.element().getKey();
>>>>>>>>>             Iterable<BillingModel> billingModel = ctx.element().getValue();
>>>>>>>>>             Account account = ctx.sideInput(accountData).get(accountId);
>>>>>>>>>           }
>>>>>>>>>         }).withSideInputs(accountData));
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Harsh
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> *Regards,Harshvardhan Agrawal*
>>>>>>>>> *267.991.6618 | LinkedIn
>>>>>>>>> <https://www.linkedin.com/in/harshvardhanagr/>*
>>>>>>>>>
