From: Paris Carbone <parisc@kth.se>
To: user@flink.apache.org
Subject: Re: A question about iterations and prioritizing "control" over "data" inputs
Date: Thu, 23 Mar 2017 11:31:34 +0000

Unless I got this wrong, if he meant relaxing FIFO processing per channel/stream partition, then Robert is absolutely right.

On 23 Mar 2017, at 12:28, Paris Carbone <parisc@kth.se> wrote:

I think what Theo meant is to allow for different high/low priority on different channels (or data streams per se) for n-ary operators such as ConnectedStream binary maps, loops, etc., not to change the sequence of events within channels, I guess.

This does not violate the FIFO channel assumptions of the checkpointing algorithm. The checkpoint barriers block committed stream partitions anyway, so there is no priority concern there.

On 23 Mar 2017, at 12:13, Robert Metzger <rmetzger@apache.org> wrote:

To very quickly respond to Theo's question: No, it is not possible to have records overtake each other in the buffer.
This could potentially void the exactly-once processing guarantees, in the case when records overtake checkpoint barriers.

On Wed, Mar 15, 2017 at 5:58 PM, Kathleen Sharp <kathleen.sharp@signavio.com> wrote:

Hi,

I have a similar-sounding use case and just yesterday was experimenting with this approach:

Use two separate streams: one for model events, one for data events.
Connect these two, key the resulting stream, and then use a RichCoFlatMapFunction to ensure that each data event is enriched with the latest model event as soon as a new model event arrives.
Also, as soon as a new model arrives, emit all previously seen events with this new model event.
This involves keeping events and models in state.
My emitted enriched events have a command-like syntax (add/remove) so that downstream operators can remove/add as necessary depending on the calculations (so for each model change I would emit an add/remove pair of enriched events).

As I say, I have only experimented with this yesterday; perhaps someone a bit more experienced with Flink might spot some problems with this approach, which I would definitely be interested in hearing.

Kat

On Wed, Mar 15, 2017 at 2:20 PM, Theodore Vasiloudis <theodoros.vasiloudis@gmail.com> wrote:
> Hello all,
>
> I've started thinking about online learning in Flink, and one of the issues
> that has come up in other frameworks is the ability to prioritize "control"
> over "data" events in iterations.
>
> To set an example, say we develop an ML model that ingests events in
> parallel, performs an aggregation to update the model, and then broadcasts
> the updated model back through an iteration/back edge. Using the above
> nomenclature, the events being ingested would be "data" events, and the
> model update would be a "control" event.
>
> I talked about this scenario a bit with a couple of people (Paris and
> Gianmarco), and one thing we would like to have is the ability to
> prioritize the ingestion of control events over the data events.
>
> If my understanding is correct, there is currently a buffer/queue of events
> waiting to be processed for each operator, and each incoming event ends up
> at the end of that queue.
>
> If our data source is fast and the model updates slow, a lot of data events
> might be buffered/scheduled to be processed before each model update,
> because of the speed difference between the two streams. But we would like
> to update the model that is used to process data events as soon as the
> newest version becomes available.
>
> Is it somehow possible to make the control events "jump" the queue and be
> processed as soon as they arrive, ahead of the data events?
>
> Regards,
> Theodore
>
> P.S.
> This is still very much a theoretical problem; I haven't looked at how
> such a pipeline would be implemented in Flink.
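Kathleen's enrichment approach can be sketched independently of Flink. Below is a minimal Python simulation of the per-key logic that a RichCoFlatMapFunction would hold in managed state; the class and method names (`ModelEnricher`, `on_data`, `on_model`) are hypothetical, standing in for `flatMap1`/`flatMap2`, and the plain dict-of-lists state stands in for Flink's keyed state backends:

```python
class ModelEnricher:
    """Simulates the per-key state of a co-flatmap that joins a slow
    'model' stream with a fast 'data' stream."""

    def __init__(self):
        self.model = None   # latest model seen for this key
        self.events = []    # all data events seen so far (kept in state)

    def on_data(self, event):
        """Stands in for flatMap1: a data event arrives."""
        self.events.append(event)
        if self.model is None:
            return []       # no model yet, so nothing to emit
        return [("add", event, self.model)]

    def on_model(self, model):
        """Stands in for flatMap2: a new model arrives. Retract every
        stored event under the old model, then re-emit it under the new
        one, producing the add/remove command pairs described above."""
        out = []
        if self.model is not None:
            out += [("remove", e, self.model) for e in self.events]
        self.model = model
        out += [("add", e, model) for e in self.events]
        return out


enricher = ModelEnricher()
enricher.on_data("e1")       # no model yet -> emits nothing
enricher.on_model("m1")      # emits ("add", "e1", "m1")
enricher.on_data("e2")       # emits ("add", "e2", "m1")
enricher.on_model("m2")      # retracts both events under m1, re-adds under m2
```

The remove/add pairs on each model change are what let downstream operators keep their aggregates consistent with the newest model, at the cost of keeping all events in state.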
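Robert's point about records overtaking checkpoint barriers can be illustrated with a toy simulation (this is not Flink code; `run` is a hypothetical single-channel model of a barrier flowing through a FIFO stream). The snapshot taken at a barrier is only consistent if every record before the barrier, and no record after it, has been applied to the state:

```python
def run(channel):
    """Fold a FIFO channel of integer records into a running sum,
    snapshotting the sum when the checkpoint barrier is processed.
    Returns (snapshot, final_state)."""
    state, snapshot = 0, None
    for rec in channel:
        if rec == "BARRIER":
            snapshot = state  # checkpoint: state after all pre-barrier records
        else:
            state += rec
    return snapshot, state


# FIFO order: the snapshot covers exactly records 1 and 2.
run([1, 2, "BARRIER", 3])   # snapshot 3, final state 6

# If record 3 "jumps the queue" past the barrier, the snapshot already
# includes it; on recovery the source replays 3 (it was sent after the
# barrier), so it is applied twice and exactly-once is violated.
run([1, 2, 3, "BARRIER"])   # snapshot 6, final state 6
```

This is why relaxing ordering between channels of an n-ary operator is safe, while reordering within a channel (past a barrier) is not.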