flink-dev mailing list archives

From "Matthias J. Sax" <mj...@apache.org>
Subject Re: How to force the parallelism on small streams?
Date Thu, 03 Sep 2015 13:22:52 GMT
If there were only 14 elements, you would obviously be right. However, if I
understood Arnaud correctly, the problem is that there are more than 14
elements:

> Each of my 100 sources gives only a few lines (say 14 max)

That would be about 140 lines in total.

Using a non-parallel source, he is able to distribute the elements to all
100 mappers. I assume that about 40 mappers receive 2 lines and 60
receive 1 line.
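
To make the two cases concrete, here is a minimal, self-contained Java
sketch. It is a plain simulation, not Flink code: the class and method
names are hypothetical, and it assumes that every sender keeps its own
round-robin counter and starts at channel 0, which is the behaviour
Aljoscha describes in the quoted message. The element counts (14 per
source, 140 in total) are the figures used in this thread.

```java
import java.util.Arrays;

public class RoundRobinSketch {

    /**
     * Simulates `senders` independent senders, each emitting `perSender`
     * elements round-robin over `receivers` channels. Assumption of this
     * sketch: every sender has its own counter and starts at channel 0.
     * Returns how many elements each receiver got.
     */
    public static int[] distribute(int senders, int perSender, int receivers) {
        int[] received = new int[receivers];
        for (int s = 0; s < senders; s++) {
            int next = 0; // per-sender counter, not shared globally
            for (int e = 0; e < perSender; e++) {
                received[next]++;
                next = (next + 1) % receivers;
            }
        }
        return received;
    }

    /** Counts the receivers that got at least one element. */
    public static long busy(int[] received) {
        return Arrays.stream(received).filter(c -> c > 0).count();
    }

    public static void main(String[] args) {
        // 100 parallel sources, 14 elements each, 100 mappers
        int[] parallel = distribute(100, 14, 100);
        // one non-parallel source emitting 140 elements, 100 mappers
        int[] single = distribute(1, 140, 100);
        System.out.println("parallel sources, busy mappers: " + busy(parallel));
        System.out.println("single source, busy mappers:    " + busy(single));
    }
}
```

Under these assumptions the parallel case keeps only 14 mappers busy,
while the single source reaches all 100 mappers (40 of them with 2
elements and 60 with 1).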

@Arnaud: is this correct?


-Matthias

On 09/03/2015 03:04 PM, Aljoscha Krettek wrote:
> Hi,
> I don't think it's a bug. If there are 100 sources that each emit only
> 14 elements then only the first 14 mappers will ever receive data. The
> round-robin distribution is not global, since the sources operate
> independently from each other.
> 
> Cheers,
> Aljoscha
> 
> On Wed, 2 Sep 2015 at 20:00 Matthias J. Sax <mjsax@apache.org> wrote:
> 
>     Thanks for clarifying. shuffle() is similar to rebalance() -- however,
>     it redistributes randomly and not in round robin fashion.
> 
>     However, the problem you describe sounds like a bug to me. I included
>     the dev list. Maybe someone else can step in so we can determine whether
>     there is a bug or not.
> 
>     -Matthias
> 
> 
>     On 09/02/2015 06:19 PM, LINZ, Arnaud wrote:
>     > Hi,
>     >
>     > You are right, but in fact it does not solve my problem, since I
>     > have a parallelism of 100 everywhere. Each of my 100 sources gives
>     > only a few lines (say 14 max), and only the first 14 downstream
>     > nodes will receive data.
>     > The same problem occurs when replacing rebalance() with shuffle().
>     >
>     > But I found a workaround: setting the parallelism to 1 for the source
>     > (I don't need 100 directory scanners anyway) forces the elements to
>     > be rebalanced evenly between the mappers.
>     >
>     > Greetings,
>     > Arnaud
>     >
>     >
>     > -----Original Message-----
>     > From: Matthias J. Sax [mailto:mjsax@apache.org]
>     > Sent: Wednesday, 2 September 2015 17:56
>     > To: user@flink.apache.org
>     > Subject: Re: How to force the parallelism on small streams?
>     >
>     > Hi,
>     >
>     > If I understand you correctly, you want to have 100 mappers. Thus
>     > you need to apply .setParallelism() after .map():
>     >
>     >> addSource(myFileSource).rebalance().map(myFileMapper).setParallelism(100)
>     >
>     > The order of commands you used sets the dop (degree of parallelism)
>     > for the source to 100 (which might be ignored if the provided source
>     > function "myFileSource" does not implement the "ParallelSourceFunction"
>     > interface). The dop for the mapper would then be the default value.
>     >
>     > Using .rebalance() is absolutely correct. It distributes the
>     > emitted tuples in a round-robin fashion to all consumer tasks.
>     >
>     > -Matthias
>     >
>     > On 09/02/2015 05:41 PM, LINZ, Arnaud wrote:
>     >> Hi,
>     >>
>     >>
>     >>
>     >> I have a source that provides few items, since it gives file names to
>     >> the mappers. The mapper opens the file and processes its records. As
>     >> the files are huge, one input line (a filename) gives a substantial
>     >> amount of work to the next stage.
>     >>
>     >> My topology looks like:
>     >>
>     >> addSource(myFileSource).rebalance().setParallelism(100).map(myFileMapper)
>     >>
>     >> If 100 mappers are created, about 85 end immediately and only a few
>     >> process the files (for hours). I suspect an optimization that requires
>     >> a minimum number of lines to be passed to the next node, which is
>     >> otherwise "shut down"; but in my case I do want the lines to be evenly
>     >> distributed to each mapper.
>     >>
>     >> How can I enforce that?
>     >>
>     >>
>     >>
>     >> Greetings,
>     >>
>     >> Arnaud
>     >>
>     >>
>     >>
>     >
> 

