From: Ventura Del Monte
To: user@flink.apache.org
Date: Mon, 8 Jun 2015 16:49:47 +0200
Subject: Re: repartion locally to task manager

Hi Stephan,

Many thanks for your reply!

1) This would be a nice feature. I have already done something similar;
if you tell me which information you would like to expose in the runtime
context, I can add it to my code, update the unit tests and share them.

2) Yes, I have figured that out. However, I needed this kind of local
repartitioning because I was working on a dataset sampler based on the
filter operator (this is the first step of the iterative pipeline I am
developing). To be honest, this repartitioning is just a plus, because I
have already achieved good results (even though a sampler like the one
Spark offers for low sampling ratios would be a good feature).
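For illustration, the filter-based sampler mentioned above could be
sketched roughly as follows. This is plain Python rather than Flink code,
and the names bernoulli_sample and derive_seed, as well as the seeding
scheme itself, are assumptions made for this sketch and are not taken
from the actual implementation. It only shows the idea: each item is kept
independently with a fixed probability, and the generator is seeded
differently on every pass so that consecutive passes over the same
partition do not repeat the same sample.

```python
import random


def derive_seed(base_seed, worker_index, iteration):
    """Hypothetical seeding scheme: one distinct seed per worker and pass."""
    return base_seed * 1_000_003 + worker_index * 1_009 + iteration


def bernoulli_sample(partition, fraction, seed):
    """Keep each item independently with probability `fraction`."""
    if not 0.0 <= fraction <= 1.0:
        raise ValueError("fraction must be in [0, 1]")
    rng = random.Random(seed)  # dedicated, explicitly seeded generator
    return [item for item in partition if rng.random() < fraction]


# One worker's partition; in the setting described above it is the same
# input on every iteration.
partition = list(range(1000))

# Two consecutive passes over the same partition with different seeds.
samples = [
    bernoulli_sample(partition, 0.1, derive_seed(42, 3, iteration))
    for iteration in range(2)
]
print([len(s) for s in samples])
```

With the same seed the sample is reproducible; changing only the
iteration number changes which items are drawn, which is the behaviour
the "correctly seeded RNG" remark relies on.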
The main drawback of this filter operation is that it always takes the
same partition as input, so, if the partition is big enough, the
probability of sampling different items in consecutive filtering
operations should be high (provided, of course, a good sampling factor
and a correctly seeded RNG). Yet if it were possible to shuffle the
partitions within the same task manager, the subsequent sampling
operation would benefit, in my opinion, as the produced partition would
contain different items with an even higher probability. Of course, I
think this shuffle operation (being local to each TM) should involve
neither a network nor a disk transfer; otherwise, the game is not worth
the candle.

About the change of parallelism, I read that it triggers a sort of local
re-distribution, but I do not think it applies to my case. Anyway, do you
think this kind of shuffling/sampling can be achieved in Flink? Does it
make sense in your opinion?

Best Regards,
Ventura

2015-06-03 14:57 GMT+02:00 Stephan Ewen <sewen@apache.org>:

> Hi Ventura!
>
> Sorry for the late response. Here are a few ideas or comments that may
> help you:
>
> 1) We want to make it possible for a function (such as MapFunction) to
> figure out on which TaskManager it is running. The mechanism would be
> something like "getRuntimeContext().getTaskManagerInformation()". That
> should help you determine which TaskManager you are on.
>
> 2) When you are scheduling tasks, it is not guaranteed that slots 0, 1, 2,
> ... are on the same TaskManager. The assignment is based on locality of
> the input data stream and the availability of slots.
>
> Can you explain a bit more what the feature you want to add actually tries
> to achieve? Then I may be able to give you more pointers.
>
> When you say that you need local re-distribution, does it imply something
> like below, where a change of parallelism between operators implies that
> the data is only repartitioned locally (not across the boundaries of
> TaskManagers)?
>
>  (map) (map)    (map) (map)
>     \   /          \   /
>      \ /            \ /
>   (reduce)       (reduce)
>     ^  ^          ^  ^
>     |   \        /   |
>     |    +------+    |
>     |   /        \   |
>  (source)       (source)
>
> Greetings,
> Stephan
>
> On Fri, May 22, 2015 at 10:58 AM, Ventura Del Monte <
> venturadelmonte@gmail.com> wrote:
>
>> Hello,
>>
>> I am trying to introduce a new feature in my Flink project: I would like
>> to shuffle (randomly repartition) my dataset only locally to a task
>> manager, so that each internal worker will have a different set of
>> objects to work on. I have looked at Flink's internal mechanisms, and I
>> know (I hope) how it handles partitions. I think there are two ways to
>> do it:
>>
>> a) Using a mapPartition, which for each input object X should output a
>> tuple (X, destinationChannel), where destinationChannel is the id of
>> the new worker that will receive X. The main problem of this solution is
>> determining the correct destinationChannel in the mapPartition task. I
>> think every operation in Flink is unaware of the task manager on which
>> it is executed, so I would need to read the task manager config in order
>> to get the number of slots available on the current TM. But then how
>> should I relate this number to the total channel count, since I could
>> have a situation like this:
>>
>> +----+----+----+----+----+----+----+----+----+---+---+---+---+----+
>> |    |    |    |    |    |    |    |    |    |   |   |   |   |    |
>> | 0  | 1  | 2  | 3  | 4  | 5  | 6  | 7  | 8  | 9 | 10| 11| 12| 13 |
>> +----+----+----+---------+----+----+----+----+--------------------+
>> |                   |                            |                |
>> |        TM1        |            TM2             |      TM3       |
>> +-------------------+----------------------------+----------------+
>>
>> So even if I knew TM2 had 6 slots, I would not be able to know their id
>> range -> [4,9]
>>
>> b) Destination channels are chosen in
>> RegularPactTask.getOutputCollector, so some modifications of this method
>> would make the local repartitioning possible using either a range or a
>> custom partitioner, in order to make them task-manager-aware. Yet this
>> would involve some edits to the Flink runtime.
>>
>> To be honest, I would like to avoid option b), but I think I am at a
>> dead end and will have to edit the runtime.
>>
>> Do you have better suggestions? Thank you in advance.
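
As a small worked example of the slot layout drawn in the quoted message
(TM1 with slots 0-3, TM2 with 4-9, TM3 with 10-13), the sketch below
computes each task manager's slot id range from the per-TM slot counts
and picks a random destination channel on the same task manager. It is
plain Python, not Flink code; slot_ranges and local_destination are
hypothetical helpers, and the whole thing rests on the assumption that
slot ids are assigned contiguously, task manager by task manager, which
the thread points out is not guaranteed by the scheduler.

```python
import random
from itertools import accumulate


def slot_ranges(slots_per_tm):
    """Map each task manager to its inclusive (first, last) slot id range.

    ASSUMPTION: slot ids are contiguous per task manager, as in the
    diagram. The scheduler does not guarantee this.
    """
    ends = list(accumulate(slots_per_tm))
    return [(end - count, end - 1) for count, end in zip(slots_per_tm, ends)]


def local_destination(slot_id, ranges, rng):
    """Pick a random destination channel on the same task manager."""
    for first, last in ranges:
        if first <= slot_id <= last:
            return rng.randint(first, last)  # both bounds are inclusive
    raise ValueError(f"slot id {slot_id} is outside every range")


ranges = slot_ranges([4, 6, 4])  # TM1, TM2, TM3 from the diagram
print(ranges)  # [(0, 3), (4, 9), (10, 13)]

rng = random.Random(0)
print(local_destination(5, ranges, rng))  # some slot id between 4 and 9
```

Knowing only that TM2 has 6 slots is not enough to obtain the range
(4, 9): the counts of the task managers placed before it are needed too,
which is the difficulty raised under option a).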