flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gyula Fóra <gyula.f...@gmail.com>
Subject Re: Question About "Preserve Partitioning" in Stream Iteration
Date Fri, 31 Jul 2015 15:15:49 GMT
There might be reasons why a user would want different parallelism at the
head operators (depending on what else that head operator might process) so
restricting them to the same parallelism is a little bit weird don't you
think? It kind of goes against the whole opeartors-parallelism idea.

I don't think its a huge complexity to group head operators together by
parallelism and add a source/sink per each group like we do now. What do
you say?

Aljoscha Krettek <aljoscha@apache.org> ezt írta (időpont: 2015. júl. 31.,
P, 17:10):

> Yes, I'm not saying that it makes sense to do it, I'm just saying that it
> does translate and run. Your observation is true. :D
>
> I'm wondering whether it makes sense to allow users to have iteration heads
> with differing parallelism, in fact.
>
> On Fri, 31 Jul 2015 at 16:40 Gyula Fóra <gyula.fora@gmail.com> wrote:
>
> > I still don't get how it could possibly work, let me tell you how I see
> and
> > correct me in my logic :)
> >
> > You have this program:
> > ids.map1().setParallelism(2)
> > ids.map2().setParallelism(4)
> >
> > //...
> >
> > ids.closeWith(feedback.groupBy(0))
> >
> > You are suggesting that we only have one iteration source/sink pair with
> > parallelism of either 2 or 4. I will assume that the parallelism is 2 for
> > the sake of the argument.
> >
> > The iteration source is connected to map1 and map2 with Forward
> > partitioning and the sink is connected with groupBy(0).
> > Each sink instance will receive all tuples of a given key which also
> means
> > that each iteration source instance (2) will too.
> >
> > Now here comes the problem: the source will forward the tuples to map 1
> and
> > since we have forward connection we maintiain the groupby semantics (this
> > is perfect.)  the sources will also forward to map 2 which has higher
> > parallelism so the tuple sending turns into round robin, which screws up
> > the groupby.
> >
> > What did I miss?
> > Gyula
> >
> > Aljoscha Krettek <aljoscha@apache.org> ezt írta (időpont: 2015. júl.
> 31.,
> > P, 14:59):
> >
> > > Yes, this would still work. For example, I have this crazy graph:
> > > http://postimg.org/image/xtv8ay8hv/full/ That results from this
> program:
> > > https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5
> > >
> > > It works, and the implementation is very simple, actually.
> > >
> > > On Fri, 31 Jul 2015 at 14:30 Gyula Fóra <gyula.fora@gmail.com> wrote:
> > >
> > > > I mean that the head operators have different parallelism:
> > > >
> > > > IterativeDataStream ids = ...
> > > >
> > > > ids.map().setParallelism(2)
> > > > ids.map().setParallelism(4)
> > > >
> > > > //...
> > > >
> > > > ids.closeWith(feedback)
> > > >
> > > > Aljoscha Krettek <aljoscha@apache.org> ezt írta (időpont: 2015.
júl.
> > > 31.,
> > > > P, 14:23):
> > > >
> > > > > I thought about having some tighter restrictions here. My idea was
> to
> > > > > enforce that the feedback edges must have the same parallelism as
> the
> > > > > original input stream, otherwise shipping strategies such as
> "keyBy",
> > > > > "shuffle", "rebalance" don't seem to make sense because they would
> > > differ
> > > > > from the distribution of the original elements (at least IMHO).
> Maybe
> > > I'm
> > > > > wrong there, though.
> > > > >
> > > > > To me it seems intuitive that I get the feedback at the head they
> > way I
> > > > > specify it at the tail. But maybe that's also just me... :D
> > > > >
> > > > > On Fri, 31 Jul 2015 at 14:00 Gyula Fóra <gyfora@apache.org>
wrote:
> > > > >
> > > > > > Hey,
> > > > > >
> > > > > > I am not sure what is the intuitive behaviour here. As you are
> not
> > > > > applying
> > > > > > a transformation on the feedback stream but pass it to a
> closeWith
> > > > > method,
> > > > > > I thought it was somehow nature that it gets the partitioning
of
> > the
> > > > > > iteration input, but maybe its not intuitive.
> > > > > >
> > > > > > If others also think that preserving feedback partitioning should
> > be
> > > > the
> > > > > > default I am not against it :)
> > > > > >
> > > > > > Btw, this still won't make it very simple. We still need as
many
> > > > > > source/sink pairs as we have different parallelism among the
head
> > > > > > operators. Otherwise the forwarding logic wont work.
> > > > > >
> > > > > > Cheers,
> > > > > > Gyula
> > > > > >
> > > > > > Aljoscha Krettek <aljoscha@apache.org> ezt írta (időpont:
2015.
> > júl.
> > > > > 31.,
> > > > > > P, 11:52):
> > > > > >
> > > > > > > Hi,
> > > > > > > I'm currently working on making the StreamGraph generation
more
> > > > > > centralized
> > > > > > > (i.e. not spread across the different API classes). The
> question
> > is
> > > > now
> > > > > > why
> > > > > > > we need to switch to preserve partitioning? Could we not
make
> > > > > "preserve"
> > > > > > > partitioning the default and if users want to have shuffle
> > > > partitioning
> > > > > > or
> > > > > > > anything they have to specify it manually when adding the
> > feedback
> > > > > edge?
> > > > > > >
> > > > > > > This would make for a very simple scheme where the iteration
> > > sources
> > > > > are
> > > > > > > always connected to the heads using "forward" and the tails
are
> > > > > connected
> > > > > > > to the iteration sinks using whatever partitioner was set
by
> the
> > > > user.
> > > > > > This
> > > > > > > would make it more transparent than the current default
of the
> > > > > "shuffle"
> > > > > > > betweens tails and iteration sinks.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Aljoscha
> > > > > > >
> > > > > > > P.S. I now we had quite some discussion about introducing
> > "preserve
> > > > > > > partitioning" but now, when I think of it it should be
the
> > > default...
> > > > > :D
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message