flink-dev mailing list archives

From Felix Neutatz <neut...@googlemail.com>
Subject Re: [DISCUSS] FLIP-5 Only send data to each taskmanager once for broadcasts
Date Tue, 09 Aug 2016 02:40:06 GMT
Hi Stephan,

I did some research on blocking intermediate results. It turns out that
neither pipelined intermediate results (see PipelinedSubpartition, line 178)
nor blocking intermediate results (see SpillableSubpartition, line 189) can
currently be read multiple times. Moreover, blocking intermediate results are
currently not supported in native iterations (see
https://issues.apache.org/jira/browse/FLINK-1713).
So there are three ways to solve this:
1) We extend pipelined subpartitions so that they can be read multiple
times.
2) We extend blocking subpartitions so that they can be read multiple
times. In that case we also have to fix FLINK-1713, so that we can use
broadcasts in native iterations.
3) We create one pipelined subpartition for every taskmanager. Problem: The
more taskmanagers there are, the more redundant data we store, but the
network traffic stays optimal.
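
To make the trade-off between the three options concrete, here is a rough
sketch. It is a toy model in Python, not Flink code: the class and function
names below are made up for illustration and do not exist in Flink. It only
assumes what is stated above, namely that a subpartition currently rejects a
second reader, that options 1 and 2 would allow repeated reads of one stored
copy, and that option 3 keeps one copy per taskmanager.

```python
# Toy model only: none of these names exist in Flink.

class OneShotSubpartition:
    """Mimics the current behaviour: a second read view is rejected."""

    def __init__(self, records):
        self._records = list(records)
        self._consumed = False

    def create_read_view(self):
        if self._consumed:
            raise RuntimeError("subpartition can only be consumed once")
        self._consumed = True
        return iter(self._records)


class ReplayableSubpartition:
    """Options 1 and 2: every call returns a fresh view over the same data."""

    def __init__(self, records):
        self._records = list(records)

    def create_read_view(self):
        return iter(self._records)


def stored_copies(option, num_taskmanagers):
    """Copies of the broadcast data that one producer task has to keep."""
    if option in (1, 2):
        return 1                 # one copy, read once per taskmanager
    if option == 3:
        return num_taskmanagers  # one subpartition per taskmanager
    raise ValueError("unknown option: %r" % (option,))


def transfers_per_producer(num_taskmanagers, slots_per_taskmanager,
                           once_per_taskmanager):
    """How often one producer task ships its data over the network."""
    if once_per_taskmanager:
        return num_taskmanagers
    # Current behaviour: one transfer per consuming task (slot).
    return num_taskmanagers * slots_per_taskmanager
```

With 4 taskmanagers and 3 slots each, this model gives 12 transfers per
producer today and 4 with any of the three options; options 1 and 2 keep a
single copy of the data, while option 3 keeps 4.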

Thank you for your help,
Felix

2016-08-01 22:51 GMT+07:00 Stephan Ewen <sewen@apache.org>:

> Hi Felix!
>
> Hope this helps.
>
> Concerning (1.1) - The producer does not think in terms of the number of
> target TaskManagers. That number can, after all, change in the presence of
> a failure and recovery. The producer should, for its own result, not care
> how many consumers (Tasks) it will have, but produce it only once.
>
> Concerning (1.2) - Only "blocking" intermediate results can be consumed
> multiple times. Data sent to broadcast variables must thus always be a
> blocking intermediate result.
>
> Greetings,
> Stephan
>
>
> On Wed, Jul 27, 2016 at 11:33 AM, Felix Neutatz <neutatz@googlemail.com>
> wrote:
>
> > Hi Stephan,
> >
> > thanks for the great ideas. First I have some questions:
> >
> > 1.1) Does every task generate an intermediate result partition for every
> > target task, or is that already implemented in a way so that there are
> > only as many intermediate result partitions per task manager as target
> > tasks?
> > (Example: There are 2 task managers with 2 tasks each. Do we get 4
> > intermediate result partitions per task manager or do we get 8?)
> > 1.2) How can I consume an intermediate result partition multiple times?
> > When I tried that I got the following exception:
> > Caused by: java.lang.IllegalStateException: Subpartition 0 of
> > dbe284e3b37c1df1b993a3f0a6020ea6@ce9fc38f08a5cc9e93431a9cbf740dcf is being
> > or already has been consumed, but pipelined subpartitions can only be
> > consumed once.
> > at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:179)
> > at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.createReadView(PipelinedSubpartition.java:36)
> > at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:348)
> > at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:81)
> > at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:98)
> > at org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler.channelRead0(PartitionRequestServerHandler.java:41)
> > at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> >
> > My status update: Since Friday I have been implementing your idea
> > described in (2). Locally this approach already works (for fewer than 170
> > iterations). I will investigate further to solve that issue.
> >
> > But I am still not sure how to implement (1). Maybe we introduce a
> > construct similar to the BroadcastVariableManager to share the
> > RecordWriter among all tasks of a taskmanager. I am interested in your
> > thoughts :)
> >
> > Best regards,
> > Felix
> >
> > 2016-07-22 17:25 GMT+02:00 Stephan Ewen <sewen@apache.org>:
> >
> > > Hi Felix!
> > >
> > > Interesting suggestion. Here are some thoughts on the design.
> > >
> > > The two core changes needed to send data once to the TaskManagers are:
> > >
> > >   (1) Every sender needs to produce its stuff once (rather than for
> > > every target task), there should not be redundancy there.
> > >   (2) Every TaskManager should request the data once, other tasks in
> > > the same TaskManager pick it up from there.
> > >
> > >
> > > The current receiver-initiated pull model is actually a good
> > > abstraction for that, I think.
> > >
> > > Let's look at (1):
> > >
> > >   - Currently, the TaskManagers have a separate intermediate result
> > > partition for each target slot. They should rather have one intermediate
> > > result partition (which also saves repeated serialization) that is
> > > consumed multiple times.
> > >
> > >   - Since the results that are to be broadcast are always "blocking",
> > > they can be consumed (pulled) multiple times.
> > >
> > > Let's look at (2):
> > >
> > >   - The current BroadcastVariableManager has the functionality to let
> > > the first accessor of the BC-variable materialize the result.
> > >
> > >   - It could be changed such that only the first accessor creates a
> > > RecordReader, so the others do not even request the stream. That way,
> > > the TaskManager should pull only one stream from each producing task,
> > > which means the data is transferred once.
> > >
> > >
> > > That would also work perfectly with the current failure / recovery
> > > model.
> > >
> > > What do you think?
> > >
> > > Stephan
> > >
> > >
> > > On Fri, Jul 22, 2016 at 2:59 PM, Felix Neutatz <neutatz@googlemail.com>
> > > wrote:
> > >
> > > > Hi everybody,
> > > >
> > > > I want to improve the performance of broadcasts in Flink. Therefore
> > > > Till told me to start a FLIP on this topic to discuss how to go
> > > > forward to solve the current issues for broadcasts.
> > > >
> > > > The problem in a nutshell: Instead of sending data to each
> > > > taskmanager only once, at the moment the data is sent to each task.
> > > > This means that if there are 3 slots on each taskmanager, we will send
> > > > the data 3 times instead of once.
> > > >
> > > > There are multiple ways to tackle this problem and I started to do
> > > > some research and investigate. You can follow my thought process here:
> > > >
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-5%3A+Only+send+data+to+each+taskmanager+once+for+broadcasts
> > > >
> > > > This is my first FLIP. So please correct me, if I did something
> > > > wrong.
> > > >
> > > > I am interested in your thoughts about how to solve this issue. Do
> > > > you think my approach is heading in the right direction, or should we
> > > > follow a totally different one?
> > > >
> > > > I am happy about any comment :)
> > > >
> > > > Best regards,
> > > > Felix
> > > >
> > >
> >
>
