flink-dev mailing list archives

From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: Question About "Preserve Partitioning" in Stream Iteration
Date Fri, 31 Jul 2015 14:39:25 GMT
I still don't get how it could possibly work. Let me tell you how I see it,
and correct me if my logic is wrong :)

You have this program:
ids.map1().setParallelism(2)
ids.map2().setParallelism(4)

//...

ids.closeWith(feedback.groupBy(0))

You are suggesting that we only have one iteration source/sink pair with
parallelism of either 2 or 4. I will assume that the parallelism is 2 for
the sake of the argument.

The iteration source is connected to map1 and map2 with Forward
partitioning and the sink is connected with groupBy(0).
Each sink instance will receive all tuples of a given key, which also means
that each of the 2 iteration source instances will too.

Now here comes the problem: the source will forward the tuples to map1, and
since we have a forward connection we maintain the groupBy semantics (this
is perfect). The sources will also forward to map2, which has higher
parallelism, so the tuple sending turns into round robin, which screws up
the groupBy.
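
To make this concrete, here is a rough standalone sketch in plain Java (not
the Flink API; the class and method names are made up for illustration). It
assumes 2 source instances that receive tuples by key, a forward connection
when the parallelisms match, and per-source round robin otherwise, which is
how I understand the behaviour described above:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class PartitioningSketch {

    // Keyed routing (what groupBy gives us at the sink/source pair):
    // every tuple with the same key lands on the same instance.
    static int keyedChannel(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    // Sends each key through a keyed source and on to a downstream operator.
    // Returns, per key, the set of downstream instances that received it.
    static Map<Integer, Set<Integer>> route(int[] keys,
                                            int sourceParallelism,
                                            int downstreamParallelism) {
        Map<Integer, Set<Integer>> seenBy = new TreeMap<>();
        // One round-robin counter per source instance (assumed behaviour).
        int[] nextChannel = new int[sourceParallelism];
        for (int key : keys) {
            int source = keyedChannel(key, sourceParallelism);
            int target;
            if (downstreamParallelism == sourceParallelism) {
                // Forward: source instance i feeds downstream instance i.
                target = source;
            } else {
                // Parallelism mismatch: fall back to round robin.
                target = nextChannel[source];
                nextChannel[source] = (nextChannel[source] + 1) % downstreamParallelism;
            }
            seenBy.computeIfAbsent(key, k -> new TreeSet<>()).add(target);
        }
        return seenBy;
    }

    // True if every key was seen by exactly one downstream instance.
    static boolean grouped(Map<Integer, Set<Integer>> seenBy) {
        return seenBy.values().stream().allMatch(s -> s.size() == 1);
    }

    public static void main(String[] args) {
        int[] keys = {0, 1, 0, 1, 0, 1, 0, 1};
        System.out.println("map1 (p=2): " + route(keys, 2, 2));
        System.out.println("map2 (p=4): " + route(keys, 2, 4));
    }
}
```

With these inputs each key reaches exactly one map1 instance, but key 0 ends
up on all four map2 instances, so the grouping is lost on that path.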

What did I miss?
Gyula

On Fri, 31 Jul 2015 at 14:59 Aljoscha Krettek <aljoscha@apache.org> wrote:

> Yes, this would still work. For example, I have this crazy graph:
> http://postimg.org/image/xtv8ay8hv/full/ That results from this program:
> https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5
>
> It works, and the implementation is very simple, actually.
>
> On Fri, 31 Jul 2015 at 14:30 Gyula Fóra <gyula.fora@gmail.com> wrote:
>
> > I mean that the head operators have different parallelism:
> >
> > IterativeDataStream ids = ...
> >
> > ids.map().setParallelism(2)
> > ids.map().setParallelism(4)
> >
> > //...
> >
> > ids.closeWith(feedback)
> >
> > On Fri, 31 Jul 2015 at 14:23 Aljoscha Krettek <aljoscha@apache.org> wrote:
> >
> > > I thought about having some tighter restrictions here. My idea was to
> > > enforce that the feedback edges must have the same parallelism as the
> > > original input stream, otherwise shipping strategies such as "keyBy",
> > > "shuffle", "rebalance" don't seem to make sense because they would differ
> > > from the distribution of the original elements (at least IMHO). Maybe I'm
> > > wrong there, though.
> > >
> > > To me it seems intuitive that I get the feedback at the head the way I
> > > specify it at the tail. But maybe that's also just me... :D
> > >
> > > On Fri, 31 Jul 2015 at 14:00 Gyula Fóra <gyfora@apache.org> wrote:
> > >
> > > > Hey,
> > > >
> > > > I am not sure what the intuitive behaviour is here. As you are not
> > > > applying a transformation on the feedback stream but passing it to a
> > > > closeWith method, I thought it was somehow natural that it gets the
> > > > partitioning of the iteration input, but maybe it's not intuitive.
> > > >
> > > > If others also think that preserving feedback partitioning should be
> > > > the default I am not against it :)
> > > >
> > > > Btw, this still won't make it very simple. We still need as many
> > > > source/sink pairs as we have different parallelisms among the head
> > > > operators. Otherwise the forwarding logic won't work.
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > > On Fri, 31 Jul 2015 at 11:52 Aljoscha Krettek <aljoscha@apache.org> wrote:
> > > >
> > > > > Hi,
> > > > > I'm currently working on making the StreamGraph generation more
> > > > > centralized (i.e. not spread across the different API classes). The
> > > > > question now is why we need to switch to preserve partitioning.
> > > > > Could we not make "preserve" partitioning the default, and if users
> > > > > want shuffle partitioning or anything else they have to specify it
> > > > > manually when adding the feedback edge?
> > > > >
> > > > > This would make for a very simple scheme where the iteration
> > > > > sources are always connected to the heads using "forward" and the
> > > > > tails are connected to the iteration sinks using whatever
> > > > > partitioner was set by the user. This would make it more
> > > > > transparent than the current default of "shuffle" between tails and
> > > > > iteration sinks.
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > P.S. I know we had quite some discussion about introducing "preserve
> > > > > partitioning", but now that I think of it, it should be the
> > > > > default... :D
> > > > >
> > > >
> > >
> >
>
