From: Aljoscha Krettek
To: dev@flink.apache.org
Subject: Re: [DISCUSS] FLIP-17 Side Inputs
Date: Thu, 09 Mar 2017 16:27:08 +0100

Hi,

these are all valuable suggestions and I think that we should implement them when the time is right. However, I would like to first get a minimal viable version of this feature into Flink and then expand on it. I think the last few attempts at tackling this problem fizzled out because we got too deep into discussing special semantics and features. I think the most important thing to agree on right now is the basic API and the implementation plan. What do you think about that?

Regarding your suggestions, I in fact have a branch [1] from May 2016 with a prototype implementation. It has an n-ary operator whose inputs can be either bounded or unbounded, and the implementation actually waits for all bounded inputs to finish before it starts processing the unbounded inputs.

In general, I think blocking on an input is only possible while you're waiting for a bounded input to finish. If all inputs are unbounded, you cannot block, because you might run into deadlocks (in the processing graph, due to backpressure), and because blocking would also hold back elements that might have a lower timestamp and fall into a different window that is already ready for processing.

Best,
Aljoscha

[1] https://github.com/aljoscha/flink/commits/operator-ng-side-input-wrapper

On Tue, Mar 7, 2017, at 14:39, wenlong.lwl wrote:
> Hi Aljoscha, thank you for the proposal, it is great to hear about the
> progress in side input.
>
> Following is my point of view:
> 1.
>    I think there may be an option to block the processing of the main input
>    instead of buffering the data in state, because in production the
>    throughput of the main input is usually much larger, and buffering the
>    data before the side input is ready may slow down the preparation of the
>    side input, since I/O and computing resources are always limited.
> 2. Another issue that may need to be discussed: how can we do checkpointing
>    with side inputs? A static side input may finish soon after it starts,
>    which will stop checkpointing.
> 3. I agree with Gyula that the user should be able to determine when a side
>    input is ready. Maybe we can go one step further: could users decide, for
>    an operator with multiple inputs, which input it processes each time?
>    It would be more flexible.
>
> Best Regards!
> Wenlong
>
> On 7 March 2017 at 18:39, Ventura Del Monte wrote:
>
> > Hi Aljoscha,
> >
> > Thank you for the proposal and for bringing up this discussion again.
> >
> > Regarding the implementation aspect, I would say the first way could
> > be easier/faster to implement, but it could add some overhead when
> > dealing with multiple side inputs through the current two-stream union
> > transform. I tried the second option myself, as it has less overhead,
> > but the outcome was something close to an N-ary operator that first
> > consumes each side input while buffering the main one.
> > Therefore, I would choose the third option, as it is more generic
> > and might also help in other scenarios, although its implementation
> > requires more effort.
> > I also agree with Gyula: I think the user should be allowed to define the
> > condition that determines when a side input is ready, e.g., load the side
> > input first, or incrementally update the side input.
> >
> > Best,
> > Ventura
> >
> > On Mon, Mar 6, 2017 at 3:50 PM, Gyula Fóra wrote:
> >
> > > Hi Aljoscha,
> > >
> > > Thank you for the nice proposal!
> > >
> > > I think it would make sense to allow users to affect the readiness of
> > > the side input. I think making it ready when the first element arrives
> > > is only slightly better than making it always ready, from a usability
> > > perspective. For instance, if I am joining against a static data set, I
> > > want to wait for the whole set before making it ready. This could be
> > > exposed as a user-defined condition that could perhaps also recognize
> > > bounded inputs.
> > >
> > > Maybe we could also add an aggregating (merging) side input type that
> > > could work as a broadcast state.
> > >
> > > What do you think?
> > >
> > > Gyula
> > >
> > > Aljoscha Krettek wrote (Mon, 6 Mar 2017, 15:18):
> > >
> > > > Hi Folks,
> > > >
> > > > I would like to finally agree on a plan for implementing side inputs
> > > > in Flink. There has already been an attempt to come to consensus [1],
> > > > which resulted in two design documents. I tried to consolidate those
> > > > two and also added a section about implementation plans. This is the
> > > > resulting FLIP:
> > > >
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
> > > >
> > > > In terms of semantics, I tried to go with the minimal viable solution.
> > > > The part that needs discussing is how we want to implement this.
> > > > I outlined three possible implementation plans in the FLIP, but what
> > > > it boils down to is that we need to introduce some way of getting
> > > > several inputs into an operator/task.
> > > >
> > > > Please have a look at the doc and let us know what you think.
> > > >
> > > > Best,
> > > > Aljoscha
> > > >
> > > > [1] https://lists.apache.org/thread.html/797df0ba066151b77c7951fd7d603a8afd7023920d0607a0c6337db3@1462181294@%3Cdev.flink.apache.org%3E
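The buffering strategy discussed in this thread (hold back main-input elements until the side input is ready, with readiness decided by a user-defined condition as Gyula and Ventura suggest) can be sketched in a few lines. This is a minimal, framework-free Python sketch under stated assumptions, not Flink's API and not the implementation proposed in the FLIP: the class `BufferingSideInputOperator`, its methods `on_main_element`, `on_side_element` and `on_side_finished`, and the `is_ready(side, finished)` predicate are all hypothetical names. The buffer is a plain in-memory list; a real operator would have to keep it in checkpointed state.

```python
from typing import Callable, Generic, List, TypeVar

M = TypeVar("M")  # main-input element type
S = TypeVar("S")  # side-input element type
R = TypeVar("R")  # output element type


class BufferingSideInputOperator(Generic[M, S, R]):
    """Hypothetical operator (not a Flink API): buffers main input until the side input is ready."""

    def __init__(
        self,
        is_ready: Callable[[List[S], bool], bool],
        process: Callable[[M, List[S]], R],
    ) -> None:
        self._is_ready = is_ready      # user-defined readiness condition
        self._process = process        # user function applied to each main element
        self._side: List[S] = []       # side-input contents seen so far
        self._side_finished = False    # set once a bounded side input has ended
        self._buffer: List[M] = []     # main elements held back (in memory only)
        self._ready = False

    def on_side_element(self, value: S) -> List[R]:
        # Side elements keep updating the side input, even after it is ready.
        self._side.append(value)
        return self._maybe_flush()

    def on_side_finished(self) -> List[R]:
        self._side_finished = True
        return self._maybe_flush()

    def on_main_element(self, value: M) -> List[R]:
        if self._ready:
            return [self._process(value, self._side)]
        self._buffer.append(value)  # buffer instead of blocking the main input
        return []

    def _maybe_flush(self) -> List[R]:
        # The first time the condition holds, emit all buffered main elements.
        if self._ready or not self._is_ready(self._side, self._side_finished):
            return []
        self._ready = True
        flushed = [self._process(v, self._side) for v in self._buffer]
        self._buffer.clear()
        return flushed


# Joining against a static (bounded) data set: wait until it is fully read.
op = BufferingSideInputOperator(
    is_ready=lambda side, finished: finished,
    process=lambda key, side: (key, key in side),
)
print(op.on_main_element("a"))  # []  (buffered)
print(op.on_side_element("a"))  # []  (side input not finished yet)
print(op.on_side_finished())    # [('a', True)]
print(op.on_main_element("b"))  # [('b', False)]
```

Swapping the predicate for `lambda side, finished: len(side) > 0` gives the "ready on first element" behaviour that Gyula compares against. Because the sketch buffers instead of blocking, it never stops reading the main input, which sidesteps the backpressure deadlock described above for unbounded inputs; the price is the buffer growth that Wenlong points out.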