flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Question About "Preserve Partitioning" in Stream Iteration
Date Mon, 03 Aug 2015 07:41:39 GMT
Yes, that's what I was proposing in my second mail:

I thought about having some tighter restrictions here. My idea was to
enforce that the feedback edges must have the same parallelism as the
original input stream, otherwise shipping strategies such as "keyBy",
"shuffle", "rebalance" don't seem to make sense because they would differ
from the distribution of the original elements (at least IMHO). Maybe I'm
wrong there, though.

To me it seems intuitive that I get the feedback at the head the way I
specify it at the tail. But maybe that's also just me... :D
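
To make that restriction concrete, here is a minimal sketch in the pseudocode
style of the snippets quoted below (names follow this thread rather than a
particular release, and Step is a placeholder map function):

  DataStream input = ...                      // original input, parallelism 4
  IterativeDataStream iter = input.iterate();

  // head/tail operator with the same parallelism as the input
  DataStream feedback = iter.map(new Step()).setParallelism(4);

  // fine under the proposed rule: the feedback edge has the input's
  // parallelism, so keyBy(0) distributes the feedback the same way it would
  // distribute the original elements
  iter.closeWith(feedback.keyBy(0));

  // a feedback edge with, say, parallelism 2 would be rejected, because a
  // keyBy/shuffle/rebalance on it could no longer match the distribution of
  // the original input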

On Mon, 3 Aug 2015 at 00:14 Stephan Ewen <sewen@apache.org> wrote:

> This model strikes me as pretty complicated. Imagine the extra logic and
> code path necessary for proper checkpointing as well.
>
> Why not do a simple approach:
>   - There is one parallel head, one parallel tail, both with the same
> parallelism
>
>   - Any computation in between may have its own parallelism, no special
> cases
>
>   - If the tail does not have the same parallelism as the head, it will not
> be the tail itself; instead, Flink will attach an additional tail operator.
> Between the original tail and the additional tail, the streams are
> redistributed to achieve the required parallelism.
>
> Wouldn't that give us the same result and make things much easier? The batch
> iterations work that way, by the way.
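>
> A rough sketch of how a user program could look under that model (purely
> illustrative, in the pseudocode style of this thread; Step, Work and Finish
> are placeholder map functions):
>
>   IterativeDataStream iter = input.iterate();
>
>   // head: parallelism 4
>   DataStream head = iter.map(new Step()).setParallelism(4);
>
>   // anything in between may use whatever parallelism it likes
>   DataStream intermediate = head.map(new Work()).setParallelism(7);
>
>   // tail: if its parallelism is not 4, an additional tail with parallelism 4
>   // would be attached and the stream redistributed between the two
>   DataStream feedback = intermediate.map(new Finish()).setParallelism(3);
>   iter.closeWith(feedback);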
>
>
>
> On Sun, Aug 2, 2015 at 10:03 PM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
> > To answer the question plain and simple: No, there are several different
> > parallel heads and tails.
> >
> > For example in this:
> > val iter = ds.iteration()
> >
> > val head_tail1 = iter.map().parallelism(2)
> > val head_tail2 = iter.map().parallelism(4)
> >
> > iter.closeWith(head_tail1.union(head_tail2))
> >
> > We have one head/tail pair with parallelism 2 and one with parallelism 4.
> >
> > Off the top of my head, I don't know what happens in this case, though:
> >
> > val iter = ds.iteration()
> >
> > val head1 = iter.map().parallelism(2)
> > val head2 = iter.map().parallelism(4)
> >
> > val tail1 = head1.map().parallelism(6)
> > val tail2 = head2.map().parallelism(8)
> >
> > iter.closeWith(tail1.union(tail2))
> >
> > (Which is also tricky with the parallelism of the input stream)
> >
> >
> > On Sun, 2 Aug 2015 at 21:22 Gyula Fóra <gyula.fora@gmail.com> wrote:
> >
> > > In a streaming program when we create an IterativeDataStream, we
> > > practically mark the union point of some later feedback stream (the one
> > > passed in to closeWith(..)).
> > >
> > > The operators applied on this IterativeDataStream will receive the
> > > feedback input as well. We call the operators applied on the iterative
> > > DataStream head operators. We call the operators that produce the streams
> > > passed into closeWith tail operators. With this terminology we can have
> > > many heads and tails with varying parallelism.
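> > >
> > > In code, the terminology maps roughly like this (a sketch in the
> > > pseudocode style used in this thread):
> > >
> > >   IterativeDataStream ids = input.iterate();
> > >
> > >   // head operators: applied directly on the iterative stream, so they
> > >   // receive the original input plus the feedback
> > >   DataStream head1 = ids.map(...).setParallelism(2);
> > >   DataStream head2 = ids.map(...).setParallelism(4);
> > >
> > >   // tail operators: the ones producing the stream handed to closeWith
> > >   DataStream tail1 = head1.map(...);
> > >   DataStream tail2 = head2.map(...);
> > >
> > >   ids.closeWith(tail1.union(tail2));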
> > >
> > > Stephan Ewen <sewen@apache.org> wrote (on 2 Aug 2015, Sun, at 20:16):
> > >
> > > > I don't get the discussion here, can you help me with what you mean by
> > > > "different iteration heads and tails"?
> > > >
> > > > An iteration does not have one parallel head and one parallel tail?
> > > >
> > > > On Fri, Jul 31, 2015 at 6:52 PM, Gyula Fóra <gyula.fora@gmail.com>
> > > > wrote:
> > > >
> > > > > Maybe you can reuse some of the logic that is currently there on the
> > > > > StreamGraph, by building StreamLoops first, which will be used to
> > > > > generate the sources and sinks right before building the JobGraph.
> > > > > This avoids the need to know everything beforehand.
> > > > >
> > > > > I actually added this to avoid the complexities that you are probably
> > > > > facing now.
> > > > >
> > > > > Aljoscha Krettek <aljoscha@apache.org> wrote (on 31 Jul 2015, Fri, at
> > > > > 17:28):
> > > > >
> > > > > > Sure it can be done, it's just more complex if you try to do it in
> > > > > > a sane way without having the code that builds the StreamGraph all
> > > > > > over the place. :D
> > > > > >
> > > > > > I'll try to come up with something. This is my current work in
> > > > > > progress, by the way:
> > > > > > https://github.com/aljoscha/flink/tree/stream-api-rework
> > > > > >
> > > > > > I managed to ban the StreamGraph from StreamExecutionEnvironment
> > > > > > and the API classes such as DataStream. The API methods construct a
> > > > > > graph of transformation nodes and don't contain any information
> > > > > > themselves. Then there is a StreamGraphGenerator that builds a
> > > > > > StreamGraph from the transformations. The abstraction is very nice
> > > > > > and simple; the only remaining problem is the differing-parallelism
> > > > > > iterations, but I'll figure them out.
> > > > > >
> > > > > > P.S. The code is not well documented yet, but the base class for
> > > > > > transformations is StreamTransformation. From there, anyone who
> > > > > > wants to check it out can find the other transformations.
> > > > > >
> > > > > > On Fri, 31 Jul 2015 at 17:17 Gyula Fóra <gyula.fora@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > There might be reasons why a user would want different
> > > > > > > parallelism at the head operators (depending on what else that
> > > > > > > head operator might process), so restricting them to the same
> > > > > > > parallelism is a little bit weird, don't you think? It kind of
> > > > > > > goes against the whole operators-parallelism idea.
> > > > > > >
> > > > > > > I don't think it's a huge complexity to group head operators
> > > > > > > together by parallelism and add a source/sink per group like we
> > > > > > > do now. What do you say?
> > > > > > >
> > > > > > > Aljoscha Krettek <aljoscha@apache.org> wrote (on 31 Jul 2015,
> > > > > > > Fri, at 17:10):
> > > > > > >
> > > > > > > > Yes, I'm not saying that it makes sense to do it, I'm just
> > > > > > > > saying that it does translate and run. Your observation is
> > > > > > > > true. :D
> > > > > > > >
> > > > > > > > I'm wondering whether it makes sense to allow users to have
> > > > > > > > iteration heads with differing parallelism, in fact.
> > > > > > > >
> > > > > > > > On Fri, 31 Jul 2015 at 16:40 Gyula Fóra <gyula.fora@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > I still don't get how it could possibly work, so let me tell
> > > > > > > > > you how I see it, and correct me in my logic :)
> > > > > > > > >
> > > > > > > > > You have this program:
> > > > > > > > > ids.map1().setParallelism(2)
> > > > > > > > > ids.map2().setParallelism(4)
> > > > > > > > >
> > > > > > > > > //...
> > > > > > > > >
> > > > > > > > > ids.closeWith(feedback.groupBy(0))
> > > > > > > > >
> > > > > > > > > You are suggesting that we only have one iteration
> > > > > > > > > source/sink pair with parallelism of either 2 or 4. I will
> > > > > > > > > assume that the parallelism is 2 for the sake of the argument.
> > > > > > > > >
> > > > > > > > > The iteration source is connected to map1 and map2 with
> > > > > > > > > Forward partitioning and the sink is connected with
> > > > > > > > > groupBy(0). Each sink instance will receive all tuples of a
> > > > > > > > > given key, which also means that each iteration source
> > > > > > > > > instance (2) will too.
> > > > > > > > >
> > > > > > > > > Now here comes the problem: the source will forward the
> > > > > > > > > tuples to map1, and since we have a forward connection we
> > > > > > > > > maintain the groupBy semantics (this is perfect). The sources
> > > > > > > > > will also forward to map2, which has higher parallelism, so
> > > > > > > > > the tuple sending turns into round robin, which screws up the
> > > > > > > > > groupBy.
> > > > > > > > >
> > > > > > > > > What did I miss?
> > > > > > > > > Gyula
> > > > > > > > >
> > > > > > > > > Aljoscha Krettek <aljoscha@apache.org> wrote (on 31 Jul 2015,
> > > > > > > > > Fri, at 14:59):
> > > > > > > > >
> > > > > > > > > > Yes, this would still work. For example, I have this crazy
> > > > > > > > > > graph: http://postimg.org/image/xtv8ay8hv/full/ That
> > > > > > > > > > results from this program:
> > > > > > > > > > https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5
> > > > > > > > > >
> > > > > > > > > > It works, and the implementation is very simple, actually.
> > > > > > > > > >
> > > > > > > > > > On Fri, 31 Jul 2015 at 14:30 Gyula Fóra
> > > > > > > > > > <gyula.fora@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > I mean that the head operators have different
> > > > > > > > > > > parallelism:
> > > > > > > > > > >
> > > > > > > > > > > IterativeDataStream ids = ...
> > > > > > > > > > >
> > > > > > > > > > > ids.map().setParallelism(2)
> > > > > > > > > > > ids.map().setParallelism(4)
> > > > > > > > > > >
> > > > > > > > > > > //...
> > > > > > > > > > >
> > > > > > > > > > > ids.closeWith(feedback)
> > > > > > > > > > >
> > > > > > > > > > > Aljoscha Krettek <aljoscha@apache.org> wrote (on 31 Jul
> > > > > > > > > > > 2015, Fri, at 14:23):
> > > > > > > > > > >
> > > > > > > > > > > > I thought about having some tighter restrictions here.
> > > > > > > > > > > > My idea was to enforce that the feedback edges must have
> > > > > > > > > > > > the same parallelism as the original input stream,
> > > > > > > > > > > > otherwise shipping strategies such as "keyBy", "shuffle",
> > > > > > > > > > > > "rebalance" don't seem to make sense because they would
> > > > > > > > > > > > differ from the distribution of the original elements (at
> > > > > > > > > > > > least IMHO). Maybe I'm wrong there, though.
> > > > > > > > > > > >
> > > > > > > > > > > > To me it seems intuitive that I get the feedback at the
> > > > > > > > > > > > head the way I specify it at the tail. But maybe that's
> > > > > > > > > > > > also just me... :D
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, 31 Jul 2015 at 14:00 Gyula Fóra
> > > > > > > > > > > > <gyfora@apache.org> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hey,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I am not sure what the intuitive behaviour is here. As
> > > > > > > > > > > > > you are not applying a transformation on the feedback
> > > > > > > > > > > > > stream but passing it to a closeWith method, I thought
> > > > > > > > > > > > > it was somehow natural that it gets the partitioning of
> > > > > > > > > > > > > the iteration input, but maybe it's not intuitive.
> > > > > > > > > > > > >
> > > > > > > > > > > > > If others also think that preserving feedback
> > > > > > > > > > > > > partitioning should be the default, I am not against
> > > > > > > > > > > > > it :)
> > > > > > > > > > > > >
> > > > > > > > > > > > > Btw, this still won't make it very simple. We still
> > > > > > > > > > > > > need as many source/sink pairs as we have different
> > > > > > > > > > > > > parallelisms among the head operators. Otherwise the
> > > > > > > > > > > > > forwarding logic won't work.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > Gyula
> > > > > > > > > > > > >
> > > > > > > > > > > > > Aljoscha Krettek <aljoscha@apache.org> wrote (on 31
> > > > > > > > > > > > > Jul 2015, Fri, at 11:52):
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > I'm currently working on making the StreamGraph
> > > > > > > > > > > > > > generation more centralized (i.e. not spread across
> > > > > > > > > > > > > > the different API classes). The question now is why
> > > > > > > > > > > > > > we need to switch to "preserve partitioning" at all.
> > > > > > > > > > > > > > Could we not make "preserve" partitioning the
> > > > > > > > > > > > > > default, so that if users want shuffle partitioning
> > > > > > > > > > > > > > or anything else they have to specify it manually
> > > > > > > > > > > > > > when adding the feedback edge?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > This would make for a very simple scheme where the
> > > > > > > > > > > > > > iteration sources are always connected to the heads
> > > > > > > > > > > > > > using "forward" and the tails are connected to the
> > > > > > > > > > > > > > iteration sinks using whatever partitioner was set by
> > > > > > > > > > > > > > the user. This would be more transparent than the
> > > > > > > > > > > > > > current default of "shuffle" between tails and
> > > > > > > > > > > > > > iteration sinks.
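> > > > > > > > > > > > > >
> > > > > > > > > > > > > > In pseudocode (same style as the rest of this
> > > > > > > > > > > > > > thread, names not tied to a release), the difference
> > > > > > > > > > > > > > would be:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >   IterativeDataStream it = input.iterate();
> > > > > > > > > > > > > >   DataStream feedback = it.map(...);  // head/tail
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >   // proposed default: the feedback keeps whatever
> > > > > > > > > > > > > >   // partitioning it already has
> > > > > > > > > > > > > >   it.closeWith(feedback);
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >   // ...or, anything else is requested explicitly on
> > > > > > > > > > > > > >   // the feedback edge:
> > > > > > > > > > > > > >   it.closeWith(feedback.shuffle());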
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > Aljoscha
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > P.S. I know we had quite some discussion about
> > > > > > > > > > > > > > introducing "preserve partitioning", but now when I
> > > > > > > > > > > > > > think of it, it should be the default... :D
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
