Date: Fri, 22 May 2015 10:58:58 +0200
Subject: repartion locally to task manager
From: Ventura Del Monte
To: user@flink.apache.org

Hello,

I am trying to introduce a new feature in my Flink project. I would like to shuffle (randomly repartition) my dataset only locally within a task manager, so that each worker inside that task manager gets a different set of objects to work on. I have looked at Flink's internal mechanisms, and I think (I hope) I understand how it handles partitions. I see two ways to do it:

a) Using a mapPartition, which for each input object X outputs a tuple (X, destinationChannel), where destinationChannel is the id of the new worker that will receive X. The main problem with this solution is determining the correct destinationChannel inside the mapPartition task.
I think every operation in Flink is unaware of the task manager on which it is executed, so I would need to read the task manager config to get the number of slots available on the current TM. But then how should I relate this number to the total channel count? I could have a situation like this:

+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |  8 |  9 | 10 | 11 | 12 | 13 |
+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|        TM1        |             TM2             |        TM3        |
+-------------------+-----------------------------+-------------------+

So even if I knew that TM2 had 6 slots, I would not be able to know their id range, which is [4,9].

b) Destination channels are chosen in RegularPactTask.getOutputCollector, so modifying this method would make the local repartition possible using either a range or a custom partitioner made task-manager-aware. However, this involves editing the Flink runtime.

To be honest, I would like to avoid option (b), but I think I am at a dead end and will have to edit the runtime.

Do you have better suggestions? Thank you in advance.
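To make the id-range problem in option (a) concrete, here is a minimal plain-Java sketch (no Flink dependency) of the arithmetic involved. It rests on strong assumptions that Flink does not guarantee: that every task knows the slot count of all task managers, and that channel ids are assigned contiguously in task manager order, as in the diagram. The names LocalChannels, channelRange, randomLocalChannel and slotsPerTm are hypothetical and exist only for this illustration.

```java
import java.util.Random;

final class LocalChannels {

    /**
     * Returns {first, last} (both inclusive) global channel ids of the task
     * manager at tmIndex. ASSUMPTION: channels are laid out contiguously in
     * task manager order, which the Flink scheduler does not promise.
     */
    static int[] channelRange(int[] slotsPerTm, int tmIndex) {
        int first = 0;
        // Sum the slots of all task managers that come before tmIndex.
        for (int i = 0; i < tmIndex; i++) {
            first += slotsPerTm[i];
        }
        return new int[] {first, first + slotsPerTm[tmIndex] - 1};
    }

    /** Picks a uniformly random destination channel inside the inclusive range. */
    static int randomLocalChannel(int[] range, Random rnd) {
        return range[0] + rnd.nextInt(range[1] - range[0] + 1);
    }

    public static void main(String[] args) {
        // Slot counts from the diagram: TM1 = 4, TM2 = 6, TM3 = 4 (hypothetical input).
        int[] slotsPerTm = {4, 6, 4};
        int[] tm2 = channelRange(slotsPerTm, 1);
        System.out.println("TM2 channels: [" + tm2[0] + "," + tm2[1] + "]");
        System.out.println("random local channel: " + randomLocalChannel(tm2, new Random()));
    }
}
```

With the slot counts from the diagram, channelRange yields [4,9] for TM2. The hard part remains what the sketch takes as given: obtaining slotsPerTm and the index of the current task manager from inside a running task.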