From: Piotr Nowojski
Subject: Re: Query on retract stream
Date: Mon, 21 Jan 2019 13:44:37 +0100
To: Gagan Agrawal, Timo Walther
Cc: user@flink.apache.org

Hi,

There is a missing feature in Flink's Table API/SQL at the moment: retraction streams are not supported as input (nor are conversions from an append stream to a retraction stream). With that feature, your problem would simplify to a single `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is ongoing related work [1], so this might be supported in the next couple of months.
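Just to clarify the terminology: the opposite direction, converting a query result into a retract stream, already works today. A minimal sketch, assuming a `StreamTableEnvironment` named `tableEnv` with a registered `Changelog` table:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// The Boolean flag marks each message as an add (true) or a retract (false).
Table counts = tableEnv.sqlQuery(
    "SELECT uid, count(*) FROM Changelog GROUP BY uid");
DataStream<Tuple2<Boolean, Row>> retractStream =
    tableEnv.toRetractStream(counts, Row.class);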

There might be a workaround at the moment that could work. I think you would need to write your own custom `LAST_ROW(x)` aggregation function, which would just return the value of the most recent aggregated row (see the sketch below, after the example data). With that you could write a query like this:

SELECT
  uid, count(*)
FROM (
  SELECT *
  FROM (
    SELECT
      uid, oid, LAST_ROW(status) AS status
    FROM changelog
    GROUP BY uid, oid)
  WHERE status = 'pending')
GROUP BY uid

Where `changelog` is an append-only stream with the following content:

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6
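
For illustration, a minimal sketch of what such a custom `LAST_ROW` function could look like as a Table API aggregate function (the class and field names here are placeholders, and it assumes rows are accumulated in arrival order):

import org.apache.flink.table.functions.AggregateFunction;

// Accumulator that stores only the most recently seen value.
public class LastRowAccumulator {
    public String lastValue;
}

// Sketch of LAST_ROW(x): per group, return the value from the most
// recently accumulated row.
public class LastRow extends AggregateFunction<String, LastRowAccumulator> {

    @Override
    public LastRowAccumulator createAccumulator() {
        return new LastRowAccumulator();
    }

    @Override
    public String getValue(LastRowAccumulator acc) {
        return acc.lastValue;
    }

    // Called once per input row of the group; overwriting the field means
    // getValue() always reflects the latest status.
    public void accumulate(LastRowAccumulator acc, String value) {
        acc.lastValue = value;
    }
}

It would have to be registered before use, e.g. tableEnv.registerFunction("LAST_ROW", new LastRow());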


Besides that, you could also write a relatively simple DataStream application to do the same thing, for example along these lines:
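
A minimal sketch of such a job (the `OrderEvent` POJO with `user`, `order` and `status` fields is an assumption; the function is meant to run after `keyBy(event -> event.user)`):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PendingOrderCounter
        extends KeyedProcessFunction<String, OrderEvent, Tuple2<String, Long>> {

    // Last known status per order id, scoped to the current user key.
    private transient MapState<String, String> lastStatusPerOrder;
    // Running number of pending orders for the current user.
    private transient ValueState<Long> pendingCount;

    @Override
    public void open(Configuration parameters) {
        lastStatusPerOrder = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("lastStatus", String.class, String.class));
        pendingCount = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pendingCount", Long.class));
    }

    @Override
    public void processElement(OrderEvent event, Context ctx,
            Collector<Tuple2<String, Long>> out) throws Exception {
        String previous = lastStatusPerOrder.get(event.order);
        long count = pendingCount.value() == null ? 0L : pendingCount.value();

        boolean wasPending = "pending".equals(previous);
        boolean isPending = "pending".equals(event.status);
        if (!wasPending && isPending) {
            count++;               // new pending order
        } else if (wasPending && !isPending) {
            count--;               // pending order resolved (success/failed)
        }
        // pending -> pending (e.g. o4 at t6) leaves the count unchanged

        lastStatusPerOrder.put(event.order, event.status);
        pendingCount.update(count);
        out.collect(Tuple2.of(event.user, count));
    }
}

Downstream you could filter on counts above your configured threshold and sink those records to Kafka.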

I'm CC'ing Timo, maybe he will have a better idea.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-8577

On 18 Jan 2019, at 18:30, Gagan Agrawal <agrawalgagan@gmail.com> wrote:

Hi,
I have a requirement and need to understand whether the same can be achieved with a Flink retract stream. Let's say we have a stream with 4 attributes: userId, orderId, status, event_time, where orderId is unique, and hence any change for the same orderId updates the previous value, as below.

Changelog Event Stream

user, order, status, event_time
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, pending, t3
u1, o3, success, t4
u2, o4, pending, t5
u2, o4, pending, t6

Snapshot view at time t6 (as viewed in MySQL)
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u2, o4, pending, t6
(Here the rows from times t3 and t5 are deleted, as they have been updated for their respective order ids)

What I need is to maintain a count of "pending" orders per user, and if it goes beyond a configured threshold, push that user and the pending count to Kafka. There can be multiple updates to an order's status, e.g. pending -> success or pending -> failed. Also, in some cases there may not be any change in status, but we may still get a row (maybe due to an update of some other attribute we are not concerned about). So is it possible to have a running count in Flink, as below, at the respective event times? Here the pending count decreases from 2 to 1 for user u1 at t4, since one of its orders changed from pending to success. Similarly for user u2 at time t6, there is no change in the running count, as there was no change in status for order o4.

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
t4 -> u1 : 1, u2 : 0 (since o3 moved from pending to success, the count decreases for u1)
t5 -> u1 : 1, u2 : 1
t6 -> u1 : 1, u2 : 1 (no increase in the count for u2, as the o4 update has no status change)

As I understand, maybe a retract stream can achieve this; however, I am not sure how. Any samples around this would be of great help.

Gagan

