From: "Matthias J. Sax" <mjsax@informatik.hu-berlin.de>
To: dev@flink.apache.org
Date: Thu, 09 Apr 2015 13:30:01 +0200
Subject: Re: Rework of the window-join semantics

Hi Paris,

thanks for the pointer to the Naiad paper. That is quite interesting.

The paper I mentioned [1] does not describe the semantics in detail; it
is more about the implementation of the stream-joins. However, from my
understanding it uses the same semantics as proposed by Gyula.

-Matthias

[1] Kang, Naughton, Viglas. "Evaluating Window Joins over Unbounded
Streams". VLDB 2002.

On 04/07/2015 12:38 PM, Paris Carbone wrote:
> Hello Matthias,
>
> Sure, ordering guarantees are indeed a tricky thing; I recall having
> that discussion back in TU Berlin. Bear in mind though that
> DataStream, our abstract data type, represents a *partitioned*
> unbounded sequence of events. No *global* ordering guarantees
> whatsoever are made across partitions in that model. Seen more
> generally, there are many "race conditions" in a distributed execution
> graph of vertices that process multiple inputs asynchronously,
> especially when you add joins and iterations into the mix (how do you
> deal with reprocessing "old" tuples that iterate in the graph?).
> Btw, have you checked the Naiad paper [1]? Stephan cited it a while
> ago and it is quite relevant to that discussion.
>
> Also, can you cite the paper with the joining semantics you are
> referring to? That would be of good help, I think.
>
> Paris
>
> [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
>
> On 07 Apr 2015, at 11:50, Matthias J. Sax wrote:
>
> Hi @all,
>
> please keep me in the loop for this work. I am highly interested and I
> want to help on it.
>
> My initial thoughts are as follows:
>
> 1) Currently, system timestamps are used, and the suggested approach
> can be seen as state-of-the-art (there is actually a research paper
> using the exact same join semantics). Of course, the current approach
> is inherently non-deterministic. The advantage is that there is no
> overhead in keeping track of the order of records, and the latency
> should be very low. (Additionally, state recovery is simplified:
> because the processing is inherently non-deterministic, recovery can
> be done with relaxed guarantees.)
>
> 2) The user should be able to "switch on" deterministic processing,
> i.e., records are timestamped (either externally when generated, or at
> the sources). Because deterministic processing adds some overhead, the
> user should opt into it actively. In this case, the order must be
> preserved in each re-distribution step (merging is sufficient if order
> is preserved within each incoming channel). Furthermore, deterministic
> processing can be achieved by sound window semantics (and there is a
> bunch of them). Even for single-stream windows it's a tricky problem;
> for join-windows it's even harder. From my point of view, it is less
> important which semantics are chosen; however, the user must be aware
> of how they work. The most tricky part of deterministic processing is
> dealing with duplicate timestamps (which cannot be avoided).
> The timestamping of (intermediate) result tuples is also an important
> question to be answered.
>
> -Matthias
>
> On 04/07/2015 11:37 AM, Gyula Fóra wrote:
>
> Hey,
>
> I agree with Kostas: if we define the exact semantics of how this
> works, it is not more ad-hoc than any other stateful operator with
> multiple inputs. (And I don't think any other system supports
> something similar.)
>
> We need to make some design choices similar to the issues we had for
> windowing. We need to choose how we want to evaluate the windowing
> policies (globally or locally), because that affects what kinds of
> policies can be parallel; but I can work on these things.
>
> I think this is an amazing feature, so I wouldn't necessarily rush the
> implementation for 0.9 though.
>
> And thanks for helping to write these down.
>
> Gyula
>
> On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas wrote:
>
> Yes, we should write these semantics down. I volunteer to help.
>
> I don't think that this is very ad-hoc. The semantics are basically
> the following, assuming an arriving element from the left side:
> (1) We find the right-side matches.
> (2) We insert the left-side arrival into the left window.
> (3) We recompute the left window.
> We need to see whether right-window re-computation needs to be
> triggered as well. I think that this way of joining streams is also
> what the symmetric hash join algorithms were meant to support.
>
> Kostas
>
> On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen wrote:
>
> Is the approach of joining an element at a time from one input against
> a window on the other input not a bit arbitrary?
>
> This just joins against whatever currently happens to be in the window
> by the time the single element arrives - that is a bit unpredictable,
> right?
>
> As a more general point: the whole semantics of windowing, and when
> windows are triggered, are a bit ad-hoc now.
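The per-arrival steps Kostas describes are essentially the classic symmetric hash join. A minimal sketch of that idea follows; the class and method names are illustrative only (not the Flink API), the windows are plain unbounded hash tables, and step (3) - window recomputation/eviction - is omitted:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a symmetric hash join: on each arrival, probe the opposite
// side's window for matches, then insert the arrival into its own window.
// Names are illustrative; eviction under a window policy is left out.
public class SymmetricHashJoin<K, V> {
    private final Map<K, List<V>> leftWindow = new HashMap<>();
    private final Map<K, List<V>> rightWindow = new HashMap<>();

    // A left-side element arrives: (1) find the right-side matches,
    // (2) insert the arrival into the left window.
    public List<V> onLeft(K key, V value) {
        List<V> matches = new ArrayList<>(
                rightWindow.getOrDefault(key, Collections.emptyList()));
        leftWindow.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        return matches;
    }

    // Symmetric handling for a right-side arrival.
    public List<V> onRight(K key, V value) {
        List<V> matches = new ArrayList<>(
                leftWindow.getOrDefault(key, Collections.emptyList()));
        rightWindow.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        return matches;
    }
}
```

A real operator would additionally evict expired elements from both hash tables according to the window policy, which is exactly the recomputation question raised above.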
> It would be really good to start formalizing that a bit and put it
> down somewhere. Users need to be able to clearly understand and
> predict the output.
>
> On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra wrote:
>
> I think it should be possible to make this compatible with the
> .window().every() calls. Maybe if there is some trigger set in
> "every", we would not join that stream one by one but every so many
> elements. The problem here is that window and every are in this case
> very different from the normal windowing semantics. The window would
> define the join window for each element of the other stream, while
> every would define how often I join this stream with the other one.
>
> We need to think about how to make this intuitive.
>
> On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi wrote:
>
> That would be really neat. The problem I see there is that we
> currently do not distinguish between dataStream.window() and
> dataStream.window().every(): they both return WindowedDataStreams, and
> the TriggerPolicies of the every call do not make much sense in this
> setting (in fact, the trigger is practically always set to a count of
> one).
>
> But of course we could make it so that we check that the eviction is
> either null or a count of 1, and in every other case throw an
> exception while building the JobGraph.
>
> On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek wrote:
>
> Or you could define it like this:
>
> stream_A = a.window(...)
> stream_B = b.window(...)
>
> stream_A.join(stream_B).where().equals().with()
>
> So a join would just be a join of two WindowedDataStreams. This would
> neatly move the windowing stuff into one place.
>
> On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi wrote:
>
> Big +1 for the proposal from Peter and Gyula.
> I'm really for bringing the windowing and window-join APIs in sync.
>
> On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra wrote:
>
> Hey guys,
>
> As Aljoscha highlighted earlier, the current window-join semantics in
> the streaming API do not follow the changes in the windowing API. More
> precisely, we currently only support joins over time windows of equal
> size on both streams. The reason is that we now take a window of each
> of the two streams and do joins over these pairs. This would be a
> blocking operation if the windows were not closed at exactly the same
> time (and since we don't want that, we only allow time windows).
>
> I talked with Peter, who came up with the initial idea of an
> alternative approach for stream joins, which works as follows:
>
> Instead of pairing windows for joins, we do element-against-window
> joins. This means that whenever we receive an element from one of the
> streams, we join it with the current window (which is constantly
> updated) of the other stream. This is non-blocking for any window
> definition, as we don't have to wait for windows to be completed, and
> we can use it with any of our predefined policies like Time.of(...),
> Count.of(...), Delta.of(...).
>
> Additionally, this allows some very flexible ways of defining window
> joins. With this we could also define grouped windowing inside of a
> join. An example would be: join all elements of Stream1 with the last
> 5 elements, per a given window key of Stream2, on some join key.
>
> This feature can easily be implemented on top of the current
> operators, so I already have a working prototype for the simple
> non-grouped case.
> My only concern is the API; the best thing I could come up with is
> something like this:
>
> stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1,
> windowKey2).where(...).equalTo(...).with(...)
>
> (the user can omit the "by" and "with" calls)
>
> I think this new approach would be worthy of our "flexible windowing",
> in contrast with the current approach.
>
> Regards,
> Gyula
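The element-against-window join described in the thread can be sketched as follows: each arrival on one stream joins the other stream's *current* window, then enters its own window. A minimal sketch, assuming count-based windows (last N elements) as a stand-in for Count.of(...); names are illustrative, not the actual Flink API:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch of an element-against-window join with count-based windows.
// Each arriving element is joined against the current window of the
// other stream (non-blocking), then added to its own window, evicting
// the oldest element once the window is full. Illustrative names only.
public class ElementVsWindowJoin {
    private final Deque<Integer> windowA = new ArrayDeque<>();
    private final Deque<Integer> windowB = new ArrayDeque<>();
    private final int sizeA;
    private final int sizeB;

    public ElementVsWindowJoin(int sizeA, int sizeB) {
        this.sizeA = sizeA;
        this.sizeB = sizeB;
    }

    // An element arrives on stream A: emit a pair for every element
    // currently in B's window, then add the element to A's window.
    public List<int[]> onA(int a) {
        List<int[]> out = new ArrayList<>();
        for (int b : windowB) {
            out.add(new int[] {a, b});
        }
        if (windowA.size() == sizeA) {
            windowA.removeFirst();  // count-based eviction
        }
        windowA.addLast(a);
        return out;
    }

    // Symmetric handling for an arrival on stream B.
    public List<int[]> onB(int b) {
        List<int[]> out = new ArrayList<>();
        for (int a : windowA) {
            out.add(new int[] {a, b});
        }
        if (windowB.size() == sizeB) {
            windowB.removeFirst();
        }
        windowB.addLast(b);
        return out;
    }
}
```

Because each element is joined immediately against whatever the other window currently holds, no window pairing or equal window sizes are needed; the same structure would work with time- or delta-based eviction instead of the count-based buffer used here.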