flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Question About "Preserve Partitioning" in Stream Iteration
Date Fri, 31 Jul 2015 15:27:47 GMT
Sure, it can be done; it's just more complex if you try to do it in a sane
way without having the code that builds the StreamGraph all over the place.
:D

I'll try to come up with something. This is my current work in progress, by
the way: https://github.com/aljoscha/flink/tree/stream-api-rework

I managed to ban the StreamGraph from StreamExecutionEnvironment and the
API classes such as DataStream. The API methods construct a graph of
transformation nodes and don't contain any translation logic themselves. Then
there is a StreamGraphGenerator that builds a StreamGraph from the
transformations. The abstraction is very nice and simple; the only problem
that remains is the differing-parallelism iterations, but I'll figure them
out.

P.S. The code is not well documented yet, but the base class for
transformations is StreamTransformation. From there, anyone who wants to
check it out can find the other transformations.
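The two-phase design described above (API calls only record nodes; a separate generator later translates them) can be sketched as a toy analogy. This is plain Python, not the actual Flink classes; the names `Transformation`, `DataStream`, and `StreamGraphGenerator` here only mirror the idea:

```python
# Toy sketch of the two-phase design: API methods only build a graph of
# transformation nodes; a separate generator walks that graph afterwards
# and emits the executable topology. Illustrative names, not Flink code.

class Transformation:
    def __init__(self, name, inputs=()):
        self.name = name
        self.inputs = list(inputs)  # upstream Transformation nodes

class DataStream:
    """API facade: methods create Transformation nodes, nothing else."""
    def __init__(self, transformation):
        self.transformation = transformation

    def map(self, fn_name):
        # No translation logic here -- we just record the new node.
        return DataStream(Transformation(fn_name, [self.transformation]))

class StreamGraphGenerator:
    """Translates the transformation graph into a list of edges."""
    def generate(self, sinks):
        edges, seen = [], set()

        def visit(t):
            if t.name in seen:
                return
            seen.add(t.name)
            for inp in t.inputs:
                visit(inp)
                edges.append((inp.name, t.name))

        for s in sinks:
            visit(s.transformation)
        return edges

source = DataStream(Transformation("source"))
result = source.map("map1").map("map2")
graph = StreamGraphGenerator().generate([result])
print(graph)  # [('source', 'map1'), ('map1', 'map2')]
```

The point of the split is that the API classes stay thin and the whole translation lives in one place, which is exactly what makes special cases like iterations easier to reason about.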

On Fri, 31 Jul 2015 at 17:17 Gyula Fóra <gyula.fora@gmail.com> wrote:

> There might be reasons why a user would want different parallelism at the
> head operators (depending on what else that head operator might process), so
> restricting them to the same parallelism is a little bit weird, don't you
> think? It kind of goes against the whole operator-parallelism idea.
>
> I don't think it's a huge complexity to group head operators together by
> parallelism and add a source/sink pair per group like we do now. What do
> you say?
>
> On Fri, 31 Jul 2015 at 17:10, Aljoscha Krettek <aljoscha@apache.org> wrote:
>
> > Yes, I'm not saying that it makes sense to do it, I'm just saying that it
> > does translate and run. Your observation is true. :D
> >
> > I'm wondering whether it makes sense to allow users to have iteration
> > heads with differing parallelism, in fact.
> >
> > On Fri, 31 Jul 2015 at 16:40 Gyula Fóra <gyula.fora@gmail.com> wrote:
> >
> > > I still don't get how it could possibly work, so let me tell you how I
> > > see it, and correct me in my logic :)
> > >
> > > You have this program:
> > > ids.map1().setParallelism(2)
> > > ids.map2().setParallelism(4)
> > >
> > > //...
> > >
> > > ids.closeWith(feedback.groupBy(0))
> > >
> > > You are suggesting that we only have one iteration source/sink pair
> > > with parallelism of either 2 or 4. I will assume that the parallelism
> > > is 2 for the sake of the argument.
> > >
> > > The iteration source is connected to map1 and map2 with forward
> > > partitioning, and the sink is connected with groupBy(0).
> > > Each sink instance will receive all tuples of a given key, which also
> > > means that each iteration source instance (of the 2) will too.
> > >
> > > Now here comes the problem: the source will forward the tuples to map1,
> > > and since we have a forward connection we maintain the groupBy
> > > semantics (this is perfect). The sources will also forward to map2,
> > > which has higher parallelism, so the tuple sending turns into round
> > > robin, which breaks the groupBy.
> > >
> > > What did I miss?
> > > Gyula
> > >
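[Editor's note: the failure case described above can be reproduced with a small standalone simulation. This is plain Python with no Flink involved; `key_partition`, `forward`, and `round_robin` are simplified stand-ins for the real shipping strategies.]

```python
# Simulation of the problem above: two iteration source instances receive
# key-partitioned tuples. Forwarding to map1 (parallelism 2) keeps each key
# on one subtask; round-robin to map2 (parallelism 4) scatters keys across
# subtasks, breaking the groupBy semantics.

from collections import defaultdict
from itertools import cycle

def key_partition(tuples, parallelism):
    """groupBy(0)-style partitioning: same key -> same channel."""
    channels = defaultdict(list)
    for t in tuples:
        channels[hash(t[0]) % parallelism].append(t)
    return channels

def forward(source_channels):
    """Forward: source instance i sends everything to target subtask i."""
    return dict(source_channels)

def round_robin(source_channels, parallelism):
    """Rebalance: each source cycles through all target subtasks."""
    channels = defaultdict(list)
    targets = cycle(range(parallelism))
    for src in sorted(source_channels):
        for t in source_channels[src]:
            channels[next(targets)].append(t)
    return channels

records = [(0, i) for i in range(4)] + [(1, i) for i in range(4)]
sources = key_partition(records, 2)   # 2 iteration source instances

map1 = forward(sources)               # parallelism 2: keys stay intact
map2 = round_robin(sources, 4)        # parallelism 4: keys get scattered

def keys_per_subtask(channels):
    return {i: {t[0] for t in ts} for i, ts in channels.items()}

print(keys_per_subtask(map1))  # each key lives on exactly one subtask
print(keys_per_subtask(map2))  # keys spread over several subtasks
```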
> > > On Fri, 31 Jul 2015 at 14:59, Aljoscha Krettek <aljoscha@apache.org> wrote:
> > >
> > > > Yes, this would still work. For example, I have this crazy graph:
> > > > http://postimg.org/image/xtv8ay8hv/full/ That results from this
> > > > program:
> > > > https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5
> > > >
> > > > It works, and the implementation is very simple, actually.
> > > >
> > > > On Fri, 31 Jul 2015 at 14:30 Gyula Fóra <gyula.fora@gmail.com> wrote:
> > > >
> > > > > I mean that the head operators have different parallelism:
> > > > >
> > > > > IterativeDataStream ids = ...
> > > > >
> > > > > ids.map().setParallelism(2)
> > > > > ids.map().setParallelism(4)
> > > > >
> > > > > //...
> > > > >
> > > > > ids.closeWith(feedback)
> > > > >
> > > > > On Fri, 31 Jul 2015 at 14:23, Aljoscha Krettek <aljoscha@apache.org> wrote:
> > > > >
> > > > > > I thought about having some tighter restrictions here. My idea
> > > > > > was to enforce that the feedback edges must have the same
> > > > > > parallelism as the original input stream; otherwise, shipping
> > > > > > strategies such as "keyBy", "shuffle", and "rebalance" don't seem
> > > > > > to make sense because they would differ from the distribution of
> > > > > > the original elements (at least IMHO). Maybe I'm wrong there,
> > > > > > though.
> > > > > >
> > > > > > To me it seems intuitive that I get the feedback at the head the
> > > > > > way I specify it at the tail. But maybe that's also just me... :D
> > > > > >
> > > > > > On Fri, 31 Jul 2015 at 14:00 Gyula Fóra <gyfora@apache.org> wrote:
> > > > > >
> > > > > > > Hey,
> > > > > > >
> > > > > > > I am not sure what the intuitive behaviour is here. As you are
> > > > > > > not applying a transformation on the feedback stream but pass
> > > > > > > it to a closeWith method, I thought it was somehow natural that
> > > > > > > it gets the partitioning of the iteration input, but maybe it's
> > > > > > > not intuitive.
> > > > > > >
> > > > > > > If others also think that preserving feedback partitioning
> > > > > > > should be the default, I am not against it :)
> > > > > > >
> > > > > > > Btw, this still won't make it very simple. We still need as
> > > > > > > many source/sink pairs as we have different parallelisms among
> > > > > > > the head operators. Otherwise the forwarding logic won't work.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Gyula
> > > > > > >
> > > > > > > On Fri, 31 Jul 2015 at 11:52, Aljoscha Krettek <aljoscha@apache.org> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > > I'm currently working on making the StreamGraph generation
> > > > > > > > more centralized (i.e. not spread across the different API
> > > > > > > > classes). The question now is: why do we need to explicitly
> > > > > > > > switch to preserve partitioning? Could we not make "preserve"
> > > > > > > > partitioning the default, and if users want shuffle
> > > > > > > > partitioning or anything else, they have to specify it
> > > > > > > > manually when adding the feedback edge?
> > > > > > > >
> > > > > > > > This would make for a very simple scheme where the iteration
> > > > > > > > sources are always connected to the heads using "forward"
> > > > > > > > and the tails are connected to the iteration sinks using
> > > > > > > > whatever partitioner was set by the user. This would make it
> > > > > > > > more transparent than the current default of the "shuffle"
> > > > > > > > between tails and iteration sinks.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Aljoscha
> > > > > > > >
> > > > > > > > P.S. I know we had quite some discussion about introducing
> > > > > > > > "preserve partitioning", but now that I think of it, it
> > > > > > > > should be the default... :D
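[Editor's note: the wiring proposed in the last message can be made concrete with a small standalone sketch. Plain Python, not Flink code; all names are made up for illustration.]

```python
# Sketch of the proposed scheme: the user-chosen partitioner at the tail
# decides which iteration sink instance receives each feedback tuple, and
# each sink forwards to the head subtask with the same index. The head
# therefore sees exactly the partitioning specified at the tail -- unlike
# a "shuffle" between tail and sinks, which would discard it.

from collections import defaultdict

PARALLELISM = 3

def tail_to_sinks(feedback, partitioner):
    """Apply the user's partitioner on the feedback edge (tail -> sinks)."""
    sinks = defaultdict(list)
    for t in feedback:
        sinks[partitioner(t)].append(t)
    return sinks

def sinks_to_heads_forward(sinks):
    """'Forward': sink i feeds head i, preserving the tail partitioning."""
    return dict(sinks)

# groupBy(0)-style partitioner: same key -> same sink instance
key_partitioner = lambda t: t[0] % PARALLELISM

feedback = [(k, v) for k in range(6) for v in range(2)]
heads = sinks_to_heads_forward(tail_to_sinks(feedback, key_partitioner))

for subtask, tuples in sorted(heads.items()):
    print(subtask, sorted({t[0] for t in tuples}))
# every key ends up on exactly one head subtask, as the user specified
```

The sketch only covers the simple case where heads share the sinks' parallelism; as Gyula notes above, differing head parallelisms would still need one source/sink pair per parallelism group for the forwarding step to stay correct.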
