Delivered-To: mailing list user@flink.apache.org
From: Felipe Gutierrez
Date: Fri, 26 Apr 2019 10:10:15 +0200
Subject: Re: How to implement custom stream operator over a window? And after the Count-Min Sketch?
To: Rong Rong
Cc: user

Hi Rong,

thanks for your insights. I agree with the three points you made. My plan is to implement an operator that computes the Count-Min Sketch, to which developers can assign hash functions to improve the sketch's estimates (adding more/different functions makes the sketch more precise, but also heavier). The operator will also hold default hash functions, so developers do not have to add any function if they do not want to.

Like I said, I will implement this in my project. But I totally agree to keep the discussion on the original FLINK-2147 JIRA ticket. That way I can collect more opinions =)

Thanks!
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Fri, Apr 26, 2019 at 4:10 AM Rong Rong wrote:
> Hi Felipe,
>
> I am not sure the algorithm requires constructing a new extension of the window operator. I think you can implement the CountMinSketch object as an aggregator, e.g.:
> 1. The aggregate state (ACC) should be the accumulated count-min-sketch 2-D hash array (plus a few other needed fields).
> 2. The accumulate method simply does the update.
> 3. getResult simply gets the frequency from the sketch.
>
> Thus you will not need to use a customized ValueStateDescriptor.
>
> But I agree that it may be a good idea to support a class of use cases that require approximate aggregate state (like HyperLogLog?); in my opinion, this would be a good value add.
> I think some further discussion is needed if we are going down that path. Do you think the original FLINK-2147 JIRA ticket is a good place to carry out that discussion? We can probably continue there or create a new JIRA for discussion.
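Rong's three points can be illustrated with a minimal, self-contained Java sketch. It rests on several assumptions: the `AggregateFunction` interface inside the block is a local stand-in that only mirrors the four methods of Flink's `org.apache.flink.api.common.functions.AggregateFunction` (`createAccumulator`, `add`, `getResult`, `merge`), so the code compiles without Flink on the classpath; the accumulator is the bare `long[][]` counter array; and, following Felipe's plan, the hash functions live in the function object (not in the accumulator) and are either supplied by the caller or taken from defaults. The seeded hash, the default depth of 4, and the `queryItem` parameter are hypothetical choices for illustration, not part of Flink or of Felipe's code.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

public class CountMinSketchExample {

    /** Local stand-in mirroring the four methods of Flink's
     *  org.apache.flink.api.common.functions.AggregateFunction<IN, ACC, OUT>. */
    public interface AggregateFunction<IN, ACC, OUT> extends Serializable {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** A row hash that can ship with the function (lambdas targeting it are serializable). */
    public interface SketchHash extends ToIntFunction<String>, Serializable {}

    /** Hypothetical defaults: a seeded murmur3-style 32-bit finalizer, one seed per row. */
    public static List<SketchHash> defaultHashes(int depth) {
        List<SketchHash> hashes = new ArrayList<>();
        for (int row = 0; row < depth; row++) {
            final int seed = 0x9E3779B9 * (row + 1);
            hashes.add(item -> {
                int h = item.hashCode() ^ seed;
                h ^= h >>> 16;
                h *= 0x85EBCA6B;
                h ^= h >>> 13;
                h *= 0xC2B2AE35;
                h ^= h >>> 16;
                return h;
            });
        }
        return hashes;
    }

    /** ACC is the plain 2-D counter array: one row per hash function, `width` counters per row. */
    public static final class CountMinAggregate implements AggregateFunction<String, long[][], Long> {
        private final List<SketchHash> hashes; // user-supplied or default; more rows = heavier sketch
        private final int width;
        private final String queryItem;        // hypothetical: the item whose frequency getResult reports

        public CountMinAggregate(List<SketchHash> hashes, int width, String queryItem) {
            if (hashes.isEmpty() || width <= 0) {
                throw new IllegalArgumentException("need at least one hash function and a positive width");
            }
            this.hashes = new ArrayList<>(hashes);
            this.width = width;
            this.queryItem = queryItem;
        }

        /** Falls back to four default hash functions when the caller supplies none. */
        public CountMinAggregate(int width, String queryItem) {
            this(defaultHashes(4), width, queryItem);
        }

        private int column(int row, String item) {
            return Math.floorMod(hashes.get(row).applyAsInt(item), width);
        }

        @Override
        public long[][] createAccumulator() {
            // Point 1: the accumulator is the 2-D hash array.
            return new long[hashes.size()][width];
        }

        @Override
        public long[][] add(String value, long[][] acc) {
            // Point 2: the update increments one counter per row.
            for (int row = 0; row < acc.length; row++) {
                acc[row][column(row, value)]++;
            }
            return acc;
        }

        @Override
        public Long getResult(long[][] acc) {
            // Point 3: the estimate is the minimum counter across rows (never an underestimate).
            long min = Long.MAX_VALUE;
            for (int row = 0; row < acc.length; row++) {
                min = Math.min(min, acc[row][column(row, queryItem)]);
            }
            return min;
        }

        @Override
        public long[][] merge(long[][] a, long[][] b) {
            // Tables built with the same hashes and width combine by element-wise addition.
            for (int row = 0; row < a.length; row++) {
                for (int col = 0; col < a[row].length; col++) {
                    a[row][col] += b[row][col];
                }
            }
            return a;
        }
    }
}
```

In an actual job, the class would implement Flink's own `AggregateFunction` (and the stand-in would be deleted), and an instance would be passed to `aggregate(...)` on a windowed stream. Flink then keeps the accumulator in window state itself, which is why no hand-written `ValueStateDescriptor` is needed. For a fixed width, adding rows never raises an estimate and lowers the chance of a large overestimate, at the cost of more counters: the precision-versus-weight trade-off Felipe describes.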
>
> --
> Rong
>
> On Wed, Apr 24, 2019 at 1:32 AM Felipe Gutierrez <felipe.o.gutierrez@gmail.com> wrote:
>
>> Hi Rong,
>>
>> thanks for your reply. I think I have already done something along the lines of what you told me. I have one example in this application [1], which uses this state [2] and computes a CountMinSketch [3].
>>
>> I am looking into how to implement my own operator over a window in order to have more fine-grained control over it and to learn from it, and hopefully to build a path to contributing to Flink in the future [4].
>>
>> [1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L69
>> [2] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L182
>> [3] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/util/CountMinSketch.java
>> [4] https://issues.apache.org/jira/browse/FLINK-2147
>>
>> Best,
>> Felipe
>>
>> On Wed, Apr 24, 2019 at 2:06 AM Rong Rong wrote:
>>
>>> Hi Felipe,
>>>
>>> At a quick glance, the answer depends on what your window looks like (is there any overlap, as with a sliding window?) and how much data you would like to process.
>>>
>>> In general, you can always buffer all the data into a ListState and apply your window function by iterating through all the buffered elements [1], provided that the data is small enough to be held efficiently in the state backend.
>>> If this algorithm has some sort of pre-aggregation that can simplify the buffering through an incremental, orderless aggregation, you can also consider using [2].
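Rong's distinction between buffering every element and an incremental, orderless aggregation can be checked for the Count-Min case with a small, self-contained Java sketch (the class and method names, the dimensions, and the seeded hash are all hypothetical). `fromBuffer` mimics a window function that iterates over all buffered elements once the window fires; `incremental` folds elements into two partial accumulators in a different order and combines them by element-wise addition, the kind of combination an accumulator's `merge` performs (Flink calls `merge`, for example, when session windows merge). Because counter increments commute, both paths yield the same table, while only the incremental path avoids holding every element until the window fires.

```java
import java.util.Arrays;
import java.util.List;

public class OrderlessAggregationDemo {
    static final int DEPTH = 3;  // hypothetical number of rows
    static final int WIDTH = 32; // hypothetical counters per row

    /** Column of `item` in `row`: seeded 32-bit mixer, for illustration only. */
    static int column(String item, int row) {
        int h = item.hashCode() ^ (0x9E3779B9 * (row + 1));
        h ^= h >>> 16;
        h *= 0x85EBCA6B;
        h ^= h >>> 13;
        h *= 0xC2B2AE35;
        h ^= h >>> 16;
        return Math.floorMod(h, WIDTH);
    }

    /** One Count-Min update: increment one counter per row. */
    static long[][] add(long[][] acc, String item) {
        for (int row = 0; row < DEPTH; row++) {
            acc[row][column(item, row)]++;
        }
        return acc;
    }

    /** Combine two partial tables by element-wise addition (result stored in `a`). */
    static long[][] merge(long[][] a, long[][] b) {
        for (int row = 0; row < DEPTH; row++) {
            for (int col = 0; col < WIDTH; col++) {
                a[row][col] += b[row][col];
            }
        }
        return a;
    }

    /** Buffer-then-iterate: every element was kept (e.g. in ListState) until the window fired. */
    static long[][] fromBuffer(Iterable<String> buffered) {
        long[][] acc = new long[DEPTH][WIDTH];
        for (String item : buffered) {
            add(acc, item);
        }
        return acc;
    }

    /** Incremental: two partial accumulators fed in different orders, then merged. */
    static long[][] incremental(List<String> firstPart, List<String> secondPart) {
        long[][] a = new long[DEPTH][WIDTH];
        long[][] b = new long[DEPTH][WIDTH];
        for (int i = firstPart.size() - 1; i >= 0; i--) {
            add(a, firstPart.get(i)); // deliberately reversed arrival order
        }
        for (String item : secondPart) {
            add(b, item);
        }
        return merge(a, b);
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("x", "y", "x", "z", "x", "y");
        long[][] buffered = fromBuffer(window);
        long[][] merged = incremental(window.subList(0, 3), window.subList(3, 6));
        System.out.println(Arrays.deepEquals(buffered, merged)); // prints true
    }
}
```

This order-insensitivity is what makes the Count-Min Sketch a fit for incremental aggregation; an algorithm that needs to see its elements sorted, or all at once, would be better served by the buffered variant.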
>>> With these two approaches, you do not necessarily need to implement your own window operator (extending the window operator can be tricky), and you also have access to the internal state [3].
>>>
>>> Hope this helps your exploration.
>>>
>>> Thanks,
>>> Rong
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
>>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation
>>> [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>
>>> On Tue, Apr 23, 2019 at 8:16 AM Felipe Gutierrez <felipe.o.gutierrez@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I want to implement my own operator that computes the Count-Min Sketch over a window in Flink. I found this Jira issue [1], which is exactly what I want. I believe I have to work on my skills to arrive at a mature solution.
>>>>
>>>> So, the first thing that comes to mind is to create a custom operator like the AggregateApplyWindowFunction [2]. With it, I can create a summary of my data over a window.
>>>>
>>>> I also found this custom JoinOperator example by Till Rohrmann [3], on which I think I can base my implementation, since it works over a DataStream.
>>>>
>>>> What would you suggest to get started implementing a custom stream operator that computes the Count-Min Sketch? Do you know of any custom operator over window/keyBy whose source code I can learn from?
>>>>
>>>> PS: Looking at the Blink source code, I have implemented this custom Combiner [4] (map-combiner-reduce) operator.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-2147
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.html
>>>> [3] https://github.com/tillrohrmann/custom-join
>>>> [4] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operator/AbstractRichMapStreamBundleOperator.java
>>>>
>>>> Thanks,
>>>> Felipe