flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: How to force the parallelism on small streams?
Date Thu, 03 Sep 2015 13:30:28 GMT
Btw, it works with a parallelism-1 source because only a single source
partitions the data (round-robin or random).
Several sources, by contrast, all assign their work to the same few mappers.
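
To make the effect concrete, here is a minimal plain-Java sketch (not Flink
code) of what is described in this thread. It assumes that every source keeps
its own round-robin counter and starts at the first mapper; the class and
method names are made up for illustration only.

```java
import java.util.Arrays;

public class RoundRobinSketch {

    /**
     * Simulates `sources` independent senders. Each one emits
     * `elementsPerSource` elements round-robin over `mappers` channels
     * and starts at channel 0 (an assumption for this sketch).
     * Returns how many elements each mapper receives.
     */
    static int[] distribute(int sources, int elementsPerSource, int mappers) {
        int[] received = new int[mappers];
        for (int s = 0; s < sources; s++) {
            int next = 0; // per-source counter: no state is shared between sources
            for (int e = 0; e < elementsPerSource; e++) {
                received[next]++;
                next = (next + 1) % mappers;
            }
        }
        return received;
    }

    /** Counts the mappers that received at least one element. */
    static long busyMappers(int[] received) {
        return Arrays.stream(received).filter(n -> n > 0).count();
    }

    public static void main(String[] args) {
        // 100 parallel sources, 14 elements each, 100 mappers
        int[] parallel = distribute(100, 14, 100);
        // one source emitting the same 1400 elements (100 x 14)
        int[] single = distribute(1, 1400, 100);

        System.out.println("100 sources: " + busyMappers(parallel) + " busy mappers");
        System.out.println("1 source:    " + busyMappers(single) + " busy mappers");
    }
}
```

Under these assumptions the 100 independent sources keep only 14 mappers busy
(100 elements each), while the single source reaches all 100 mappers with 14
elements each.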

2015-09-03 15:22 GMT+02:00 Matthias J. Sax <mjsax@apache.org>:

> If there were only 14 elements, you would obviously be right. However, if I
> understood Arnaud correctly, the problem is that there are more than 14
> elements:
>
> > Each of my 100 sources gives only a few lines (say 14 max)
>
> That would be up to 1400 lines in total (100 sources x 14 lines).
>
> Using a non-parallel source, he is able to distribute the elements to all
> 100 mappers. I assume that each mapper then receives up to 14 lines.
>
> @Arnaud: is this correct?
>
>
> -Matthias
>
> On 09/03/2015 03:04 PM, Aljoscha Krettek wrote:
> > Hi,
> > I don't think it's a bug. If there are 100 sources that each emit only
> > 14 elements then only the first 14 mappers will ever receive data. The
> > round-robin distribution is not global, since the sources operate
> > independently from each other.
> >
> > Cheers,
> > Aljoscha
> >
> > On Wed, 2 Sep 2015 at 20:00 Matthias J. Sax <mjsax@apache.org> wrote:
> >
> >     Thanks for clarifying. shuffle() is similar to rebalance(); however,
> >     it redistributes randomly and not in round-robin fashion.
> >
> >     However, the problem you describe sounds like a bug to me. I included
> >     the dev list. Maybe someone else can step in so we can identify
> >     whether there is a bug or not.
> >
> >     -Matthias
> >
> >
> >     On 09/02/2015 06:19 PM, LINZ, Arnaud wrote:
> >     > Hi,
> >     >
> >     > You are right, but in fact it does not solve my problem, since I
> >     > have a parallelism of 100 everywhere. Each of my 100 sources gives
> >     > only a few lines (say 14 max), and only the first 14 downstream
> >     > nodes will receive data.
> >     > Same problem when replacing rebalance() with shuffle().
> >     >
> >     > But I found a workaround: setting the parallelism to 1 for the source
> >     > (I don't need 100 directory scanners anyway) forces the data to be
> >     > rebalanced evenly between the mappers.
> >     >
> >     > Greetings,
> >     > Arnaud
> >     >
> >     >
> >     > -----Original Message-----
> >     > From: Matthias J. Sax [mailto:mjsax@apache.org]
> >     > Sent: Wednesday, 2 September 2015 17:56
> >     > To: user@flink.apache.org
> >     > Subject: Re: How to force the parallelism on small streams?
> >     >
> >     > Hi,
> >     >
> >     > If I understand you correctly, you want to have 100 mappers. Thus
> >     > you need to apply .setParallelism() after .map():
> >     >
> >     >> addSource(myFileSource).rebalance().map(myFileMapper).setParallelism(100)
> >     >
> >     > The order of commands you used sets the dop (degree of parallelism)
> >     > for the source to 100 (which might be ignored if the provided source
> >     > function "myFileSource" does not implement the
> >     > "ParallelSourceFunction" interface). The dop for the mapper should
> >     > then be the default value.
> >     >
> >     > Using .rebalance() is absolutely correct. It distributes the
> >     > emitted tuples in round-robin fashion to all consumer tasks.
> >     >
> >     > -Matthias
> >     >
> >     > On 09/02/2015 05:41 PM, LINZ, Arnaud wrote:
> >     >> Hi,
> >     >>
> >     >>
> >     >>
> >     >> I have a source that provides few items, since it gives file names to
> >     >> the mappers. The mapper opens the file and processes its records. As the
> >     >> files are huge, one input line (a filename) gives a considerable amount
> >     >> of work to the next stage.
> >     >>
> >     >> My topology looks like:
> >     >>
> >     >> addSource(myFileSource).rebalance().setParallelism(100).map(myFileMapper)
> >     >>
> >     >> If 100 mappers are created, about 85 end immediately and only a few
> >     >> process the files (for hours). I suspect an optimization that requires
> >     >> a minimum number of lines to be passed to the next node, or else the
> >     >> node is "shut down"; but in my case I do want the lines to be evenly
> >     >> distributed to each mapper.
> >     >>
> >     >> How can I enforce that?
> >     >>
> >     >>
> >     >>
> >     >> Greetings,
> >     >>
> >     >> Arnaud
> >     >>
> >     >>
> >     >>
> >
> >     >
> >
>
>
