flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Joining two aggregated streams
Date Fri, 07 Jul 2017 13:45:49 GMT
Hi,

For this case, I would suggest implementing the join operation “by hand” using a CoProcessFunction:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html#low-level-joins
You would have the first stream (X) on the first input and the second stream (Y) on the second input.
Inside the function you keep in keyed state the elements that you want to emit when you see that flag
(Flag2 = 1), possibly in ListState. I would also suggest setting a cleanup timer to make sure that you
clean up state in case you never see the flag that triggers processing of your window.
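
To make the buffering logic concrete, here is a minimal plain-Java sketch of what such a function would do per key. It deliberately uses no Flink classes so that it runs on its own. The class name SessionJoinSketch, the methods onX/onY/onTime, and the ttlMillis parameter are all hypothetical names chosen for illustration. In a real job, onX and onY would roughly correspond to processElement1 and processElement2 of the CoProcessFunction, the in-memory map would be keyed state (for example ListState), and onTime would correspond to the onTimer callback. Grouping the Y values by Flag1 in order of first appearance, and starting the cleanup deadline at the first element seen for a key, are assumptions, not something stated in the thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the per-key join logic; a sketch, not Flink code. */
class SessionJoinSketch {

    /** Per-key buffers; in Flink these would live in keyed state. */
    private static final class Buffers {
        final List<String> xValues = new ArrayList<>();
        // Y values grouped by Flag1, in order of first appearance (assumption).
        final Map<Integer, List<String>> yByFlag1 = new LinkedHashMap<>();
        long cleanupAt; // models the timestamp of a registered cleanup timer
    }

    private final Map<String, Buffers> state = new HashMap<>();
    private final List<String> output = new ArrayList<>();
    private final long ttlMillis;

    SessionJoinSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    private Buffers buffersFor(String key, long now) {
        Buffers b = state.get(key);
        if (b == null) {
            b = new Buffers();
            b.cleanupAt = now + ttlMillis; // set once, on the first element for the key
            state.put(key, b);
        }
        return b;
    }

    /** First input (stream X): only buffer the value. */
    void onX(String key, String value, long now) {
        buffersFor(key, now).xValues.add(value);
    }

    /** Second input (stream Y): buffer by Flag1, emit and clear when Flag2 == 1. */
    void onY(String key, String value, int flag1, int flag2, long now) {
        Buffers b = buffersFor(key, now);
        b.yByFlag1.computeIfAbsent(flag1, f -> new ArrayList<>()).add(value);
        if (flag2 == 1) {
            output.add(format(key, b));
            state.remove(key);
        }
    }

    /** Cleanup: drop buffers of keys whose deadline passed without seeing Flag2 == 1. */
    void onTime(long now) {
        state.values().removeIf(b -> b.cleanupAt <= now);
    }

    private static String format(String key, Buffers b) {
        List<String> groups = new ArrayList<>();
        for (List<String> g : b.yByFlag1.values()) {
            groups.add("(" + String.join("#", g) + ")");
        }
        // The X aggregate is always parenthesised here, even for a single value.
        return "(" + key + ",(" + String.join("#", b.xValues) + "),["
                + String.join(",", groups) + "])";
    }

    List<String> output() {
        return output;
    }

    int bufferedKeys() {
        return state.size();
    }

    public static void main(String[] args) {
        SessionJoinSketch join = new SessionJoinSketch(60_000L);
        join.onX("C", "P", 0L);
        join.onX("C", "Q", 1L);
        join.onY("C", "O1", 1, 0, 2L);
        join.onY("C", "O2", 0, 1, 3L); // Flag2 == 1 triggers the emit
        System.out.println(join.output());
    }
}
```

Feeding the records for key A from the question through onX and onY produces (A,(P#Q#R),[(M1#M2#M4#M6),(M3#M5)]) once the record with Flag2 = 1 arrives. Because the sketch always parenthesises the X aggregate, key B would come out as (B,(P),[(N1#N3),(N2)]) rather than (B,P,...).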

Best,
Aljoscha

> On 5. Jul 2017, at 11:53, Udhay <udhayakumar.murugesan@gmail.com> wrote:
> 
> I was trying to join two keyed streams in a particular way and get a combined
> stream.
> For example:
> Let's say I call the two streams X and Y.
> The X stream contains:
> 
> (Key,Value)
> 
> (A,P)
> (A,Q)
> (A,R)
> (B,P)
> (C,P)
> (C,Q)
> 
> The Y stream contains:
> 
> (Key,Value,Flag1,Flag2)
> 
> (A,M1,0,0)
> (A,M2,0,0)
> (A,M3,1,0)
> (A,M4,0,0)
> (A,M5,1,0)
> (A,M6,0,1)
> (B,N1,0,0)
> (B,N2,1,0)
> (B,N3,0,1)
> (C,O1,1,0)
> (C,O2,0,1)
> 
> My objective is to join these two streams and get the combined value as
> described below. From the X stream I want a key-wise aggregation of the
> "Value" field. From the Y stream I want a key-wise aggregation of the "Value"
> field grouped by "Flag1", i.e., the output will be a set of aggregated values.
> I want to join these two streams by maintaining a keyed window that gets
> triggered only when the "Flag2" value for a particular key in the Y stream is
> "1". These flag values are available only in the Y stream and not in the X
> stream. Thus my end result should look like:
> 
> (Key,Value1,Value2)
> 
> (A,(P#Q#R),[(M1#M2#M4#M6),(M3#M5)])
> (B,P,[(N1#N3),(N2)])
> (C,(P#Q),[(O1),(O2)])
> 
> The timings for each of the rows in each stream are such that by the time
> the Flag2 value is 1 in stream Y (indicates some sort of end of a session),
> all the rows in stream X are also already available.
> 
> I tried to maintain a state value inside my join function to get to the
> output, but I don't know how to query the state value or when to do it. Can
> anyone please suggest a solution?
> 
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Joining-two-aggregated-streams-tp14123.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.