flink-user mailing list archives

From Maxim Parkachov <lazy.gop...@gmail.com>
Subject Fwd: Initialise side input state
Date Fri, 03 Nov 2017 06:11:55 GMT
Hi Xingcan,

On Fri, Nov 3, 2017 at 3:38 AM, Xingcan Cui <xingcanc@gmail.com> wrote:

> Hi Maxim,
> if I understand correctly, you actually need to JOIN the fast stream with
> the slow stream. Could you please share more details about your problem?

Sure, I can explain more, with a pseudo-code example. I have an external
DB with a price list with the following structure:

case class PriceList(productId, price)

My events are purchase events with the following structure:

case class Purchase(productId, amount)

I would like to get a final stream with totalAmount = amount * price in a
structure like this:

case class PurchaseTotal(productId, totalAmount)

I have 2 corresponding input streams:

val prices = env.addSource(new PriceListSource).keyBy(_.productId)
val purchases = env.addSource(new PurchaseSource).keyBy(_.productId)

PriceListSource delivers all CHANGES to the external DB table.

The calculate function looks similar to:

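class CalculateFunction extends CoProcessFunction[Purchase, PriceList,
PurchaseTotal] {

  private var price: ValueState[Int] = _

  override def processElement1....... {
    // enrich each purchase with the price currently held in state
    out.collect(PurchaseTotal(purchase.productId, purchase.amount * price.value))
  }

  override def processElement2....... {
    // remember the latest price for this productId
    price.update(priceList.price)
  }
}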

And finally pipeline:

purchases.connect(prices).process(new CalculateFunction).print

The issue is that when I start the program, my price ValueState is empty, and
it will never be populated for products whose price is not updated in the DB
after startup, because the source only delivers changes.
BTW, I cannot use AsyncIO to query the DB, for several technical reasons.
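
For illustration, here is a minimal sketch of one possible workaround: buffer
purchases that arrive before their price is known and release them once the
price shows up. It is a plain-Scala model of the per-key logic with no Flink
dependency, not a tested Flink job. It assumes that PriceListSource could be
changed to emit a full snapshot of the table before the changes, that
productId is a String and amount and price are Int, and the name BufferingJoin
is made up. In a real CoProcessFunction the two maps would presumably be
keyed state (a ValueState for the price, a ListState for the waiting
purchases).

```scala
import scala.collection.mutable

// Field types are assumptions; the pseudo-code above leaves them out.
case class PriceList(productId: String, price: Int)
case class Purchase(productId: String, amount: Int)
case class PurchaseTotal(productId: String, totalAmount: Int)

// Hypothetical model of the two-input function. The maps stand in for
// Flink keyed state, which would be scoped to one productId at a time.
class BufferingJoin {
  private val price = mutable.Map.empty[String, Int]
  private val pending = mutable.Map.empty[String, Vector[Purchase]]

  // Fast stream: emit at once if the price is known, otherwise park the event.
  def onPurchase(p: Purchase): Seq[PurchaseTotal] =
    price.get(p.productId) match {
      case Some(current) =>
        Seq(PurchaseTotal(p.productId, p.amount * current))
      case None =>
        pending(p.productId) = pending.getOrElse(p.productId, Vector.empty) :+ p
        Seq.empty
    }

  // Slow stream: store the price, then flush purchases that waited for it.
  def onPrice(pl: PriceList): Seq[PurchaseTotal] = {
    price(pl.productId) = pl.price
    val waiting = pending.remove(pl.productId).getOrElse(Vector.empty)
    waiting.map(p => PurchaseTotal(p.productId, p.amount * pl.price))
  }
}
```

With this model, a purchase for a product without a price produces no output
until the matching PriceList record arrives. The cost is that purchases for a
product that never gets a price stay buffered forever, so the snapshot
assumption matters.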

> 1. When you mentioned "they have the same key", did you mean all the data
> get the same key or the logic should be applied with fast.key = slow.key?

I meant that the productId in a purchase event definitely exists in the
external price list DB (so it is a kind of inner join).

> 2. What should be done to initialize the state?

I need to read the external DB table and populate the price ValueState before
processing the first purchase event.

Hope this minimal example helps to understand.

> Best,
> Xingcan
> On Fri, Nov 3, 2017 at 5:54 AM, Maxim Parkachov <lazy.gopher@gmail.com>
> wrote:
>> Hi Flink users,
>> I'm struggling with a basic concept and would appreciate some help. I
>> have 2 input streams: one is a fast event stream and one is a slowly changing
>> dimension. They have the same key, and I use a CoProcessFunction to store
>> the slow data in state and enrich the fast data from this state. Everything
>> works as expected.
>> Before I start processing the fast stream on the first run, I would like to completely
>> initialise the state. I thought it could be done in open(), but I don't
>> understand how it would be re-distributed across parallel operators.
>> Another alternative would be to create a custom source and push all slow dimension
>> data downstream, but I could not find how to hold back processing of the fast data
>> until the state is initialised.
>> I realise that FLIP-17 (Side Inputs) is what I need, but is there some other
>> way to implement it now?
>> Thanks,
>> Maxim.
