flink-dev mailing list archives

From Felix Neutatz <neut...@googlemail.com>
Subject Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts
Date Fri, 02 Dec 2016 13:55:01 GMT
Hi everybody,

I implemented the second approach (see
https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts).
Each subpartition will be read by m tasks (m = number of task managers), and
the other tasks will notify the subpartition that they don't need to read it.
This solves the problem: we release the subpartition as soon as it is no
longer needed.
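
To make the release rule concrete, here is a minimal sketch in Java. It is not
the code from my branch: the class and method names (SharedBroadcastSubpartition,
onReaderFinished, onNotifyConsumed) are hypothetical, and it assumes the
subpartition knows the parallelism N of the consuming operator. Every consumer
task reports exactly once, either after reading or via the "I don't need to
read" notification, and the subpartition is released on the last report.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch, not Flink's actual classes: one broadcast subpartition
 * shared by all N consumer tasks. M of them read the data; the remaining
 * N - M only notify that they will not read it.
 */
final class SharedBroadcastSubpartition {

    /** Consumer tasks (readers or notifiers) that have not reported yet. */
    private final AtomicInteger pendingConsumers;

    private volatile boolean released;

    SharedBroadcastSubpartition(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        this.pendingConsumers = new AtomicInteger(parallelism);
    }

    /** Called once by a task that has finished reading the data. */
    void onReaderFinished() {
        countDown();
    }

    /** Called once by a task that does not need to read the data. */
    void onNotifyConsumed() {
        countDown();
    }

    private void countDown() {
        int remaining = pendingConsumers.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("more reports than consumer tasks");
        }
        if (remaining == 0) {
            // In a real implementation the buffers would be recycled here.
            released = true;
        }
    }

    boolean isReleased() {
        return released;
    }
}

final class SharedBroadcastSubpartitionDemo {
    public static void main(String[] args) {
        int n = 4; // parallelism of the consumer (assumed value)
        int m = 2; // number of task managers (assumed value)
        SharedBroadcastSubpartition subpartition = new SharedBroadcastSubpartition(n);
        for (int i = 0; i < n - m; i++) {
            subpartition.onNotifyConsumed();
        }
        for (int i = 0; i < m; i++) {
            subpartition.onReaderFinished();
        }
        System.out.println("released = " + subpartition.isReleased());
    }
}
```

The counter only needs N; it does not need to know M or which tasks are the
readers, which is why propagating N is sufficient in this approach.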

The message I send for all tasks that don't need to read is
"notifySubpartitionConsumed()":
https://github.com/FelixNeutatz/incubator-flink/blob/1b58d9c9df89620f2557b59e7fde40ffe04f49d8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java#L460

This means I first have to connect via a PartitionRequest, and then I
notify all channels.
One problem with using the standard PartitionRequest is that it already
fills the first buffer:
https://github.com/FelixNeutatz/incubator-flink/blob/oneSubpartition/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java#L123

So my question is: is that OK, or
1) should we introduce another NettyMessage,
PartitionRequestAndNotifyConsumed, or
2) should we extend the PartitionRequest with the attribute "boolean
getAhead"?
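
To illustrate option 2, here is a rough sketch in Java. All names in it
(SketchPartitionRequest, SketchRequestHandler, notifyOnly) are hypothetical and
are not Flink's real message or handler classes; the flag notifyOnly only stands
in for the proposed boolean attribute. The idea is that the server side checks
the flag and skips creating a read view, and therefore skips filling the first
buffer, for tasks that only want to report "consumed".

```java
/** Hypothetical request; notifyOnly stands in for the proposed boolean attribute. */
final class SketchPartitionRequest {
    final int subpartitionIndex;
    final boolean notifyOnly;

    SketchPartitionRequest(int subpartitionIndex, boolean notifyOnly) {
        this.subpartitionIndex = subpartitionIndex;
        this.notifyOnly = notifyOnly;
    }
}

/** Hypothetical server-side handler that only counts what it would do. */
final class SketchRequestHandler {
    private int readViewsCreated;
    private int consumedNotifications;

    void handle(SketchPartitionRequest request) {
        if (request.notifyOnly) {
            // The task will not read: record the notification, fill no buffer.
            consumedNotifications++;
        } else {
            // Regular path: a read view would be created and data buffered.
            readViewsCreated++;
        }
    }

    int readViewsCreated() {
        return readViewsCreated;
    }

    int consumedNotifications() {
        return consumedNotifications;
    }
}

final class SketchRequestHandlerDemo {
    public static void main(String[] args) {
        SketchRequestHandler handler = new SketchRequestHandler();
        handler.handle(new SketchPartitionRequest(0, false)); // reading task
        handler.handle(new SketchPartitionRequest(0, true));  // notify-only task
        System.out.println("read views: " + handler.readViewsCreated()
                + ", notifications: " + handler.consumedNotifications());
    }
}
```

Option 1 would look the same on the handler side, except that the branch would
be selected by the message type instead of by a flag.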

Current problems:
Native iterations:
Native iterations work but are not optimized. Theoretically, in the case of
native iterations we can also notify the subpartitions instead of reading
them, but at the moment I get the following exception when I do so:
java.lang.IllegalStateException: Queried for a buffer before requesting the subpartition.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:152)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:424)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:87)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization.materializeVariable(BroadcastVariableMaterialization.java:114)

In general, it seems to me that this reduces network traffic even for
pipelined partitions in the "old" architecture. I will investigate further
why it does not work for native iterations. For a simple map job with
pipelined partitions it already works, so something iteration-specific
must be keeping it from working.

For now, I propose to first push the improvements to Flink without the
iteration improvements. The overhead of keeping the two code paths is just
two lines of code, which is very little.

I am happy to hear your thoughts :)

Best regards,
Felix

2016-11-10 12:24 GMT+01:00 Felix Neutatz <neutatz@googlemail.com>:

> Hi everybody,
>
> the previous approach turned out to have an issue. Since we only write to
> one subpartition, we have N-1 empty subpartitions per Task (where N =
> degree of parallelism). In the current approach I didn't consume these
> empty subpartitions. When you don't consume a subpartition it won't be
> released. So we have a memory leak.
>
> One workaround would be to read the empty subpartitions. But this is a
> really ugly work-around.
>
> So I had a chat with Till and we decided to create only one subpartition
> instead of N subpartitions per task. I have already implemented this
> approach.
>
> Now the problem is that we need to know when to release this
> subpartition. We will create M subpartition views per subpartition (where M
> is the number of task managers and M <= N).
>
> There are many ways to solve this problem:
> 1. Tell the subpartition how many taskmanagers will consume it.
> (=> propagate M)
> 2. All tasks which don't need to read the subpartition send a message to
> the subpartition. So the subpartition will receive M release requests and
> N-M "I am done" requests. So when the subpartition knows the degree of
> parallelism N, we are fine. (=> propagate N)
>
> Any thoughts on how to tackle this problem?
>
> Best regards,
> Felix
>
> 2016-08-10 19:14 GMT+02:00 Till Rohrmann <trohrmann@apache.org>:
>
>> Cool first version Felix :-)
>>
>> On Wed, Aug 10, 2016 at 6:48 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>> > Cool, nice results!
>> >
>> > For the iteration unspecialization - we probably should design this
>> > hand in hand with the streaming fault tolerance, as they share the
>> > notion of "intermediate result versions".
>> >
>> >
>> > On Wed, Aug 10, 2016 at 6:09 PM, Felix Neutatz <neutatz@googlemail.com>
>> > wrote:
>> >
>> > > Hi everybody,
>> > >
>> > > I found a quick and dirty way to make the blocking subpartition
>> > > readable by multiple readers. In the JobGraph generation I make all
>> > > broadcast partitions blocking (see more details here:
>> > > https://github.com/FelixNeutatz/incubator-flink/commits/blockingMultipleReads).
>> > > I want to point out that this branch is only experimental!
>> > >
>> > > This works for the simple Map().withBroadcastSet() use case.
>> > >
>> > > To test this approach, I ran our peel bundle flink-broadcast
>> > > (https://github.com/TU-Berlin-DIMA/flink-broadcast) on the IBM Power
>> > > cluster. ibm-power has 8 nodes, and we scale the number of slots per
>> > > node from 1 to 16:
>> > >
>> > > broadcast.ibm-power-1 broadcast.01 6597.3333333333
>> > > broadcast.ibm-power-1 broadcast.02 5997
>> > > broadcast.ibm-power-1 broadcast.04 6576.6666666667
>> > > broadcast.ibm-power-1 broadcast.08 7024.3333333333
>> > > broadcast.ibm-power-1 broadcast.16 6933.3333333333
>> > >
>> > > The last column is the run time in milliseconds, averaged over 3 runs.
>> > > You can clearly see that the run time stays constant :)
>> > >
>> > > As discussed, this approach doesn't work yet for native iterations
>> > > (see FLINK-1713).
>> > >
>> > > So in the next weeks I will work on the native iterations as Stephan
>> > > proposed.
>> > >
>> > > Best regards,
>> > > Felix
>> > >
>> > >
>> > >
>> > > 2016-08-09 21:29 GMT+07:00 Stephan Ewen <sewen@apache.org>:
>> > >
>> > > > I agree with Till. Changing the basic data exchange mechanism would
>> > > > screw up many other ongoing efforts, like more incremental recovery.
>> > > >
>> > > > It seems that to make this properly applicable, we need to first
>> > > > un-specialize the iterations.
>> > > >
>> > > > (1) Allow for "versioned" intermediate results, i.e.,
>> > > > result-x-superstep1, result-x-superstep2, result-x-superstep3,
>> > > > result-x-superstep4, ...
>> > > > We need something similar for fine-grained recovery in streaming
>> > > > (result-x-checkpoint1, result-x-checkpoint2, result-x-checkpoint3,
>> > > > result-x-checkpoint4, ...) so it may be worth addressing that soon
>> > > > anyway.
>> > > >
>> > > > (2) Make iterations not dependent on the special local back channel.
>> > > > Then we can simply schedule iterations like all other things.
>> > > >
>> > > > (3) Do the actual FLIP-5 proposal
>> > > >
>> > > >
>> > > > That's quite an effort, but I fear all else will break the engine
>> > > > and other efforts.
>> > > >
>> > > > Best,
>> > > > Stephan
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > On Tue, Aug 9, 2016 at 4:05 PM, Till Rohrmann <trohrmann@apache.org>
>> > > > wrote:
>> > > >
>> > > > > Hi Felix,
>> > > > >
>> > > > > if we cannot work around the problem with blocking intermediate
>> > > > > results in iterations, then we have to make FLINK-1713 a blocker
>> > > > > for this new issue. But maybe you can also keep the current
>> > > > > broadcasting mechanism to be used within iterations only. Then we
>> > > > > can address the iteration problem later.
>> > > > >
>> > > > > Cheers,
>> > > > > Till
>> > > > >
>> > > > > On Tue, Aug 9, 2016 at 3:54 PM, Felix Neutatz <neutatz@googlemail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi Till,
>> > > > > >
>> > > > > > thanks for the fast answer. I also think this should be the way
>> > > > > > to go. So should I open a new jira "Make blocking
>> > > > > > SpillableSubpartition able to be read multiple times"? Moreover,
>> > > > > > should I mark this jira and FLINK-1713
>> > > > > > <https://issues.apache.org/jira/browse/FLINK-1713> as blocking
>> > > > > > for the broadcast jira? What do you think?
>> > > > > >
>> > > > > > Best regards,
>> > > > > > Felix
>> > > > > >
>> > > > > > 2016-08-09 17:41 GMT+07:00 Till Rohrmann <trohrmann@apache.org>:
>> > > > > >
>> > > > > > > Hi Felix,
>> > > > > > >
>> > > > > > > I'm not sure whether PipelinedSubpartition should be readable
>> > > > > > > more than once because then it would effectively mean that we
>> > > > > > > materialize the elements of the pipelined subpartition for
>> > > > > > > stragglers. Therefore, I think that we should make blocking
>> > > > > > > intermediate results readable more than once. This will also be
>> > > > > > > beneficial for interactive programs where we continue from the
>> > > > > > > results of previous Flink jobs.
>> > > > > > >
>> > > > > > > It might also be interesting to have a blocking mode which
>> > > > > > > schedules its consumers once the first result is there. Thus,
>> > > > > > > having a mixture of pipelined and blocking mode.
>> > > > > > >
>> > > > > > > Cheers,
>> > > > > > > Till
>> > > > > > >
>> > > > > > > On Tue, Aug 9, 2016 at 4:40 AM, Felix Neutatz <neutatz@googlemail.com>
>> > > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi Stephan,
>> > > > > > > >
>> > > > > > > > I did some research about blocking intermediate results. It
>> > > > > > > > turns out that neither PipelinedSubpartition (see line 178)
>> > > > > > > > nor blocking intermediate results (see SpillableSubpartition
>> > > > > > > > line 189) can be read multiple times. Moreover, blocking
>> > > > > > > > intermediate results are currently not supported in native
>> > > > > > > > iterations (see
>> > > > > > > > https://issues.apache.org/jira/browse/FLINK-1713).
>> > > > > > > > So there are three ways to solve this:
>> > > > > > > > 1) We extend pipelined subpartitions to make it possible to
>> > > > > > > > read them multiple times.
>> > > > > > > > 2) We extend blocking subpartitions to make it possible to
>> > > > > > > > read them multiple times, but then we also have to fix
>> > > > > > > > FLINK-1713 so that we can use broadcasts in native iterations.
>> > > > > > > > 3) We create one pipelined subpartition for every
>> > > > > > > > taskmanager. Problem: the more taskmanagers there are, the
>> > > > > > > > more redundant data we store, but the network traffic stays
>> > > > > > > > optimal.
>> > > > > > > >
>> > > > > > > > Thank you for your help,
>> > > > > > > > Felix
>> > > > > > > >
>> > > > > > > > 2016-08-01 22:51 GMT+07:00 Stephan Ewen <sewen@apache.org>:
>> > > > > > > >
>> > > > > > > > > Hi Felix!
>> > > > > > > > >
>> > > > > > > > > Hope this helps:
>> > > > > > > > >
>> > > > > > > > > Concerning (1.1) - The producer does not think in terms of
>> > > > > > > > > the number of target TaskManagers. That number can, after
>> > > > > > > > > all, change in the presence of a failure and recovery. The
>> > > > > > > > > producer should, for its own result, not care how many
>> > > > > > > > > consumers (Tasks) it will have, but produce it only once.
>> > > > > > > > >
>> > > > > > > > > Concerning (1.2) - Only "blocking" intermediate results can
>> > > > > > > > > be consumed multiple times. Data sent to broadcast variables
>> > > > > > > > > must thus always be a blocking intermediate result.
>> > > > > > > > >
>> > > > > > > > > Greetings,
>> > > > > > > > > Stephan
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz <neutatz@googlemail.com>
>> > > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Stephan,
>> > > > > > > > > >
>> > > > > > > > > > thanks for the great ideas. First I have some questions:
>> > > > > > > > > >
>> > > > > > > > > > 1.1) Does every task generate an intermediate result
>> > > > > > > > > > partition for every target task, or is that already
>> > > > > > > > > > implemented in a way so that there are only as many
>> > > > > > > > > > intermediate result partitions per task manager as target
>> > > > > > > > > > tasks? (Example: There are 2 task managers with 2 tasks
>> > > > > > > > > > each. Do we get 4 intermediate result partitions per task
>> > > > > > > > > > manager or do we get 8?)
>> > > > > > > > > > 1.2) How can I consume an intermediate result partition
>> > > > > > > > > > multiple times? When I tried that I got the following
>> > > > > > > > > > exception:
>> > > > > > > > > > Caused by: java.lang.IllegalStateException: Subpartition 0 of
>> > > > > > > > > > dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf
>> > > > > > > > > > is being or already has been consumed, but pipelined
>> > > > > > > > > > subpartitions can only be consumed once.
>> > > > > > > > > > at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:179)
>> > > > > > > > > > at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:36)
>> > > > > > > > > > at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:348)
>> > > > > > > > > > at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:81)
>> > > > > > > > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:98)
>> > > > > > > > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:41)
>> > > > > > > > > > at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> > > > > > > > > >
>> > > > > > > > > > My status update: Since Friday I have been implementing your
>> > > > > > > > > > idea described in (2). Locally this approach already works
>> > > > > > > > > > (for fewer than 170 iterations). I will investigate further
>> > > > > > > > > > to solve that issue.
>> > > > > > > > > >
>> > > > > > > > > > But I am still not sure how to implement (1). Maybe we
>> > > > > > > > > > introduce a construct similar to the BroadcastVariableManager
>> > > > > > > > > > to share the RecordWriter among all tasks of a taskmanager.
>> > > > > > > > > > I am interested in your thoughts :)
>> > > > > > > > > >
>> > > > > > > > > > Best regards,
>> > > > > > > > > > Felix
>> > > > > > > > > >
>> > > > > > > > > > 2016-07-22 17:25 GMT+02:00 Stephan Ewen <sewen@apache.org>:
>> > > > > > > > > >
>> > > > > > > > > > > Hi Felix!
>> > > > > > > > > > >
>> > > > > > > > > > > Interesting suggestion. Here are some thoughts on the design.
>> > > > > > > > > > >
>> > > > > > > > > > > The two core changes needed to send data once to the
>> > > > > > > > > > > TaskManagers are:
>> > > > > > > > > > >
>> > > > > > > > > > >   (1) Every sender needs to produce its stuff once (rather
>> > > > > > > > > > > than for every target task), there should not be
>> > > > > > > > > > > redundancy there.
>> > > > > > > > > > >   (2) Every TaskManager should request the data once, other
>> > > > > > > > > > > tasks in the same TaskManager pick it up from there.
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > The current receiver-initiated pull model is actually a
>> > > > > > > > > > > good abstraction for that, I think.
>> > > > > > > > > > >
>> > > > > > > > > > > Let's look at (1):
>> > > > > > > > > > >
>> > > > > > > > > > >   - Currently, the TaskManagers have a separate
>> > > > > > > > > > > intermediate result partition for each target slot. They
>> > > > > > > > > > > should rather have one intermediate result partition
>> > > > > > > > > > > (saves also repeated serialization) that is consumed
>> > > > > > > > > > > multiple times.
>> > > > > > > > > > >
>> > > > > > > > > > >   - Since the results that are to be broadcast are always
>> > > > > > > > > > > "blocking", they can be consumed (pulled) multiple times.
>> > > > > > > > > > >
>> > > > > > > > > > > Let's look at (2):
>> > > > > > > > > > >
>> > > > > > > > > > >   - The current BroadcastVariableManager has the
>> > > > > > > > > > > functionality to let the first accessor of the BC-variable
>> > > > > > > > > > > materialize the result.
>> > > > > > > > > > >
>> > > > > > > > > > >   - It could be changed such that only the first accessor
>> > > > > > > > > > > creates a RecordReader, so the others do not even request
>> > > > > > > > > > > the stream. That way, the TaskManager should pull only one
>> > > > > > > > > > > stream from each producing task, which means the data is
>> > > > > > > > > > > transferred once.
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > That would also work perfectly with the current failure /
>> > > > > > > > > > > recovery model.
>> > > > > > > > > > >
>> > > > > > > > > > > Stephan
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz <neutatz@googlemail.com>
>> > > > > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi everybody,
>> > > > > > > > > > > >
>> > > > > > > > > > > > I want to improve the performance of broadcasts in Flink.
>> > > > > > > > > > > > Therefore Till told me to start a FLIP on this topic to
>> > > > > > > > > > > > discuss how to go forward to solve the current issues for
>> > > > > > > > > > > > broadcasts.
>> > > > > > > > > > > >
>> > > > > > > > > > > > The problem in a nutshell: Instead of sending data to each
>> > > > > > > > > > > > taskmanager only once, at the moment the data is sent to
>> > > > > > > > > > > > each task. This means if there are 3 slots on each
>> > > > > > > > > > > > taskmanager we will send the data 3 times instead of once.
>> > > > > > > > > > > >
>> > > > > > > > > > > > There are multiple ways to tackle this problem and I
>> > > > > > > > > > > > started to do some research and investigate. You can
>> > > > > > > > > > > > follow my thought process here:
>> > > > > > > > > > > >
>> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
>> > > > > > > > > > > >
>> > > > > > > > > > > > This is my first FLIP. So please correct me if I did
>> > > > > > > > > > > > something wrong.
>> > > > > > > > > > > >
>> > > > > > > > > > > > I am interested in your thoughts about how to solve this
>> > > > > > > > > > > > issue. Do you think my approach is heading in the right
>> > > > > > > > > > > > direction, or should we follow a totally different one?
>> > > > > > > > > > > >
>> > > > > > > > > > > > I am happy about any comment :)
>> > > > > > > > > > > >
>> > > > > > > > > > > > Best regards,
>> > > > > > > > > > > > Felix
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
