flink-dev mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Question About "Preserve Partitioning" in Stream Iteration
Date Mon, 03 Aug 2015 08:24:00 GMT
I don't think there is a fundamental limitation to the simpler approach.
The only real difference is that DOPs are adjusted before the tail, so only
one head/tail pair is needed.

Nested iterations should still be possible...
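
To make the simpler approach concrete, here is a rough sketch in the pseudocode
style of the examples further down in this thread (f and g are placeholder
functions, and the extra tail is purely conceptual, something the system would
insert rather than an API the user calls):

val iter = ds.iteration()                    // one parallel head with parallelism p

val mapped = iter.map(f).parallelism(4)      // anything in between may use its own parallelism
val userTail = mapped.map(g).parallelism(6)  // user-defined tail with parallelism != p

// closeWith would no longer make userTail the actual tail; conceptually an
// additional forwarding tail with parallelism p is attached, and the stream
// is redistributed between userTail and that extra operator.
iter.closeWith(userTail)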

On Mon, Aug 3, 2015 at 10:21 AM, Gyula Fóra <gyula.fora@gmail.com> wrote:

> It is critical for many applications (such as SAMOA or Storm compatibility)
> to build arbitrary cyclic flows. If your suggestion covers all cases (for
> instance nested iterations) then I am not against it.
>
> The current implementation is just one way to do it, but it allows
> arbitrary cycles. From the checkpointing perspective, I don't think this
> will make too much of a difference as that will probably have to be handled
> on the receiver side anyways if you think about the cyclic algorithm.
>
> Gyula
>
> On Mon, 3 Aug 2015 at 9:41 Aljoscha Krettek <aljoscha@apache.org> wrote:
>
> > Yes, that's what I was proposing in my second mail:
> >
> > I thought about having some tighter restrictions here. My idea was to
> > enforce that the feedback edges must have the same parallelism as the
> > original input stream, otherwise shipping strategies such as "keyBy",
> > "shuffle", "rebalance" don't seem to make sense because they would differ
> > from the distribution of the original elements (at least IMHO). Maybe I'm
> > wrong there, though.
> >
> > To me it seems intuitive that I get the feedback at the head the way I
> > specify it at the tail. But maybe that's also just me... :D
> >
> > On Mon, 3 Aug 2015 at 00:14 Stephan Ewen <sewen@apache.org> wrote:
> >
> > > This model strikes me as pretty complicated. Imagine the extra logic and
> > > code path necessary for proper checkpointing as well.
> > >
> > > Why not do a simple approach:
> > >   - There is one parallel head, one parallel tail, both with the same
> > > parallelism
> > >
> > >   - Any computation in between may have its own parallelism, no special
> > > cases
> > >
> > >   - If the tail does not have the same parallelism as the head, it will
> > > not be the tail; instead, Flink will attach an additional tail operator.
> > > Between the original tail and the additional tail, the streams are
> > > redistributed to achieve the required parallelism.
> > >
> > > Wouldn't that give us the same and make things much easier? The batch
> > > iterations work that way, by the way.
> > >
> > >
> > >
> > > On Sun, Aug 2, 2015 at 10:03 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> > >
> > > > To answer the question plain and simple: No, there are several different
> > > > parallel heads and tails.
> > > >
> > > > For example in this:
> > > > val iter = ds.iteration()
> > > >
> > > > val head_tail1 = iter.map().parallelism(2)
> > > > val head_tail2 = iter.map().parallelism(4)
> > > >
> > > > iter.closeWith(head_tail1.union(head_tail2))
> > > >
> > > > We have one head/tail pair with parallelism 2 and one with parallelism 4.
> > > >
> > > > Off the top of my head, I don't know what happens in this case though:
> > > >
> > > > val iter = ds.iteration()
> > > >
> > > > val head1 = iter.map().parallelism(2)
> > > > val head2 = iter.map().parallelism(4)
> > > >
> > > > val tail1 = head1.map().parallelism(6)
> > > > val tail2 = head2.map().parallelism(8)
> > > >
> > > > iter.closeWith(tail1.union(tail2))
> > > >
> > > > (Which is also tricky with the parallelism of the input stream)
> > > >
> > > >
> > > > On Sun, 2 Aug 2015 at 21:22 Gyula Fóra <gyula.fora@gmail.com> wrote:
> > > >
> > > > > In a streaming program when we create an IterativeDataStream, we
> > > > > practically mark the union point of some later feedback stream (the one
> > > > > passed in to closeWith(..)).
> > > > >
> > > > > The operators applied on this IterativeDataStream will receive the
> > > > > feedback input as well. We call the operators applied on the iterative
> > > > > DataStream head operators. We call the operators that produce the streams
> > > > > passed into closeWith tail operators. With this terminology we can have
> > > > > many heads and tails with varying parallelism.
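> > > > >
> > > > > For example, in the pseudocode style of the examples above (f and g are
> > > > > just placeholder functions), the head and tail can be distinct operators:
> > > > >
> > > > > val iter = ds.iteration()
> > > > >
> > > > > val head = iter.map(f).parallelism(2)  // head: applied on the iterative stream
> > > > > val tail = head.map(g).parallelism(4)  // tail: produces the stream given to closeWith
> > > > >
> > > > > iter.closeWith(tail)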
> > > > >
> > > > > On Sun, 2 Aug 2015 at 20:16 Stephan Ewen <sewen@apache.org> wrote:
> > > > >
> > > > > > I don't get the discussion here, can you help me with what you mean
> > > > > > by "different iteration heads and tails"?
> > > > > >
> > > > > > An iteration does not have one parallel head and one parallel tail?
> > > > > >
> > > > > > On Fri, Jul 31, 2015 at 6:52 PM, Gyula Fóra <gyula.fora@gmail.com> wrote:
> > > > > >
> > > > > > > Maybe you can reuse some of the logic that is currently there on
> > > > > > > the StreamGraph, with building StreamLoops first which will be used
> > > > > > > to generate the sources and sinks right before building the JobGraph.
> > > > > > > This avoids the need of knowing everything beforehand.
> > > > > > >
> > > > > > > I actually added this to avoid the complexities that you are probably
> > > > > > > facing now.
> > > > > > >
> > > > > > > On Fri, 31 Jul 2015 at 17:28 Aljoscha Krettek <aljoscha@apache.org> wrote:
> > > > > > >
> > > > > > > > Sure it can be done, it's just more complex if you try to do it in
> > > > > > > > a sane way without having the code that builds the StreamGraph all
> > > > > > > > over the place. :D
> > > > > > > >
> > > > > > > > I'll try to come up with something. This is my current work in
> > > > > > > > progress, by the way:
> > > > > > > > https://github.com/aljoscha/flink/tree/stream-api-rework
> > > > > > > >
> > > > > > > > I managed to ban the StreamGraph from StreamExecutionEnvironment and
> > > > > > > > the API classes such as DataStream. The API methods construct a Graph
> > > > > > > > of Transformation Nodes and don't contain any information themselves.
> > > > > > > > Then there is a StreamGraphGenerator that builds a StreamGraph from
> > > > > > > > the transformations. The abstraction is very nice and simple; the
> > > > > > > > only problem that remains is the differing-parallelism iterations,
> > > > > > > > but I'll figure them out.
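> > > > > > > >
> > > > > > > > Schematically, the separation looks something like the sketch below
> > > > > > > > (names and signatures are illustrative only, not the actual classes
> > > > > > > > in the branch):
> > > > > > > >
> > > > > > > > // Illustrative sketch: the API objects only record transformation
> > > > > > > > // nodes; a separate generator later turns the node graph into a
> > > > > > > > // StreamGraph.
> > > > > > > > abstract class StreamTransformation(
> > > > > > > >     val name: String,
> > > > > > > >     val parallelism: Int,
> > > > > > > >     val inputs: Seq[StreamTransformation])
> > > > > > > >
> > > > > > > > class SourceTransformation(name: String, parallelism: Int)
> > > > > > > >   extends StreamTransformation(name, parallelism, Seq.empty)
> > > > > > > >
> > > > > > > > class OneInputTransformation(name: String, parallelism: Int,
> > > > > > > >                              input: StreamTransformation)
> > > > > > > >   extends StreamTransformation(name, parallelism, Seq(input))
> > > > > > > >
> > > > > > > > class StreamGraph {
> > > > > > > >   def addNode(t: StreamTransformation): Unit = ()  // stand-in
> > > > > > > >   def addEdge(from: StreamTransformation,
> > > > > > > >               to: StreamTransformation): Unit = () // stand-in
> > > > > > > > }
> > > > > > > >
> > > > > > > > class StreamGraphGenerator {
> > > > > > > >   // Walk the transformations reachable from the sinks and emit a StreamGraph.
> > > > > > > >   def generate(sinks: Seq[StreamTransformation]): StreamGraph = {
> > > > > > > >     val graph   = new StreamGraph
> > > > > > > >     val visited = scala.collection.mutable.Set.empty[StreamTransformation]
> > > > > > > >     def visit(t: StreamTransformation): Unit =
> > > > > > > >       if (visited.add(t)) {
> > > > > > > >         graph.addNode(t)
> > > > > > > >         t.inputs.foreach { in => visit(in); graph.addEdge(in, t) }
> > > > > > > >       }
> > > > > > > >     sinks.foreach(visit)
> > > > > > > >     graph
> > > > > > > >   }
> > > > > > > > }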
> > > > > > > >
> > > > > > > > P.S. The code is not well documented yet, but the base class for
> > > > > > > > transformations is StreamTransformation. From there anyone who wants
> > > > > > > > to check it out can find the other transformations.
> > > > > > > >
> > > > > > > > On Fri, 31 Jul 2015 at 17:17 Gyula Fóra <gyula.fora@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > There might be reasons why a user would want different parallelism
> > > > > > > > > at the head operators (depending on what else that head operator
> > > > > > > > > might process), so restricting them to the same parallelism is a
> > > > > > > > > little bit weird, don't you think? It kind of goes against the whole
> > > > > > > > > operators-parallelism idea.
> > > > > > > > >
> > > > > > > > > I don't think it's a huge complexity to group head operators together
> > > > > > > > > by parallelism and add a source/sink pair for each group like we do
> > > > > > > > > now. What do you say?
> > > > > > > > >
> > > > > > > > > On Fri, 31 Jul 2015 at 17:10 Aljoscha Krettek <aljoscha@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > > Yes, I'm not saying that it makes sense to do it, I'm just saying
> > > > > > > > > > that it does translate and run. Your observation is true. :D
> > > > > > > > > >
> > > > > > > > > > I'm wondering whether it makes sense to allow users to have
> > > > > > > > > > iteration heads with differing parallelism, in fact.
> > > > > > > > > >
> > > > > > > > > > On Fri, 31 Jul 2015 at 16:40 Gyula Fóra <gyula.fora@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > I still don't get how it could possibly work, let me tell you
> > > > > > > > > > > how I see it, and correct me in my logic :)
> > > > > > > > > > >
> > > > > > > > > > > You have this program:
> > > > > > > > > > > ids.map1().setParallelism(2)
> > > > > > > > > > > ids.map2().setParallelism(4)
> > > > > > > > > > >
> > > > > > > > > > > //...
> > > > > > > > > > >
> > > > > > > > > > > ids.closeWith(feedback.groupBy(0))
> > > > > > > > > > >
> > > > > > > > > > > You are suggesting that we only have one iteration source/sink
> > > > > > > > > > > pair with parallelism of either 2 or 4. I will assume that the
> > > > > > > > > > > parallelism is 2 for the sake of the argument.
> > > > > > > > > > >
> > > > > > > > > > > The iteration source is connected to map1 and map2 with forward
> > > > > > > > > > > partitioning and the sink is connected with groupBy(0). Each sink
> > > > > > > > > > > instance will receive all tuples of a given key, which also means
> > > > > > > > > > > that each iteration source instance (2) will too.
> > > > > > > > > > >
> > > > > > > > > > > Now here comes the problem: the source will forward the tuples to
> > > > > > > > > > > map1, and since we have a forward connection we maintain the
> > > > > > > > > > > groupBy semantics (this is perfect). The sources will also forward
> > > > > > > > > > > to map2, which has higher parallelism, so the tuple sending turns
> > > > > > > > > > > into round robin, which screws up the groupBy.
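> > > > > > > > > > >
> > > > > > > > > > > A tiny plain-Scala simulation of that argument (no Flink here; the
> > > > > > > > > > > hashing, records and round-robin assignment are all made up just to
> > > > > > > > > > > illustrate the point) shows a single key landing on several map2
> > > > > > > > > > > subtasks once the forward connection degrades to round robin:
> > > > > > > > > > >
> > > > > > > > > > > object KeySplitDemo extends App {
> > > > > > > > > > >   case class Record(key: String, value: Int)
> > > > > > > > > > >
> > > > > > > > > > >   val records =
> > > > > > > > > > >     Seq("a", "b", "c", "d").flatMap(k => (1 to 3).map(v => Record(k, v)))
> > > > > > > > > > >
> > > > > > > > > > >   val numSources = 2  // parallelism of the single iteration source/sink pair
> > > > > > > > > > >   val numMap2    = 4  // parallelism of map2
> > > > > > > > > > >
> > > > > > > > > > >   // closeWith(feedback.groupBy(0)): every record of a key reaches exactly
> > > > > > > > > > >   // one sink instance, and therefore exactly one iteration source instance.
> > > > > > > > > > >   val bySource = records.groupBy(r => math.abs(r.key.hashCode) % numSources)
> > > > > > > > > > >
> > > > > > > > > > >   // Source -> map2 has mismatched parallelism, so "forward" effectively
> > > > > > > > > > >   // becomes round robin: each source sprays its records over the map2
> > > > > > > > > > >   // subtasks in turn.
> > > > > > > > > > >   val onMap2 = bySource.toSeq.flatMap { case (_, recs) =>
> > > > > > > > > > >     recs.zipWithIndex.map { case (r, i) => (r, i % numMap2) }
> > > > > > > > > > >   }
> > > > > > > > > > >
> > > > > > > > > > >   // A key now reaches more than one map2 subtask, so the groupBy
> > > > > > > > > > >   // guarantee is gone.
> > > > > > > > > > >   onMap2.groupBy(_._1.key).foreach { case (key, hits) =>
> > > > > > > > > > >     val subtasks = hits.map(_._2).distinct.sorted
> > > > > > > > > > >     println(s"key $key ends up on map2 subtasks ${subtasks.mkString(", ")}")
> > > > > > > > > > >   }
> > > > > > > > > > > }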
> > > > > > > > > > >
> > > > > > > > > > > What did I miss?
> > > > > > > > > > > Gyula
> > > > > > > > > > >
> > > > > > > > > > > On Fri, 31 Jul 2015 at 14:59 Aljoscha Krettek <aljoscha@apache.org> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Yes, this would still work. For example, I have this crazy
> > > > > > > > > > > > graph: http://postimg.org/image/xtv8ay8hv/full/ That results
> > > > > > > > > > > > from this program:
> > > > > > > > > > > > https://gist.github.com/aljoscha/45aaf62b2a7957cfafd5
> > > > > > > > > > > >
> > > > > > > > > > > > It works, and the implementation is very simple, actually.
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, 31 Jul 2015 at 14:30 Gyula Fóra <gyula.fora@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > I mean that the head operators have different parallelism:
> > > > > > > > > > > > >
> > > > > > > > > > > > > IterativeDataStream ids = ...
> > > > > > > > > > > > >
> > > > > > > > > > > > > ids.map().setParallelism(2)
> > > > > > > > > > > > > ids.map().setParallelism(4)
> > > > > > > > > > > > >
> > > > > > > > > > > > > //...
> > > > > > > > > > > > >
> > > > > > > > > > > > > ids.closeWith(feedback)
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, 31 Jul 2015 at 14:23 Aljoscha Krettek <aljoscha@apache.org> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > I thought about having some tighter restrictions here. My
> > > > > > > > > > > > > > idea was to enforce that the feedback edges must have the
> > > > > > > > > > > > > > same parallelism as the original input stream, otherwise
> > > > > > > > > > > > > > shipping strategies such as "keyBy", "shuffle", "rebalance"
> > > > > > > > > > > > > > don't seem to make sense because they would differ from the
> > > > > > > > > > > > > > distribution of the original elements (at least IMHO). Maybe
> > > > > > > > > > > > > > I'm wrong there, though.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > To me it seems intuitive that I get the feedback at the head
> > > > > > > > > > > > > > the way I specify it at the tail. But maybe that's also just
> > > > > > > > > > > > > > me... :D
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, 31 Jul 2015 at 14:00 Gyula Fóra <gyfora@apache.org> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hey,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I am not sure what the intuitive behaviour is here. As you
> > > > > > > > > > > > > > > are not applying a transformation on the feedback stream but
> > > > > > > > > > > > > > > pass it to a closeWith method, I thought it was somehow
> > > > > > > > > > > > > > > natural that it gets the partitioning of the iteration
> > > > > > > > > > > > > > > input, but maybe it's not intuitive.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > If others also think that preserving feedback partitioning
> > > > > > > > > > > > > > > should be the default, I am not against it :)
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Btw, this still won't make it very simple. We still need as
> > > > > > > > > > > > > > > many source/sink pairs as we have different parallelisms among
> > > > > > > > > > > > > > > the head operators. Otherwise the forwarding logic won't work.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > Gyula
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, 31 Jul 2015 at 11:52 Aljoscha Krettek <aljoscha@apache.org> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > > > I'm currently working on making the StreamGraph generation
> > > > > > > > > > > > > > > > more centralized (i.e. not spread across the different API
> > > > > > > > > > > > > > > > classes). The question now is: why do we need to switch to
> > > > > > > > > > > > > > > > preserve partitioning? Could we not make "preserve"
> > > > > > > > > > > > > > > > partitioning the default, and if users want to have shuffle
> > > > > > > > > > > > > > > > partitioning or anything else they have to specify it
> > > > > > > > > > > > > > > > manually when adding the feedback edge?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > This would make for a very simple scheme where the iteration
> > > > > > > > > > > > > > > > sources are always connected to the heads using "forward" and
> > > > > > > > > > > > > > > > the tails are connected to the iteration sinks using whatever
> > > > > > > > > > > > > > > > partitioner was set by the user. This would be more transparent
> > > > > > > > > > > > > > > > than the current default of the "shuffle" between tails and
> > > > > > > > > > > > > > > > iteration sinks.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > > Aljoscha
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > P.S. I know we had quite some discussion about introducing
> > > > > > > > > > > > > > > > "preserve partitioning", but now, when I think of it, it
> > > > > > > > > > > > > > > > should be the default... :D
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
