flink-user mailing list archives

From Piotr Nowojski <pi...@da-platform.com>
Subject Re: Query on retract stream
Date Mon, 21 Jan 2019 12:44:37 GMT
Hi,

The Flink Table API/SQL currently lacks support for retraction streams as input, and it cannot
convert an append stream into a retraction stream either. With that feature, your problem would
reduce to one simple query: `SELECT uid, count(*) FROM Changelog WHERE status = 'pending' GROUP BY uid`.
There is ongoing related work [1], so this might be supported in the next couple of months.
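
The semantics such a retraction input would need can be modelled in a few lines of plain Python. This is not Flink API code. It is a minimal sketch that assumes `order` is the update key and that events arrive in order. Each new event for a known order first retracts that order's previous row and then adds the new one, and the count consumes those add/retract messages. `to_retract_stream`, `count_pending` and `CHANGELOG` are hypothetical names, and the data is the example changelog from the question.

```python
from typing import Dict, Iterator, List, Tuple

# One changelog event: (user, order, status, event_time).
Event = Tuple[str, str, str, str]
# A retraction-stream message: (is_add, row); is_add=False retracts an earlier row.
Message = Tuple[bool, Event]

CHANGELOG: List[Event] = [
    ("u1", "o1", "pending", "t1"),
    ("u2", "o2", "failed", "t2"),
    ("u1", "o3", "pending", "t3"),
    ("u1", "o3", "success", "t4"),
    ("u2", "o4", "pending", "t5"),
    ("u2", "o4", "pending", "t6"),
]

def to_retract_stream(events: List[Event]) -> Iterator[Message]:
    """Treat `order` as the key (assumption): a new event for a known order
    retracts the previous row for that order, then adds the new row."""
    latest: Dict[str, Event] = {}
    for event in events:
        order = event[1]
        previous = latest.get(order)
        if previous is not None:
            yield (False, previous)  # retract the old version of this order
        latest[order] = event
        yield (True, event)          # add the new version

def count_pending(messages: Iterator[Message]) -> Dict[str, int]:
    """Model of `count(*) ... WHERE status = 'pending' GROUP BY uid` over a
    retraction stream: adding a pending row increments, retracting one decrements."""
    counts: Dict[str, int] = {}
    for is_add, (user, _order, status, _time) in messages:
        if status == "pending":
            counts[user] = counts.get(user, 0) + (1 if is_add else -1)
    return counts
```

For the sample data, the update at t4 shows up as a retraction of the pending o3 row followed by an add of the success row. The final counts are u1: 1 and u2: 1.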

There might be a workaround that could work at the moment. I think you would need to write your
own custom `LAST_ROW(x)` aggregate function, which would simply return the value of the most
recently aggregated row. With that you could write a query like this:

SELECT
	uid, count(*)
FROM (
	SELECT
		*
	FROM (
		SELECT
			uid, LAST_ROW(status) AS status
		FROM
			changelog
		GROUP BY
			uid, oid) AS last_status
	WHERE status = 'pending') AS pending_orders
GROUP BY
	uid

where `changelog` is an append-only stream with the following content (the `user` and `order`
columns correspond to `uid` and `oid` in the query):

> user, order, status, event_time
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, pending, t3
> u1, o3, success, t4
> u2, o4, pending, t5
> u2, o4, pending, t6
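
To check the workaround against the expected running counts, here is a minimal Python model of how the query above would evaluate this changelog. It is a model, not Flink code, and it rests on three assumptions: `LAST_ROW` keeps the last value it accumulated, events arrive in event_time order, and a changed inner aggregate is propagated as a retraction of the old row followed by the new row. `LastRow`, `run_query` and `CHANGELOG` are hypothetical names.

```python
from typing import Dict, List, Optional, Tuple

CHANGELOG = [
    ("u1", "o1", "pending", "t1"),
    ("u2", "o2", "failed", "t2"),
    ("u1", "o3", "pending", "t3"),
    ("u1", "o3", "success", "t4"),
    ("u2", "o4", "pending", "t5"),
    ("u2", "o4", "pending", "t6"),
]

class LastRow:
    """Hypothetical model of a LAST_ROW(x) aggregate: result = last accumulated value."""
    def __init__(self) -> None:
        self.value: Optional[str] = None

    def accumulate(self, x: str) -> None:
        self.value = x

def run_query(changelog: List[Tuple[str, str, str, str]]) -> List[Tuple[str, Dict[str, int]]]:
    inner: Dict[Tuple[str, str], LastRow] = {}  # inner GROUP BY uid, oid
    counts: Dict[str, int] = {}                 # outer GROUP BY uid with count(*)
    snapshots: List[Tuple[str, Dict[str, int]]] = []
    for uid, oid, status, event_time in changelog:
        agg = inner.setdefault((uid, oid), LastRow())
        old = agg.value
        agg.accumulate(status)
        new = agg.value
        if old != new:
            # Inner result changed: retract the old row, then add the new one.
            # Only rows passing WHERE status = 'pending' affect the outer count.
            if old == "pending":
                counts[uid] -= 1
            if new == "pending":
                counts[uid] = counts.get(uid, 0) + 1
        snapshots.append((event_time, dict(counts)))
    return snapshots
```

The snapshots reproduce the expected counts: u1 goes 1, 1, 2, 1, 1, 1 and u2 reaches 1 at t5 and stays there at t6. A user that is missing from a snapshot has a count of 0. Skipping unchanged inner results is a simplification. Emitting a retract/add pair for the unchanged t6 row would leave the count the same.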


Besides that, you could also write your own, relatively simple DataStream application to do
the same thing.
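
For the DataStream route, one possible shape uses two keyed stages. This is an assumption, not a prescribed design. The first stage keys by order and turns each event into a +1/-1 change in the user's pending count. The second stage keys by user, keeps the running count and emits it when the count exceeds the threshold. The Python below only models that logic:

- The dictionaries stand in for Flink keyed state.
- The `alerts` list stands in for the Kafka sink.
- "Beyond the threshold" is taken to mean `>`.
- `order_deltas`, `pending_alerts`, `threshold` and `CHANGELOG` are hypothetical names.

```python
from typing import Dict, List, Tuple

Event = Tuple[str, str, str, str]  # (user, order, status, event_time)

CHANGELOG: List[Event] = [
    ("u1", "o1", "pending", "t1"),
    ("u2", "o2", "failed", "t2"),
    ("u1", "o3", "pending", "t3"),
    ("u1", "o3", "success", "t4"),
    ("u2", "o4", "pending", "t5"),
    ("u2", "o4", "pending", "t6"),
]

def order_deltas(events: List[Event]) -> List[Tuple[str, int]]:
    """Stage 1 (keyed by order): compare with the order's last status and emit
    the change to its user's pending count; unchanged statuses emit nothing."""
    last_status: Dict[str, str] = {}  # stands in for per-order keyed state
    deltas: List[Tuple[str, int]] = []
    for user, order, status, _time in events:
        was_pending = last_status.get(order) == "pending"
        is_pending = status == "pending"
        last_status[order] = status
        if is_pending != was_pending:
            deltas.append((user, 1 if is_pending else -1))
    return deltas

def pending_alerts(deltas: List[Tuple[str, int]], threshold: int) -> List[Tuple[str, int]]:
    """Stage 2 (keyed by user): keep the running pending count and collect
    (user, count) whenever the count is above the threshold."""
    pending: Dict[str, int] = {}       # stands in for per-user keyed state
    alerts: List[Tuple[str, int]] = []  # stands in for the Kafka sink
    for user, delta in deltas:
        pending[user] = pending.get(user, 0) + delta
        if pending[user] > threshold:
            alerts.append((user, pending[user]))
    return alerts
```

With the sample data, `order_deltas` yields `[("u1", 1), ("u1", 1), ("u1", -1), ("u2", 1)]`, because the events at t2 and t6 do not change any pending count. `pending_alerts(deltas, threshold=1)` collects only `("u1", 2)`, from t3. This sketch reports on every event while a count stays above the threshold. Sending a single alert per crossing would need a little extra per-user state.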

I’m CC’ing Timo; maybe he will have a better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

> On 18 Jan 2019, at 18:30, Gagan Agrawal <agrawalgagan@gmail.com> wrote:
> 
> Hi,
> I have a requirement and need to understand whether it can be achieved with a Flink retract
> stream. Let's say we have a stream with 4 attributes, userId, orderId, status and event_time, where
> orderId is unique, and hence any change with the same orderId updates the previous value, as below:
> 
> Changelog Event Stream
> 
> user, order, status, event_time
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, pending, t3
> u1, o3, success, t4
> u2, o4, pending, t5
> u2, o4, pending, t6
> 
> Snapshot view at time t6 (as viewed in MySQL)
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, success, t4
> u2, o4, pending, t6
> (Here the rows at times t3 and t5 are deleted, as they have been updated for the respective
> order ids.)
> 
> What I need is to maintain a count of "Pending" orders per user and, if it goes beyond a
> configured threshold, push that user and the pending count to Kafka. There can be multiple
> updates to an order's status, e.g. Pending -> Success or Pending -> Failed. Also, in some cases
> there may not be any change in status, but we may still get a row (maybe due to an update of
> some other attribute that we are not concerned about). So is it possible to have a running count
> in Flink, as below, at the respective event times? Here the pending count decreases from 2 to 1
> for user u1 at t4, since one of its orders changed status from Pending to Success. Similarly,
> for user u2 at time t6 there is no change in the running count, as there was no change in
> status for order o4.
> 
> t1 -> u1 : 1, u2 : 0
> t2 -> u1 : 1, u2 : 0
> t3 -> u1 : 2, u2 : 0
> t4 -> u1 : 1, u2 : 0 (since o3 moved from pending to success, the count is decreased for u1)
> t5 -> u1 : 1, u2 : 1
> t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no change)
> 
> As I understand it, a retract stream may be able to achieve this. However, I am not sure how.
> Any samples around this would be of great help.
> 
> Gagan
> 

