beam-user mailing list archives

From Harshvardhan Agrawal <harshvardhan.ag...@gmail.com>
Subject Fwd: Create SideInputs for PTranform using lookups for each window of data
Date Tue, 15 May 2018 18:22:01 GMT
Hi,

We have billing data that arrives from Kafka. The billing data is in JSON
and contains an account ID. To generate the final report, we need the
account data associated with that account ID, which is stored in an
external database.

We may receive multiple billing messages for the same account. We want to
look up the account information for the messages in a window and then
supply it as a side input to the next PTransform.

Is it possible to achieve that in Beam?

Here is my attempt:

    // Read billing messages from Kafka, window them, and key them by account ID.
    PCollection<KV<Integer, BillingModel>> billingDataPairs =
        p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
                .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
                .withTopic(KAFKA_TOPIC)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
         .apply("Window", Window.into(FixedWindows.of(Duration.standardSeconds(30))))
         .apply("ProcessKafkaMessages", new KafkaProcessor());

    // Group the billing messages for each account within a window.
    PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
        billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());

    // Look up the account for each ID and expose the result as a map side input.
    PCollectionView<Map<Integer, Account>> accountData =
        billingDataPairs
            .apply("LookupAccounts", new AccountLookupClient())
            .apply(View.asMap());

    // Pair each account's grouped billing messages with its account record.
    // The output type (Void) is a placeholder; nothing is emitted yet.
    billingData.apply(ParDo.of(
        new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
            @ProcessElement
            public void processElement(ProcessContext ctx) {
                Integer accountId = ctx.element().getKey();
                Iterable<BillingModel> billingModel = ctx.element().getValue();
                // The view must also be registered via withSideInputs below.
                Account account = ctx.sideInput(accountData).get(accountId);
            }
        }).withSideInputs(accountData));
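
For reference, the behaviour being asked for (one lookup per distinct
account ID within each 30-second window) can be sketched in plain Java
without Beam. This is only an illustration under assumptions: the class
name, the `Billing` record, the `lookupPerWindow` helper, and the
`lookupAccount` function standing in for the external database call are all
hypothetical, and accounts are represented as plain strings.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class WindowedLookupSketch {

    /** Hypothetical billing message: only the account ID and timestamp matter here. */
    static final class Billing {
        final int accountId;
        final long timestampMillis;

        Billing(int accountId, long timestampMillis) {
            this.accountId = accountId;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Returns the start of the fixed window that contains the timestamp. */
    static long windowStart(long timestampMillis, long windowMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowMillis);
    }

    /**
     * Builds, for each window, a map from account ID to account data.
     * lookupAccount is called at most once per distinct account ID per window,
     * even when several billing messages share the same account.
     */
    static Map<Long, Map<Integer, String>> lookupPerWindow(
            List<Billing> messages,
            long windowMillis,
            Function<Integer, String> lookupAccount) {
        Map<Long, Map<Integer, String>> accountsByWindow = new LinkedHashMap<>();
        for (Billing b : messages) {
            long start = windowStart(b.timestampMillis, windowMillis);
            accountsByWindow
                .computeIfAbsent(start, k -> new LinkedHashMap<>())
                .computeIfAbsent(b.accountId, lookupAccount);
        }
        return accountsByWindow;
    }

    public static void main(String[] args) {
        List<Billing> messages = Arrays.asList(
            new Billing(7, 1_000L),    // first window
            new Billing(7, 2_000L),    // same account, same window: no second lookup
            new Billing(9, 31_000L));  // second window
        // Stand-in for the external database call (assumption).
        Function<Integer, String> lookupAccount = id -> "account-" + id;
        System.out.println(lookupPerWindow(messages, 30_000L, lookupAccount));
    }
}
```

The per-window map corresponds to what the `accountData` side input is
meant to hold. Deduplicating the account IDs matters in the Beam version as
well, because a map view expects one value per key in each window.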

Regards,
Harsh