flink-user mailing list archives

From Simone Robutti <simone.robu...@radicalbit.io>
Subject Merge the states of different partition in streaming
Date Tue, 27 Sep 2016 14:01:04 GMT
Hello,

I'm working on a streaming analytics job and I don't know how to
write the last part.

I want to count all the elements in a window that have a given status,
so I keep a state with a Map[Status,Long]. This state is updated from
tuples containing the oldStatus and the newStatus, so every event
generates a +1 for the new status and a -1 for the old status. Then I want
to reduce all these counts, moving from a local, partial state to a
global state that will be written to the output.
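
To make the intended arithmetic concrete, here is a minimal sketch in plain Scala with no Flink dependency. The names StatusCounts, delta and merge are hypothetical, Status is assumed to be a String, and the old status is assumed to be optional because the first event for an element has nothing to decrement. It is not the actual updateState function, only an illustration of the +1/-1 update and of how partial maps would be summed.

```scala
// Illustrative only: Status is assumed to be a String here;
// the real Status type is not shown in this message.
object StatusCounts {
  type Status = String
  type Counts = Map[Status, Long]

  // Sum two partial count maps key by key.
  def merge(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (status, n)) =>
      acc.updated(status, acc.getOrElse(status, 0L) + n)
    }

  // One status transition: +1 for the new status, -1 for the old one.
  // oldStatus is None for the first event seen for an element (assumption).
  def delta(oldStatus: Option[Status], newStatus: Status): Counts = {
    val plus: Counts = Map(newStatus -> 1L)
    oldStatus match {
      case Some(old) => merge(plus, Map(old -> -1L))
      case None      => plus
    }
  }
}
```

Reducing a sequence of such deltas with merge yields the global count per status, which is the result I would like the final reduce step to produce.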

Right now my code looks like this:

filteredLatestOrders
  .keyBy(x => x._1.getStatus)
  .mapWithState(updateState)
  .keyBy(x => "0")
  .reduce((orderA, orderB) => orderA.sum(orderB))

where "filteredLatestOrders" is a DataStream containing information about
the elements, the new state and the old state.

This produces the following output:

2> Map(DRAFT -> 0, RESERVED -> 1, PICKED -> 0, PACKED -> 0)
2> Map(DRAFT -> 0, RESERVED -> 2, PICKED -> 0, PACKED -> 0)
2> Map(DRAFT -> 0, RESERVED -> 3, PICKED -> 0, PACKED -> 1)
3> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 1, PACKED -> 0)
4> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 0, PACKED -> 1)

I thought that keying with a fixed value would collect all the elements in
a single node so that I could finally compute the final result, but they
are left on different nodes and are never summed.

Is this the correct approach? If so, how can I do what I need? Is
there a smarter way to count distinct evolving elements by their status in
a streaming job? Note that the original events are updates to the
status of an element, and the requirement is to count only the
latest status available for each element.

Thank you in advance,

Simone
