From: Bruno Cadonna <cadonna@informatik.hu-berlin.de>
Date: Tue, 21 Apr 2015 22:50:57 +0200
To: dev@flink.apache.org
Subject: Re: Periodic full stream aggregations

Hi Gyula,

fair enough! I used a bad example. What I really wanted to know is
whether your code supports only aggregations like sum, min, and max,
where only a single value is passed on to the next aggregation step, or
also more complex data structures, e.g., a synopsis of the full stream,
as needed to compute an aggregate such as an approximate count distinct
(number of distinct items)?
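To make my question concrete, here is a small sketch of what I have in
mind, in plain Java rather than against your PR (the class and method
names are my own inventions, and a real implementation would use a
space-bounded sketch such as HyperLogLog instead of an exact set):

import java.util.HashSet;
import java.util.Set;

// Sketch only: aggregation state that is a synopsis of the whole
// stream rather than a single running value. An exact set stands in
// for a space-bounded sketch to keep the example self-contained.
public class DistinctCountSynopsis {

    private final Set<String> seen = new HashSet<>();

    // Invoked once per stream element, like one step of a reduce.
    public void add(String key) {
        seen.add(key);
    }

    // Merging two partial synopses is what would make the
    // aggregation parallelizable.
    public DistinctCountSynopsis merge(DistinctCountSynopsis other) {
        seen.addAll(other.seen);
        return this;
    }

    // The aggregate a periodic trigger would emit.
    public long estimate() {
        return seen.size();
    }

    public static void main(String[] args) {
        DistinctCountSynopsis left = new DistinctCountSynopsis();
        DistinctCountSynopsis right = new DistinctCountSynopsis();
        left.add("a"); left.add("b");
        right.add("b"); right.add("c");
        System.out.println(left.merge(right).estimate()); // prints 3
    }
}

So the state that travels from one aggregation step to the next would
be the synopsis object itself, not a single value.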
Cheers,
Bruno

On 21.04.2015 15:18, Gyula Fóra wrote:
> You are right, but you should never try to compute a full stream
> median, that's the point :D
>
> On Tue, Apr 21, 2015 at 2:52 PM, Bruno Cadonna <
> cadonna@informatik.hu-berlin.de> wrote:
>
>> Hi Gyula,
>>
>> I read the comments on your PR.
>>
>> I have a question about this comment:
>>
>> "It only allows aggregations so we don't need to keep the full
>> history in a buffer."
>>
>> What if the user implements an aggregation function like a median?
>> For a median you need the full history, don't you?
>>
>> Am I missing something?
>>
>> Cheers,
>> Bruno
>>
>> On 21.04.2015 14:31, Gyula Fóra wrote:
>>> I have opened a PR for this feature:
>>>
>>> https://github.com/apache/flink/pull/614
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Tue, Apr 21, 2015 at 1:10 PM, Gyula Fóra wrote:
>>>
>>>> That's a good idea, I will modify my PR to that :)
>>>>
>>>> Gyula
>>>>
>>>> On Tue, Apr 21, 2015 at 12:09 PM, Fabian Hueske wrote:
>>>>
>>>>> Is it possible to switch the order of the statements, i.e.,
>>>>>
>>>>> dataStream.every(Time.of(4,sec)).reduce(...) instead of
>>>>> dataStream.reduce(...).every(Time.of(4,sec))
>>>>>
>>>>> I think that would be more consistent with the structure of the
>>>>> remaining API.
>>>>>
>>>>> Cheers,
>>>>> Fabian
>>>>>
>>>>> 2015-04-21 10:57 GMT+02:00 Gyula Fóra:
>>>>>
>>>>>> Hi Bruno,
>>>>>>
>>>>>> Of course you can do that as well. (That's the good part :p)
>>>>>>
>>>>>> I will open a PR soon with the proposed changes (first without
>>>>>> breaking the current API) and I will post it here.
>>>>>>
>>>>>> Cheers,
>>>>>> Gyula
>>>>>>
>>>>>> On Tuesday, April 21, 2015, Bruno Cadonna <
>>>>>> cadonna@informatik.hu-berlin.de> wrote:
>>>>>>
>>>>>>> Hi Gyula,
>>>>>>>
>>>>>>> I have a question regarding your suggestion.
>>>>>>>
>>>>>>> Can the current continuous aggregation also be specified with
>>>>>>> your proposed periodic aggregation?
>>>>>>>
>>>>>>> I am thinking about something like
>>>>>>>
>>>>>>> dataStream.reduce(...).every(Count.of(1))
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Bruno
>>>>>>>
>>>>>>> On 20.04.2015 22:32, Gyula Fóra wrote:
>>>>>>>> Hey all,
>>>>>>>>
>>>>>>>> I think we are missing a quite useful feature that could be
>>>>>>>> implemented (with some slight modifications) on top of the
>>>>>>>> current windowing API.
>>>>>>>>
>>>>>>>> We currently provide two ways of aggregating (or reducing)
>>>>>>>> over streams: doing a continuous aggregation and always
>>>>>>>> outputting the aggregated value (which cannot be done
>>>>>>>> properly in parallel), or doing the aggregation in a window
>>>>>>>> periodically.
>>>>>>>>
>>>>>>>> What we don't have at the moment is periodic aggregation over
>>>>>>>> the whole stream. I would even go as far as to remove the
>>>>>>>> continuously outputting reduce/aggregate and replace it with
>>>>>>>> this version, as this in return can be done properly in
>>>>>>>> parallel.
>>>>>>>>
>>>>>>>> My suggestion would be that a call:
>>>>>>>>
>>>>>>>> dataStream.reduce(..)
>>>>>>>> dataStream.sum(..)
>>>>>>>>
>>>>>>>> would return a windowed data stream where the window is the
>>>>>>>> whole record history, and the user would need to define a
>>>>>>>> trigger to get the actual reduced results, like:
>>>>>>>>
>>>>>>>> dataStream.reduce(...).every(Time.of(4,sec))
>>>>>>>> dataStream.sum(...).every(...)
>>>>>>>>
>>>>>>>> I think the current data stream reduce/aggregation is very
>>>>>>>> confusing without being practical for any normal use case.
>>>>>>>>
>>>>>>>> Also, this would be a very API-breaking change (but I would
>>>>>>>> still make this change as it is much more intuitive than the
>>>>>>>> current behaviour), so I would try to push it before the
>>>>>>>> release if we can agree.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Gyula

-- 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Dr. Bruno Cadonna
Postdoctoral Researcher

Databases and Information Systems
Department of Computer Science
Humboldt-Universität zu Berlin

http://www.informatik.hu-berlin.de/~cadonnab
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
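P.S. To check that I read the proposal correctly, here is how I
picture the semantics of a whole-stream aggregation with a trigger,
again as plain Java and not as the actual API from the PR (the class,
the method, and the count-based trigger are my own stand-ins):

import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Sketch only: a reduce over the full stream history where a trigger
// decides when the running aggregate is emitted. every(Count.of(1))
// would then correspond to triggerEvery = 1, i.e. the old continuous
// behaviour falls out as a special case.
public class PeriodicFullStreamReduce {

    static List<Integer> run(List<Integer> stream,
                             BinaryOperator<Integer> reducer,
                             int triggerEvery) {
        List<Integer> emitted = new ArrayList<>();
        Integer aggregate = null; // running aggregate over the full history
        int sinceLastEmit = 0;
        for (int element : stream) {
            aggregate = (aggregate == null)
                    ? element
                    : reducer.apply(aggregate, element);
            if (++sinceLastEmit == triggerEvery) { // trigger fires
                emitted.add(aggregate);
                sinceLastEmit = 0;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> stream = List.of(1, 2, 3, 4, 5, 6);
        // periodic: emit the running sum after every 3 elements -> [6, 21]
        System.out.println(run(stream, Integer::sum, 3));
        // continuous as the special case every(Count.of(1))
        // -> [1, 3, 6, 10, 15, 21]
        System.out.println(run(stream, Integer::sum, 1));
    }
}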