Subject: Re: how does flink assign windows to task
From: Vishnu Viswanath
Date: Mon, 1 Aug 2016 21:46:45 -0400
To: user@flink.apache.org

Thanks Sameer and Till,

On Mon, Aug 1, 2016 at 9:31 AM, Till Rohrmann wrote:

> Yes, you're right Sameer. That's how things work in Flink.
>
> Cheers,
> Till
>
> On Sun, Jul 31, 2016 at 12:33 PM, Sameer Wadkar wrote:
>
>> Vishnu,
>>
>> I would imagine, based on Max's explanation and on how other systems like
>> MapReduce and Spark partition keys, that if you have 100 keys and 50 slots,
>> 2 keys would be assigned to each slot. Each slot would maintain one or more
>> windows (more for time-based windows), and each window would have up to 2
>> panes (depending on whether there are elements for a key in a given
>> window). The trigger would evaluate which of these panes fire for a global
>> window (count windows), or which window fires as a whole for a time window.
>>
>> It seems like this is the only way to get the most efficient utilization
>> of the entire cluster and allow all keys to be evaluated simultaneously,
>> without being starved by keys that receive more elements in case of skew.
>>
>> So I think you will need enough memory to hold all the elements that can
>> arrive for all the active (not yet triggered) windows of the two keys in a
>> task. For count windows this is easy to estimate, but for time windows it
>> is less clear if you receive elements out of order.
>>
>> Let's see what Max replies. I am just reasoning about how Flink should
>> work based on how other similar systems do it.
>>
>> Sameer
>>
>> On Jul 29, 2016, at 9:51 PM, Vishnu Viswanath
>> <vishnu.viswanath25@gmail.com> wrote:
>>
>> Hi Max,
>>
>> Thanks for the explanation.
>>
>> "This happens one after another in a single task slot but in parallel
>> across all the task slots."
>> Could you explain more about how this happens in parallel? Which part
>> occurs in parallel? Is it the Trigger going through each pane and the
>> window function being executed?
>> As in the first example, if there are 100 panes (since I have 1 window
>> and 100 keys), will the trigger go through these 100 panes using 50 task
>> slots and then execute whichever fires? Does that mean Flink determines
>> which set of panes has to be evaluated in each task slot, and the trigger
>> then goes through them?
>>
>> The reason I am trying to understand exactly how this works is that I
>> need to decide how much memory each node in my cluster should have. I know
>> that a single pane would not cause an OOM in my case (since the number of
>> elements per pane is not huge), but nodes might not have enough memory to
>> hold an entire window in memory (since I can have a large number of panes).
>>
>> Thanks,
>> Vishnu
>>
>> On Fri, Jul 29, 2016 at 4:29 AM, Maximilian Michels wrote:
>>
>>> Hi Vishnu Viswanath,
>>>
>>> The keyed elements are spread across the 50 task slots (assuming you
>>> have a parallelism of 50) using hash partitioning on the keys.
>>> Each task slot runs one or multiple operators (depending on the slot
>>> sharing options). One of them is a WindowOperator, which decides when
>>> to trigger and process your keyed elements.
>>>
>>> The WindowOperator holds the WindowAssigner and the Trigger. The
>>> WindowAssigner determines which window an incoming element gets
>>> assigned to. Windows are kept for each key; the combination of window
>>> and key is usually called a pane. The Trigger goes through all the
>>> panes and checks whether they should fire or not (i.e. whether the
>>> window function should be executed). This happens one after another in
>>> a single task slot, but in parallel across all the task slots.
>>>
>>> Just a brief explanation. Hope it helps :)
>>>
>>> Cheers,
>>> Max
>>>
>>> On Thu, Jul 28, 2016 at 5:49 PM, Vishnu Viswanath
>>> <vishnu.viswanath25@gmail.com> wrote:
>>> > Hi,
>>> >
>>> > Let's say I have a window on a keyed stream, and I have about 100
>>> > unique keys. And assume I have about 50 task slots in my cluster.
>>> > Suppose my trigger fired 70 of the 100 windows/panes at the same time.
>>> >
>>> > How will Flink handle this? Will it assign 50 of the 70 triggered
>>> > windows to the 50 available task slots and wait for 20 of them to
>>> > finish before assigning the remaining 20 to the slots?
>>> >
>>> > Thanks,
>>> > Vishnu Viswanath
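
For concreteness, a minimal sketch of the kind of keyed-window pipeline this thread is about, written against the Flink 1.x DataStream API. The source, window size, and trigger count below are illustrative, not taken from the thread. The WindowAssigner places each element into a window (one pane per key and window), and the Trigger is evaluated per pane to decide when the window function runs:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

public class KeyedWindowSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(50); // 50 parallel WindowOperator subtasks, one per task slot

        // Illustrative source of (key, count) pairs; ~100 distinct keys in practice.
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("key-1", 1L),
                Tuple2.of("key-2", 1L));

        events
                // keyBy hash-partitions the elements over the 50 subtasks
                .keyBy(0)
                // WindowAssigner: assigns each element to a window; window + key = pane
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                // Trigger: checked per pane, decides when the window function fires
                .trigger(CountTrigger.of(100))
                // Window function, evaluated for each pane that fires
                .sum(1)
                .print();

        env.execute("keyed window sketch");
    }
}

Note that CountTrigger here replaces the window's default trigger, which matches the count-window case discussed above; with the default trigger, each pane of a time window fires when the window ends.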
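A back-of-the-envelope illustration of Sameer's key-to-slot reasoning follows. This is a simplified stand-in for hash partitioning, not Flink's exact internal partitioning scheme (which differs across versions), and real hashing will not spread 100 keys perfectly evenly over 50 subtasks:

import java.util.Map;
import java.util.TreeMap;

public class KeyToSlotSketch {

    public static void main(String[] args) {
        final int parallelism = 50;   // number of parallel window subtasks / slots
        final int numKeys = 100;      // distinct keys in the stream

        Map<Integer, Integer> keysPerSubtask = new TreeMap<>();
        for (int i = 0; i < numKeys; i++) {
            String key = "key-" + i;
            // Simplified hash partitioning: every element of a given key always
            // lands on the same subtask, so all panes of that key live in one slot.
            int subtask = (key.hashCode() & Integer.MAX_VALUE) % parallelism;
            keysPerSubtask.merge(subtask, 1, Integer::sum);
        }

        // Roughly numKeys / parallelism keys per slot (here ~2), though real
        // hashing is never perfectly even.
        keysPerSubtask.forEach((subtask, count) ->
                System.out.println("subtask " + subtask + " handles " + count + " keys"));
    }
}

Under that picture, the memory a single slot needs is roughly (keys per slot) x (active, not-yet-triggered windows per key) x (elements buffered per pane), which is easy to bound for count windows and harder for out-of-order time windows, as noted above.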