From: Simone Robutti
Date: Fri, 30 Sep 2016 11:00:07 +0200
Subject: Re: Counting latest state of stateful entities in streaming
To: user@flink.apache.org

I'm working with your suggestions, thank you very much. What I'm missing
here is what YourWindowFunction should do. I have no notion of event time
there, so I can't assign a timestamp. Also, this solution seems to work by
processing time, while I care about event time. I couldn't make it run yet,
but from what I understood, this is slightly different from what I need.

2016-09-30 10:04 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:

> Hi Simone,
>
> I think I have a solution for your problem:
>
> val s: DataStream[(Long, Int, Long)] = ??? // (id, state, time)
>
> val stateChanges: DataStream[(Int, Int)] = s // (state, cntUpdate)
>   .keyBy(_._1) // key by id
>   .flatMap(new StateUpdater) // StateUpdater is a stateful
> FlatMapFunction. It has a keyed state that stores the last state of each
> id. For each input record it returns two records: (oldState, -1),
> (newState, +1)
>
> stateChanges ensures that the counts of previous states are subtracted.
>
> val changesPerWindow: DataStream[(Int, Int, Long)] = stateChanges //
> (state, cntUpdate, time)
>   .keyBy(_._1) // key by state
>   .window(...) // your window; it should be non-overlapping, so go for
> instance for a tumbling window
>   .apply(new SumReducer(), new YourWindowFunction()) // SumReducer sums
> the cntUpdates and YourWindowFunction assigns the timestamp of your window
>
> This step aggregates all state changes for each state in a window.
>
> val stateCnts: DataStream[(Int, Int, Long)] = changesPerWindow // (state,
> count, time)
>   .keyBy(_._1) // key by state again
>   .map(new CountUpdater) // CountUpdater is a stateful MapFunction. It
> has a keyed state that stores the current count. For each incoming record,
> the count is adjusted and a record (state, newCount, time) is emitted.
>
> Now you have the new counts for your states in multiple records. If
> possible, you can update your Elasticsearch index using these. Otherwise,
> you have to collect them into one record using another window.
>
> Also note that the state size of this program depends on the number of
> unique ids. That might cause problems if the id space grows very fast.
>
> Please let me know if you have questions or if that works ;-)
>
> Cheers, Fabian
>
>
> 2016-09-30 0:32 GMT+02:00 Simone Robutti <simone.robutti@radicalbit.io>:
>
>> Hello,
>>
>> in the last few days I tried to create my first real-time analytics job
>> in Flink. The approach is kappa-architecture-like, so I have my raw data
>> on Kafka, where we receive a message for every change of state of any
>> entity.
>>
>> So the messages are of the form
>>
>> (id, newStatus, timestamp)
>>
>> We want to compute, for every time window, the count of items in a given
>> status. So the output should be of the form
>>
>> (outputTimestamp, state1:count1, state2:count2, ...)
>>
>> or equivalent.
>> These rows should contain, at any given time, the count of the items in
>> a given status, where the status associated with an id is the one from
>> the most recent message observed for that id. The status of an id should
>> be counted in any case, even if the event is much older than those being
>> processed. So the sum of all the counts should be equal to the number of
>> distinct ids observed in the system. A following step could be forgetting
>> about the items in a final state after a while, but this is not a strict
>> requirement right now.
>>
>> This will be written to Elasticsearch and then queried.
>>
>> I tried many different paths and none of them completely satisfied the
>> requirement. Using a sliding window I could easily achieve the expected
>> behaviour, except that when the start of the sliding window passed the
>> timestamp of an event, that event was lost for the count, as you may
>> expect. Other approaches failed to be consistent when working with a
>> backlog, because I used some tricks with keys and timestamps that broke
>> when the data was processed all at once.
>>
>> So I would like to know, even at a high level, how I should approach this
>> problem. It looks like a relatively common use case, but the fact that
>> the relevant information for a given id must be retained indefinitely to
>> count the entities correctly creates a lot of problems.
>>
>> Thank you in advance,
>>
>> Simone
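Fabian's outline above names three placeholder functions without showing their bodies. A minimal sketch of what StateUpdater could look like, assuming input tuples of the form (id, state, timestamp) and Flink's keyed ValueState API (whose constructor signatures vary slightly across Flink versions):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Emits (oldState, -1) and (newState, +1) for every status update of an id,
// so that downstream counts of the previous state are retracted.
class StateUpdater extends RichFlatMapFunction[(Long, Int, Long), (Int, Int)] {

  // last known state per id (the stream is keyed by id)
  private var lastState: ValueState[Integer] = _

  override def open(parameters: Configuration): Unit = {
    lastState = getRuntimeContext.getState(
      new ValueStateDescriptor[Integer]("last-state", classOf[Integer]))
  }

  override def flatMap(in: (Long, Int, Long), out: Collector[(Int, Int)]): Unit = {
    val old = lastState.value() // null if this id has not been seen before
    if (old != null) {
      out.collect((old.intValue, -1)) // retract the count of the previous state
    }
    lastState.update(in._2)
    out.collect((in._2, +1)) // add a count for the new state
  }
}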
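On Simone's question about what YourWindowFunction should do given that there is "no notion of event time there": the window function is handed the TimeWindow object itself, so window.getEnd can serve as the record's timestamp, and when the job runs with event-time windows this is an event-time value, not a processing-time one. A sketch of SumReducer and YourWindowFunction under the same assumptions as above:

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Pre-aggregates within the window: sums the +1/-1 updates per state.
class SumReducer extends ReduceFunction[(Int, Int)] {
  override def reduce(a: (Int, Int), b: (Int, Int)): (Int, Int) =
    (a._1, a._2 + b._2)
}

// Receives the single pre-aggregated record per state and window and
// stamps it with the window's end timestamp.
class YourWindowFunction
    extends WindowFunction[(Int, Int), (Int, Int, Long), Int, TimeWindow] {

  override def apply(key: Int, window: TimeWindow,
                     input: Iterable[(Int, Int)],
                     out: Collector[(Int, Int, Long)]): Unit = {
    val (state, cntUpdate) = input.head // exactly one record, thanks to SumReducer
    out.collect((state, cntUpdate, window.getEnd))
  }
}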
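CountUpdater is the same pattern as StateUpdater, a keyed running aggregate; a minimal sketch:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration

// Maintains a running count per state and applies each window's delta to it.
class CountUpdater extends RichMapFunction[(Int, Int, Long), (Int, Int, Long)] {

  // current count per state (the stream is keyed by state)
  private var count: ValueState[Integer] = _

  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[Integer]("count", classOf[Integer]))
  }

  override def map(in: (Int, Int, Long)): (Int, Int, Long) = {
    val current = if (count.value() == null) 0 else count.value().intValue
    val updated = current + in._2
    count.update(updated)
    (in._1, updated, in._3) // (state, newCount, time)
  }
}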
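On the processing-time versus event-time concern: even though the (state, cntUpdate) records carry no time field, Flink propagates each record's event timestamp through keyBy and flatMap, so the downstream windows still fire on event time once timestamps and watermarks are assigned at the source. A sketch of the whole pipeline wired for event time, assuming the third tuple field is an epoch-millisecond timestamp that is roughly ascending per source partition (otherwise a watermark assigner with bounded lateness is needed) and an arbitrary one-minute tumbling window:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val s: DataStream[(Long, Int, Long)] = ??? // (id, state, timestamp), e.g. from Kafka

val stateCnts: DataStream[(Int, Int, Long)] = s
  .assignAscendingTimestamps(_._3) // event time from the message timestamp
  .keyBy(_._1)                     // key by id
  .flatMap(new StateUpdater)       // (state, cntUpdate)
  .keyBy(_._1)                     // key by state
  .timeWindow(Time.minutes(1))     // tumbling event-time window
  .apply(new SumReducer(), new YourWindowFunction()) // (state, windowDelta, windowEnd)
  .keyBy(_._1)                     // key by state again
  .map(new CountUpdater)           // (state, runningCount, windowEnd)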