flink-dev mailing list archives

From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: Iteration feedback partitioning does not work properly
Date Tue, 06 Oct 2015 08:39:46 GMT
Hi,

This is just a workaround, and it actually breaks the input order from my
source. I think the iteration construction should be reworked to set the
parallelism of the iteration source/sink to the parallelism of the head
operator (and to validate that all heads have the same parallelism).
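The proposed rule (derive the iteration source/sink parallelism from the head operators and check that all heads agree) could be sketched roughly as below. This is a hypothetical model for illustration only: `IterationParallelism` and `resolve` are not part of Flink's API, and each head operator is reduced to its parallelism value.

```java
import java.util.List;

// Hypothetical model of the proposed rule (not Flink's actual code): the
// iteration source/sink pair takes the parallelism of the head operators,
// and all heads must have the same parallelism.
public final class IterationParallelism {

    private IterationParallelism() {}

    // Returns the parallelism to assign to the iteration source and sink,
    // or throws if there are no heads or the heads disagree.
    public static int resolve(List<Integer> headParallelisms) {
        if (headParallelisms.isEmpty()) {
            throw new IllegalArgumentException("iteration has no head operators");
        }
        int expected = headParallelisms.get(0);
        for (int p : headParallelisms) {
            if (p != expected) {
                throw new IllegalArgumentException(
                        "all iteration heads must have the same parallelism, found "
                                + expected + " and " + p);
            }
        }
        return expected;
    }

    public static void main(String[] args) {
        // Single map head with parallelism 2, as in the example in this thread.
        System.out.println(resolve(List.of(2))); // prints 2
        try {
            resolve(List.of(2, 1)); // heads with different parallelism
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast on mismatched heads keeps the source/sink parallelism unambiguous, since a single source/sink pair cannot line up one-to-one with heads of different widths.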

I thought this was the solution that you described with Stephan in some
older discussion before the rewrite.

Cheers,
Gyula

Aljoscha Krettek <aljoscha@apache.org> wrote (on Tue, 6 Oct 2015 at
9:15):

> Hi,
> I think what you would like to do can be achieved by:
>
> IterativeStream it = in.map(IdentityMap).setParallelism(2).iterate();
> DataStream mapped = it.map(...);
> it.closeWith(mapped.partitionByHash(someField));
>
> The input is rebalanced to the map inside the iteration as in your example
> and the feedback should be partitioned by hash.
>
> Cheers,
> Aljoscha
>
>
> On Tue, 6 Oct 2015 at 00:11 Gyula Fóra <gyula.fora@gmail.com> wrote:
>
> > Hey,
> >
> > This question is mainly targeted towards Aljoscha but maybe someone can
> > help me out here:
> >
> > I think the way feedback partitioning is handled does not work; let me
> > illustrate with a simple example:
> >
> > IterativeStream it = ...; // parallelism 1
> > DataStream mapped = it.map(...); // parallelism 2
> > // this does not work as the feedback has parallelism 2 != 1
> > // it.closeWith(mapped.partitionByHash(someField));
> > // so we need to rebalance the data
> >
> > it.closeWith(mapped.map(NoOpMap).setParallelism(1).partitionByHash(someField));
> >
> > This program will execute, but the feedback will not be partitioned by hash
> > to the mapper instances:
> > the partitioning is applied from the NoOpMap to the iteration sink, which
> > has a parallelism different from the mapper (1 vs. 2), and the iteration
> > source then forwards every element to the same mapper (always subtask 0).
> >
> > So the problem is basically that the iteration source/sink pair gets the
> > parallelism of the input stream (p = 1), not that of the head operator
> > (p = 2), which leads to incorrect partitioning.
> >
> > Did I miss something here?
> >
> > Cheers,
> > Gyula
> >
> >
>
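To make the routing problem described in the quoted message concrete, here is a simplified simulation. It is a hypothetical model, not Flink code: hash partitioning is approximated as `Math.floorMod(key.hashCode(), n)` (Flink's real channel selection may differ), and the iteration source is assumed to forward each record to the mapper subtask with its own index, as the message describes.

```java
// Simplified, hypothetical model of the feedback routing (not Flink code).
// Hash partitioning is approximated as floorMod(key.hashCode(), n); Flink's
// real channel selection may compute the index differently.
public final class FeedbackRouting {

    private FeedbackRouting() {}

    // Index of the receiving subtask when hash-partitioning among n receivers.
    public static int hashTarget(Object key, int n) {
        return Math.floorMod(key.hashCode(), n);
    }

    // Where a feedback record should end up: hash-partitioned across the mappers.
    public static int intendedMapper(Object key, int mapperParallelism) {
        return hashTarget(key, mapperParallelism);
    }

    // Where it actually ends up: hash-partitioned across the iteration sink's
    // subtasks, after which the iteration source with the same index forwards
    // it to the mapper subtask with that index (assumption taken from the thread).
    public static int actualMapper(Object key, int sinkParallelism) {
        return hashTarget(key, sinkParallelism);
    }

    public static void main(String[] args) {
        int mapperParallelism = 2;
        for (int key = 0; key < 4; key++) {
            System.out.printf(
                    "key %d: intended mapper %d, sink p=1 -> mapper %d, sink p=2 -> mapper %d%n",
                    key,
                    intendedMapper(key, mapperParallelism),
                    actualMapper(key, 1),
                    actualMapper(key, 2));
        }
    }
}
```

With a sink parallelism of 1, every key lands on mapper 0; when the sink parallelism equals the head's (2), the actual target matches the intended hash partition, which is the behavior the proposed fix aims for.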
