flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Sharing State between Operators
Date Wed, 18 May 2016 11:32:03 GMT
I prepared a small example that outlines how something like this could be
implemented:
https://gist.github.com/aljoscha/36afedce40abf8ae92b92d4355809ff1

It doesn't cover all of your requirements, such as the count per wall, but
it should get you started on the right path.

I hope this helps!

On Fri, 13 May 2016 at 20:18 nsengupta <sengupta.nirmalya@gmail.com> wrote:

> Hello Flinksters
>
>
> Alright. So, I had a fruitful exchange of messages with Balaji earlier
> today on this topic, and I moved ahead with the understanding I derived
> from it (thanks, Balaji). But now I am back, because I think my approach
> is unclean, if not incorrect. There is probably a smarter way to achieve
> the same result, but I can't figure it out.
>
> Here's the problem:
>
> A building has 4 walls (0,1,2,3). On each wall, a number of devices have
> been installed to capture some physical attribute: let's say the
> temperature at that spot. Every device has a unique ID.
>
> A typical tuple looks like this (Reading is the temperature, as an Integer):
> (TupleType,Time,WallID,DeviceID,Reading)
>
> The system processes records in time windows of 60 seconds; we can
> consider these to be Tumbling Windows. Time (and window assignment etc.)
> is not the issue here. The 'Time' field increases monotonically.
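Assuming 'Time' is a whole number of seconds (the message does not state its unit), the 60-second tumbling window a record falls into follows from simple modular arithmetic. A minimal Python sketch; `window_start` and `window_bounds` are illustrative names, not Flink API:

```python
from typing import Tuple

WINDOW_SIZE = 60  # seconds; assumes 'Time' counts whole seconds


def window_start(time: int, size: int = WINDOW_SIZE) -> int:
    """Start of the tumbling window containing `time`; windows are [start, start + size)."""
    return time - (time % size)


def window_bounds(time: int, size: int = WINDOW_SIZE) -> Tuple[int, int]:
    """(start, end) of the tumbling window containing `time`, end exclusive."""
    start = window_start(time, size)
    return start, start + size


# Records at Time 0..59 share one window; Time 60 starts the next one.
print(window_bounds(59), window_bounds(60), window_bounds(125))
# (0, 60) (60, 120) (120, 180)
```

Because windows never overlap, each record belongs to exactly one window, which is what makes the window assignment itself straightforward here.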
>
> If TupleType == 0, I need to compute and update my data structures from
> the incoming tuple.
>
> If TupleType == 1, I need to emit the maximum temperature among the last
> 5 readings recorded by that DeviceID.
>
> If TupleType == 2, I need to emit the number of readings that have arrived
> so far from that particular wall. Obviously, in this case, we ignore the
> values of the fields 'DeviceID' and 'Reading' in the tuple.
>
> The Application generates output for TupleType 1 and TupleType 2.
>
> The TupleTypes can arrive in any order. For example, a TupleType 1 may
> arrive with a DeviceID that the application hasn't seen before (no
> corresponding TupleType 0 with that DeviceID has arrived earlier). To keep
> things simple, let us assume that we have a fallback value to emit in
> such cases.
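Before distributing anything, it can help to pin down the required semantics in a single-process reference model. The sketch below is plain Python, not Flink code, and rests on assumptions the message leaves open: readings and counts accumulate across windows rather than resetting per window, the wall count includes only TupleType 0 records, and the fallback value is a hypothetical -1. A deque with `maxlen=5` drops the oldest reading automatically, so the "last 5 readings" rule needs no extra bookkeeping.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Optional, Tuple

# (TupleType, Time, WallID, DeviceID, Reading), as described in the message
Record = Tuple[int, int, int, int, int]

LAST_N = 5         # "last 5 readings"
FALLBACK_MAX = -1  # hypothetical fallback for a DeviceID never seen in a type 0 tuple


class ReferenceModel:
    """Single-process model of the required semantics (not Flink code)."""

    def __init__(self) -> None:
        # DeviceID -> its most recent LAST_N readings; older ones are evicted automatically
        self.readings: Dict[int, Deque[int]] = defaultdict(lambda: deque(maxlen=LAST_N))
        # WallID -> number of type 0 readings seen so far
        self.wall_counts: Dict[int, int] = defaultdict(int)

    def process(self, record: Record) -> Optional[Tuple[str, int, int]]:
        tuple_type, _time, wall_id, device_id, reading = record
        if tuple_type == 0:
            self.readings[device_id].append(reading)
            self.wall_counts[wall_id] += 1
            return None  # type 0 only updates state, it emits nothing
        if tuple_type == 1:
            # .get() does not trigger the defaultdict factory, so queries never create state
            last = self.readings.get(device_id)
            return ("max", device_id, max(last) if last else FALLBACK_MAX)
        if tuple_type == 2:
            return ("count", wall_id, self.wall_counts.get(wall_id, 0))
        raise ValueError(f"unknown TupleType {tuple_type}")


model = ReferenceModel()
for t, r in enumerate([40, 30, 20, 25, 15, 12]):
    model.process((0, t, 1, 7, r))
print(model.process((1, 6, 1, 7, 0)))  # 40 has been evicted, so the max of the last 5 is 30
```

Recomputing `max` over at most five values per query is cheap. A single running MaxReadingSoFar, as in the plan below, would not by itself give the maximum of the last 5 readings, because the current maximum can be the value that gets evicted.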
>
> In my mind, the implementation should be along these lines:
>
> - Split the incoming Stream into three separate substreams using
>   SplitStream, based upon TupleType
> - For StreamOFTupleType0,
>   - KeyBy(DeviceID)
>   - Apply a Mapper
>      - Update a Map[DeviceID, Tuple2(MaxReadingSoFar, FixedSizeList[Reading])]
>        somewhere
>   - Apply (next) Mapper
>      - Calculate the total count of readings from the Wall so far
>      - Update a Map[WallID, Count]
>
> - For StreamOFTupleType1
>      - Access the Map created/updated through the first Mapper above
>      - Emit
>
> - For StreamOFTupleType2
>     - Access the Map created/updated through the second Mapper above.
>     - Emit
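One way to make the plan above work with keyed streams, sketched under assumptions rather than taken from the thread: TupleType 0 records go to two branches, one keyed by DeviceID (which also serves TupleType 1 queries) and one keyed by WallID (which also serves TupleType 2 queries), so each update meets the later queries for the same key at the same parallel instance. Chaining the wall-count Mapper directly after KeyBy(DeviceID), as planned above, would spread one wall's readings over several instances, because the devices on a wall need not hash to the same place. In this Python model `route` is a hypothetical helper, and `hash(key) % parallelism` is only a stand-in for Flink's key partitioning, which works differently internally.

```python
from typing import List, Tuple

Record = Tuple[int, int, int, int, int]  # (TupleType, Time, WallID, DeviceID, Reading)


def route(record: Record, parallelism: int) -> List[Tuple[str, int]]:
    """Return the (branch, parallel instance) pairs a record is sent to.

    Assumed design:
    - 'device' branch, keyed by DeviceID: type 0 updates and type 1 queries.
    - 'wall' branch, keyed by WallID: type 0 updates and type 2 queries.
    """
    tuple_type, _time, wall_id, device_id, _reading = record
    targets: List[Tuple[str, int]] = []
    if tuple_type in (0, 1):
        targets.append(("device", hash(device_id) % parallelism))
    if tuple_type in (0, 2):
        targets.append(("wall", hash(wall_id) % parallelism))
    return targets


print(route((0, 0, 2, 7, 20), 4))  # a type 0 record feeds both branches
print(route((1, 1, 2, 7, 0), 4))   # its DeviceID query lands on the same 'device' instance
```

Keeping updates and queries for a key in the same keyed stream also keeps their relative arrival order, which separate substreams recombined later would not necessarily preserve.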
>
> I have hit a wall deciding how the live data structures should be created,
> updated and accessed, correctly and efficiently, in a situation like the
> one above. More importantly, how will they be shared between operators and
> across partitions (nodes)?
>
> I can't broadcast the Maps because they are not READONLY (a.k.a.
> lookup-only).
>
> I can't keep the data structures local to a RichMapFunction because they
> are not shared between partitions (as I understand it). They will be blind
> to the effect of accumulation: each will begin with an empty Map.
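Local state in a parallel instance is not blind to accumulation once the stream is keyed: every record with a given key goes to the same instance, so that instance's local map holds the complete history for its keys. Flink exposes this as keyed state (for example a `ValueState` obtained through `getRuntimeContext().getState(...)` inside a rich function applied to a `KeyedStream`), which is also included in checkpoints. The plain-Python simulation below only illustrates the routing argument for the per-wall count; `WallCountSubtask` and `run` are hypothetical names, and `hash(wall_id) % parallelism` stands in for keyBy(WallID).

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[int, int, int, int, int]  # (TupleType, Time, WallID, DeviceID, Reading)


class WallCountSubtask:
    """One parallel instance whose state is purely local (never shared)."""

    def __init__(self) -> None:
        self.counts: Dict[int, int] = {}

    def process(self, record: Record) -> Optional[Tuple[int, int]]:
        tuple_type, _time, wall_id, _device_id, _reading = record
        if tuple_type == 0:
            self.counts[wall_id] = self.counts.get(wall_id, 0) + 1
            return None
        if tuple_type == 2:
            return (wall_id, self.counts.get(wall_id, 0))
        return None  # type 1 belongs to the device-keyed side and is ignored here


def run(records: List[Record], parallelism: int, keyed: bool = True) -> List[Tuple[int, int]]:
    subtasks = [WallCountSubtask() for _ in range(parallelism)]
    out: List[Tuple[int, int]] = []
    for i, record in enumerate(records):
        wall_id = record[2]
        # keyed: the same wall always reaches the same instance; otherwise round-robin
        target = hash(wall_id) % parallelism if keyed else i % parallelism
        result = subtasks[target].process(record)
        if result is not None:
            out.append(result)
    return out


records = [(0, 0, 0, 1, 20), (0, 1, 1, 2, 21), (0, 2, 0, 3, 22), (0, 3, 3, 4, 23),
           (2, 4, 0, 0, 0), (2, 5, 1, 0, 0), (2, 6, 2, 0, 0)]
print(run(records, 3))               # [(0, 2), (1, 1), (2, 0)]
print(run(records, 3, keyed=False))  # [(0, 0), (1, 0), (2, 0)]
```

With `keyed=False` (round-robin distribution, as with an un-keyed, rebalanced stream) each instance holds only a fragment of the counts, which is exactly the problem described above; with keying, no sharing between instances is needed.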
>
> I have done a bit of exploration and found this thread
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/mutable-hashmap-outside-of-stream-does-it-get-snapshotted-td6002.html#a6013
> >
> in the forum. I have understood what Stephano
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=user_nodes&user=266
> >
> is suggesting ('..State is moved along pipeline ..') but then failed to
> figure out how to apply it in my case, if that is possible at all.
>
> I have been thinking about using an external DB-like datastore, but I want
> to be sure that this decision is unavoidable. If I use a DB, then the focus
> may shift to INSERT/SELECT-like queries. My application then becomes more
> of a distributed DB application than a lean streaming application. That
> thought doesn't make me happy! :-)
>
> Please make me wiser (by pointing out gaps in my understanding where they
> exist). If any more specific information would help, please ask me.
>
> My primary aim is to get a clear recipe for a use case like this.
>
> -- Nirmalya
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Sharing-State-between-Operators-tp6911.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>
