flink-dev mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Question About "Preserve Partitioning" in Stream Iteration
Date Sun, 02 Aug 2015 18:14:52 GMT
I don't follow the discussion here. Can you help me understand what you
mean by "different iteration heads and tails"?

Doesn't an iteration have one parallel head and one parallel tail?

On Fri, Jul 31, 2015 at 6:52 PM, Gyula Fóra <gyula.fora@gmail.com> wrote:

> Maybe you can reuse some of the logic that is currently in the
> StreamGraph: build the StreamLoops first, then use them to generate the
> sources and sinks right before building the JobGraph. This avoids the
> need to know everything beforehand.
>
> I actually added this to avoid the complexities that you are probably
> facing now.
>
> On Fri, 31 Jul 2015 at 17:28 Aljoscha Krettek <aljoscha@apache.org> wrote:
>
> > Sure it can be done, it's just more complex if you try to do it in a sane
> > way without having the code that builds the StreamGraph all over the
> > place. :D
> >
> > I'll try to come up with something. This is my current work in progress,
> > by the way: https://github.com/aljoscha/flink/tree/stream-api-rework
> >
> > I managed to ban the StreamGraph from StreamExecutionEnvironment and the
> > API classes such as DataStream. The API methods construct a graph of
> > transformation nodes and don't contain any information themselves. Then
> > there is a StreamGraphGenerator that builds a StreamGraph from the
> > transformations. The abstraction is very nice and simple; the only
> > problem that remains is iterations with differing parallelism, but I'll
> > figure them out.
> >
> > P.S. The code is not well documented yet, but the base class for
> > transformations is StreamTransformation. From there anyone who wants to
> > check it out can find the other transformations.
> >
> > On Fri, 31 Jul 2015 at 17:17 Gyula Fóra <gyula.fora@gmail.com> wrote:
> >
> > > There might be reasons why a user would want different parallelism at
> > > the head operators (depending on what else that head operator might
> > > process), so restricting them to the same parallelism is a little bit
> > > weird, don't you think? It kind of goes against the whole
> > > operator-parallelism idea.
> > >
> > > I don't think it's a huge complexity to group head operators together by
> > > parallelism and add a source/sink pair for each group like we do now.
> > > What do you say?
> > >
> > > On Fri, 31 Jul 2015 at 17:10 Aljoscha Krettek <aljoscha@apache.org> wrote:
> > >
> > > > Yes, I'm not saying that it makes sense to do it, I'm just saying
> > > > that it does translate and run. Your observation is true. :D
> > > >
> > > > I'm wondering whether it makes sense to allow users to have iteration
> > > > heads with differing parallelism, in fact.
> > > >
> > > > On Fri, 31 Jul 2015 at 16:40 Gyula Fóra <gyula.fora@gmail.com> wrote:
> > > >
> > > > > I still don't get how it could possibly work. Let me tell you how I
> > > > > see it, and correct me if my logic is wrong :)
> > > > >
> > > > > You have this program:
> > > > > ids.map1().setParallelism(2)
> > > > > ids.map2().setParallelism(4)
> > > > >
> > > > > //...
> > > > >
> > > > > ids.closeWith(feedback.groupBy(0))
> > > > >
> > > > > You are suggesting that we only have one iteration source/sink pair
> > > > > with parallelism of either 2 or 4. I will assume that the parallelism
> > > > > is 2 for the sake of the argument.
> > > > >
> > > > > The iteration source is connected to map1 and map2 with forward
> > > > > partitioning and the sink is connected with groupBy(0).
> > > > > Each sink instance will receive all tuples of a given key, which also
> > > > > means that each of the 2 iteration source instances will too.
> > > > >
> > > > > Now here comes the problem: the source will forward the tuples to
> > > > > map1, and since we have a forward connection we maintain the groupBy
> > > > > semantics (this is perfect). The sources will also forward to map2,
> > > > > which has higher parallelism, so the tuple sending turns into round
> > > > > robin, which screws up the groupBy.
> > > > >
> > > > > What did I miss?
> > > > > Gyula
> > > > >
> > > > > On Fri, 31 Jul 2015 at 14:59 Aljoscha Krettek <aljoscha@apache.org> wrote:
> > > > >
> > > > > > Yes, this would still work. For example, I have this crazy graph:
> > > > > > http://postimg.org/image/xtv8ay8hv/full/ That results from this
> > > > > > program: https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5
> > > > > >
> > > > > > It works, and the implementation is very simple, actually.
> > > > > >
> > > > > > On Fri, 31 Jul 2015 at 14:30 Gyula Fóra <gyula.fora@gmail.com> wrote:
> > > > > >
> > > > > > > I mean that the head operators have different parallelism:
> > > > > > >
> > > > > > > IterativeDataStream ids = ...
> > > > > > >
> > > > > > > ids.map().setParallelism(2)
> > > > > > > ids.map().setParallelism(4)
> > > > > > >
> > > > > > > //...
> > > > > > >
> > > > > > > ids.closeWith(feedback)
> > > > > > >
> > > > > > > On Fri, 31 Jul 2015 at 14:23 Aljoscha Krettek <aljoscha@apache.org> wrote:
> > > > > > >
> > > > > > > > I thought about having some tighter restrictions here. My idea
> > > > > > > > was to enforce that the feedback edges must have the same
> > > > > > > > parallelism as the original input stream, otherwise shipping
> > > > > > > > strategies such as "keyBy", "shuffle", "rebalance" don't seem to
> > > > > > > > make sense because they would differ from the distribution of
> > > > > > > > the original elements (at least IMHO). Maybe I'm wrong there,
> > > > > > > > though.
> > > > > > > >
> > > > > > > > To me it seems intuitive that I get the feedback at the head
> > > > > > > > the way I specify it at the tail. But maybe that's also just
> > > > > > > > me... :D
> > > > > > > >
> > > > > > > > On Fri, 31 Jul 2015 at 14:00 Gyula Fóra <gyfora@apache.org> wrote:
> > > > > > > >
> > > > > > > > > Hey,
> > > > > > > > >
> > > > > > > > > I am not sure what the intuitive behaviour is here. As you are
> > > > > > > > > not applying a transformation on the feedback stream but
> > > > > > > > > passing it to a closeWith method, I thought it was somehow
> > > > > > > > > natural that it gets the partitioning of the iteration input,
> > > > > > > > > but maybe it's not intuitive.
> > > > > > > > >
> > > > > > > > > If others also think that preserving feedback partitioning
> > > > > > > > > should be the default I am not against it :)
> > > > > > > > >
> > > > > > > > > Btw, this still won't make it very simple. We still need as
> > > > > > > > > many source/sink pairs as we have different parallelisms among
> > > > > > > > > the head operators. Otherwise the forwarding logic won't work.
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Gyula
> > > > > > > > >
> > > > > > > > > On Fri, 31 Jul 2015 at 11:52 Aljoscha Krettek <aljoscha@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > > I'm currently working on making the StreamGraph generation
> > > > > > > > > > more centralized (i.e. not spread across the different API
> > > > > > > > > > classes). The question now is why we need to switch to
> > > > > > > > > > preserve partitioning. Could we not make "preserve"
> > > > > > > > > > partitioning the default, so that if users want shuffle
> > > > > > > > > > partitioning or anything else they have to specify it
> > > > > > > > > > manually when adding the feedback edge?
> > > > > > > > > >
> > > > > > > > > > This would make for a very simple scheme where the iteration
> > > > > > > > > > sources are always connected to the heads using "forward" and
> > > > > > > > > > the tails are connected to the iteration sinks using whatever
> > > > > > > > > > partitioner was set by the user. This would make it more
> > > > > > > > > > transparent than the current default of "shuffle" between
> > > > > > > > > > tails and iteration sinks.
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Aljoscha
> > > > > > > > > >
> > > > > > > > > > P.S. I know we had quite some discussion about introducing
> > > > > > > > > > "preserve partitioning" but now, when I think of it, it should
> > > > > > > > > > be the default... :D
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
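
Editorial sketch (not part of the original thread): the scenario Gyula describes in the quoted messages, where a single iteration source/sink pair with parallelism 2 feeds map1 (parallelism 2) and map2 (parallelism 4), can be modeled in plain Java without any Flink dependency. Everything here is an assumption made for illustration: the class and method names are hypothetical, groupBy(0) is modeled as a simple hash partitioner, and the forward-to-higher-parallelism case is modeled as per-instance round robin, following Gyula's description rather than any inspection of the Flink runtime. The last helper sketches his proposal to group head operators by parallelism so that each group gets its own source/sink pair.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of the discussion; none of these names exist in Flink.
final class IterationPartitioningSketch {

    /** Stand-in for groupBy(0) on the feedback edge: key -> sink/source instance. */
    static int hashPartition(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    /**
     * Sends each key through a source/sink pair of the given parallelism to a
     * head operator and records which head instances saw each key.
     * Equal parallelism is modeled as "forward" (instance i -> instance i);
     * differing parallelism is modeled as round robin per source instance.
     */
    static Map<Integer, Set<Integer>> route(int[] keys, int sourceParallelism, int headParallelism) {
        Map<Integer, Set<Integer>> seenBy = new TreeMap<>();
        int[] nextChannel = new int[sourceParallelism]; // round-robin cursor per source instance
        for (int key : keys) {
            int source = hashPartition(key, sourceParallelism);
            int head;
            if (sourceParallelism == headParallelism) {
                head = source; // forward connection keeps the key on one instance
            } else {
                head = nextChannel[source];
                nextChannel[source] = (head + 1) % headParallelism;
            }
            seenBy.computeIfAbsent(key, k -> new TreeSet<>()).add(head);
        }
        return seenBy;
    }

    /** Grouping holds only if every key was seen by exactly one head instance. */
    static boolean groupingPreserved(Map<Integer, Set<Integer>> seenBy) {
        for (Set<Integer> instances : seenBy.values()) {
            if (instances.size() != 1) {
                return false;
            }
        }
        return true;
    }

    /** Groups head operators by parallelism; each group would get its own source/sink pair. */
    static Map<Integer, List<String>> groupHeadsByParallelism(Map<String, Integer> heads) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> head : heads.entrySet()) {
            groups.computeIfAbsent(head.getValue(), p -> new ArrayList<>()).add(head.getKey());
        }
        return groups;
    }

    public static void main(String[] args) {
        int[] keys = {0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3};
        Map<String, Integer> heads = new LinkedHashMap<>();
        heads.put("map1", 2);
        heads.put("map2", 4);

        // One shared source/sink pair with parallelism 2, as in the quoted argument.
        for (Map.Entry<String, Integer> head : heads.entrySet()) {
            System.out.println("shared pair (p=2) -> " + head.getKey() + ": grouping preserved = "
                    + groupingPreserved(route(keys, 2, head.getValue())));
        }

        // One source/sink pair per distinct head parallelism.
        for (Map.Entry<Integer, List<String>> group : groupHeadsByParallelism(heads).entrySet()) {
            int p = group.getKey();
            System.out.println("own pair (p=" + p + ") -> " + group.getValue() + ": grouping preserved = "
                    + groupingPreserved(route(keys, p, p)));
        }
    }
}
```

In this model the shared pair keeps each key on a single map1 instance but spreads keys across several map2 instances, while one pair per parallelism group keeps the grouping intact for both heads. Whether the real runtime behaves the same way depends on how Flink resolves a forward connection between operators of different parallelism, which this sketch does not verify.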
