From: Gyula Fóra
Date: Tue, 07 Jul 2015 14:14:48 +0000
Subject: Re: Rework of streaming iteration API
To: "dev@flink.apache.org"

Sorry Stephan, I meant it slightly differently; I see your point:

DataStream source = ...
SingleInputOperator mapper = source.map(...)
mapper.addInput()

So addInput would be a method of the operator, not the stream.

Aljoscha Krettek wrote (on Tue, Jul 7, 2015, 16:12):

> I think this would be good yes. I was just about to open an Issue for
> changing the Streaming Iteration API. :D
>
> Then we should also make the implementation very straightforward and
> simple, right now, the implementation of the iterations is all over the
> place.
>
> On Tue, 7 Jul 2015 at 15:57 Gyula Fóra wrote:
>
> > Hey,
> >
> > Along with the suggested changes to the streaming API structure I think we
> > should also rework the "iteration" api.
> > Currently the iteration api tries
> > to mimic the syntax of the batch API while the runtime behaviour is quite
> > different.
> >
> > What we create instead of iterations is really just cyclic streams (loops
> > in the streaming job), so the API should somehow be intuitive about this
> > behaviour.
> >
> > I suggest to remove the explicit iterate call and instead add a method to
> > the StreamOperators that allows to connect feedback inputs (create loops).
> > It would look like this:
> >
> > A mapper that does nothing but iterates over some filtered input:
> >
> > *Current API :*
> > DataStream source = ..
> > IterativeDataStream it = source.iterate()
> > DataStream mapper = it.map(noOpMapper)
> > DataStream feedback = mapper.filter(...)
> > it.closeWith(feedback)
> >
> > *Suggested API :*
> > DataStream source = ..
> > DataStream mapper = source.map(noOpMapper)
> > DataStream feedback = mapper.filter(...)
> > mapper.addInput(feedback)
> >
> > The suggested approach would let us define inputs to operators after they
> > are created and implicitly union them with the normal input. This is I
> > think a much clearer approach than what we have now.
> >
> > What do you think?
> >
> > Gyula
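
For illustration, the "attach a feedback input after the operator exists" idea from the proposal can be modelled with a toy operator graph in plain Java. This is only a sketch under stated assumptions, not Flink code: Op, addInput, run and demo are hypothetical names, records are processed one at a time from a single work queue, and the mapper decrements its input (instead of being a no-op) purely so that the loop terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

final class FeedbackSketch {

    /** Hypothetical stand-in for a stream operator (not a Flink class). */
    static final class Op<T> {
        private final UnaryOperator<T> transform;
        private final Predicate<T> keep;
        private final List<Op<T>> outputs = new ArrayList<>();
        final List<T> emitted = new ArrayList<>();

        Op(UnaryOperator<T> transform, Predicate<T> keep) {
            this.transform = transform;
            this.keep = keep;
        }

        /** Assumed analogue of the proposed addInput: attach an upstream after creation. */
        void addInput(Op<T> upstream) {
            upstream.outputs.add(this);
        }
    }

    /** One record waiting to be processed by one operator. */
    private static final class Pending<T> {
        final Op<T> target;
        final T value;

        Pending(Op<T> target, T value) {
            this.target = target;
            this.value = value;
        }
    }

    /** Pushes the source records into entry and drains the graph, cycles included. */
    static <T> void run(Op<T> entry, List<T> source) {
        Deque<Pending<T>> queue = new ArrayDeque<>();
        for (T v : source) {
            queue.add(new Pending<>(entry, v));
        }
        while (!queue.isEmpty()) {
            Pending<T> p = queue.poll();
            T out = p.target.transform.apply(p.value);
            if (!p.target.keep.test(out)) {
                continue; // record is dropped and leaves the loop
            }
            p.target.emitted.add(out);
            for (Op<T> next : p.target.outputs) {
                queue.add(new Pending<>(next, out));
            }
        }
    }

    /** Builds source -> mapper -> filter -> back into mapper and returns the mapper output. */
    static List<Integer> demo() {
        Op<Integer> mapper = new Op<>(x -> x - 1, x -> true);
        Op<Integer> feedback = new Op<>(x -> x, x -> x > 0);
        feedback.addInput(mapper);   // corresponds to mapper.filter(...)
        mapper.addInput(feedback);   // closes the loop after both operators exist
        run(mapper, Arrays.asList(3));
        return mapper.emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [2, 1, 0]
    }
}
```

In this toy model the feedback edge is simply one more entry in the mapper's input set, which mirrors the "implicitly union them with the normal input" wording of the proposal; how a real runtime would schedule or terminate such a cycle is outside the scope of the sketch.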