flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: DataSet: partitionByHash without materializing/spilling the entire partition?
Date Wed, 06 Sep 2017 13:09:54 GMT
Hi Billy,

 a program that is defined as

Dataset -> Map > Filter -> Map -> Output

should not spill at all.
There is an unnecessary serialization/deserialization step between the last
map and the sink, but there shouldn't be any spilling to disk.

As I said in my response to Urs, spilling should only happen in a few cases:

- full sort with not sufficient memory
- hash-tables that need to spill (only in join operators)
- range partitioning to compute a histogram of the partitioning keys.
- temp nodes to avoid deadlocks. These can occur in plans that branch and
join later like the following:

              /--- Map ---\
Input --<                   JOIN --- Output
              \--- Map ---/

The first two should not be surprising, but the last one is usually
unexpected.

Can you share a bit more information about your optimization of rewriting

        Dataset -> Map -> [FilterT -> CoGroup > ;FilterF] > Map -> Output

to

        Dataset -> Map -> FilterT -> CoGroup > Map -> Output
        Dataset -> Map -> FilterF -> Map -> Output

I did not completely understand the structure of the first job. Is it
branching and merging again?
Maybe you can share the JSON plan (ExecutionEnvironment.getExecutionPlan())?

Thanks, Fabian

2017-09-06 14:41 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:

> btw. not sure if you know that you can visualize the JSON plan returned by
> ExecutionEnvironment.getExecutionPlan() on the website [1].
>
> Best, Fabian
>
> [1] http://flink.apache.org/visualizer/
>
>
> 2017-09-06 14:39 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>
>> Hi Urs,
>>
>> a hash-partition operator should not spill. In general, DataSet plans aim
>> to be as much pipelined as possible.
>> There are a few cases when spilling happens:
>>
>> - full sort with not sufficient memory
>> - hash-tables that need to spill (only in join operators)
>> - range partitioning to compute a histogram of the partitioning keys.
>> - temp nodes to avoid deadlocks. These can occur in plans that branch and
>> join later like the following:
>>
>>               /--- Map ---\
>> Input --<                   JOIN --- Output
>>               \--- Map ---/
>>
>>
>> A simple plan without branching with as the one you posted
>>    readCsvFile -> partitionBy(0) -> groupBy(0) ->  sortGroup(0) ->
>> first(n)
>> has no reason to spill except for the full sort that is required for the
>> final aggregation.
>>
>> Can you share the execution plan that you get of the plan
>> (ExecutionEnvironment.getExecutionPlan())?
>>
>> Btw, the sortGroup(0) call is superfluous because it would sort a group
>> where all 0-fields are the same on the 0-field.
>> I believe Flink's optimizer automatically removes that so it does not
>> impact the performance.
>> Sorting on another field would indeed make sense, because this would
>> determine order within a group and hence the records which are forwarded by
>> First(n).
>>
>> In order to force a combiner on a partitioned data set, you can do the
>> following:
>>
>> --------
>>
>> public static void main(String[] args) throws Exception {
>>
>>    ExecutionEnvironment env = ExecutionEnvironment.getExecut
>> ionEnvironment();
>>
>>    DataSet<Tuple2<Long, Long>> data = randData(env);
>>
>>    DataSet<Tuple2<Long, Long>> result = data.partitionByHash(0)
>>       .groupBy(0).combineGroup(new First3())
>>          .withForwardedFields("f0")
>>       .groupBy(0).reduceGroup(new First3());
>>
>>    result.print();
>> }
>>
>> public static class First3 implements
>>    GroupCombineFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>,
>>    GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
{
>>
>>    @Override
>>    public void combine(Iterable<Tuple2<Long, Long>> values,
>> Collector<Tuple2<Long, Long>> out) throws Exception {
>>       reduce(values, out);
>>    }
>>
>>    @Override
>>    public void reduce(Iterable<Tuple2<Long, Long>> values,
>> Collector<Tuple2<Long, Long>> out) throws Exception {
>>       int i = 0;
>>       for (Tuple2<Long, Long> v : values) {
>>          out.collect(v);
>>          i++;
>>          if (i == 3) {
>>             break;
>>          }
>>       }
>>    }
>> }
>>
>> --------
>>
>> The generated plan will
>> - hash partition the input data
>> - partially sort the data in memory on the first field (not going to disk)
>> - invoke the combiner for each in-memory sorted group
>> - locally forward the data (because of the forwarded field information
>> [1])
>> - fully sort the data
>> - invoke group reducer for each group
>>
>> In this plan, the only spilling should happen in the sort for the final
>> aggregation.
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>> dev/batch/index.html#semantic-annotations
>>
>>
>>
>>
>> 2017-09-05 22:21 GMT+02:00 Newport, Billy <Billy.Newport@gs.com>:
>>
>>> We have the same issue. We are finding that we cannot express the data
>>> flow in a natural way because of unnecessary spilling. Instead, we're
>>> making our own operators which combine multiple steps together and
>>> essentially hide it from flink OR sometimes we even have to read an input
>>> dataset once per flow to avoid spilling. The performance improvements are
>>> dramatic but it's kind of reducing  flink to a thread scheduler rather than
>>> a data flow engine because we basically cannot express the flow to flink.
>>> This worries us because if we let others write flink code using our infra,
>>> we'll be spending all our time collapsing their flows into much simpler but
>>> less intuititve flows to prevent flink from spilling.
>>>
>>> This also means higher level APIs such as the table API or Beam are off
>>> the table because they prevent us optimizing in this manner.
>>>
>>> We already have prior implementations of the logic we are implementing
>>> in flink and as a result, we know it's much less efficient than the prior
>>> implementations which is giving us pause for rolling it out more broadly,
>>> we're afraid of the flink tax in effect from a performance point of view as
>>> well as from a usability point of view given naïve flows are not performant
>>> without significant collapsing.
>>>
>>> For example, we see spilling here:
>>>
>>>         Dataset -> Map > Filter -> Map -> Output
>>>
>>> We're trying to combine the Map ->Output into the filter operation now
>>> to write the records which are not passed through to an output file during
>>> the Filter.
>>>
>>>
>>> Or in this case
>>>
>>>         Dataset -> Map -> [FilterT -> CoGroup > ;FilterF] > Map
-> Output
>>>
>>> Rewriting as
>>>
>>>         Dataset -> Map -> FilterT -> CoGroup > Map -> Output
>>>         Dataset -> Map -> FilterF -> Map -> Output
>>>
>>> That is two separate flows is multiples faster. That is, reading the
>>> file twice rather than once.
>>>
>>> This is all pretty unintuitive and makes using Flink pretty difficult
>>> for us never mind our users. Writing the flink dataflows in a naïve way is
>>> fast but getting it to run with acceptable efficiency results in obscure
>>> workarounds and collapsing and takes the bulk of the time for us which is a
>>> shame and the main reason, we don't want to push it out for general use yet.
>>>
>>> It seems like it badly needs a flow rewriter which is capable of
>>> rewriting a naïve flow to use operators or restructured flows
>>> automatically. We're doing it by hand right now but there has to be a
>>> better way.
>>>
>>> It's a shame really, it's so close.
>>>
>>> Billy
>>>
>>>
>>> -----Original Message-----
>>> From: Urs Schoenenberger [mailto:urs.schoenenberger@tngtech.com]
>>> Sent: Tuesday, September 05, 2017 6:30 AM
>>> To: user
>>> Subject: DataSet: partitionByHash without materializing/spilling the
>>> entire partition?
>>>
>>> Hi all,
>>>
>>> we have a DataSet pipeline which reads CSV input data and then
>>> essentially does a combinable GroupReduce via first(n).
>>>
>>> In our first iteration (readCsvFile -> groupBy(0) -> sortGroup(0) ->
>>> first(n)), we got a jobgraph like this:
>>>
>>> source --[Forward]--> combine --[Hash Partition on 0, Sort]--> reduce
>>>
>>> This works, but we found the combine phase to be inefficient because not
>>> enough combinable elements fit into a sorter. My idea was to
>>> pre-partition the DataSet to increase the chance of combinable elements
>>> (readCsvFile -> partitionBy(0) -> groupBy(0) ->  sortGroup(0) ->
>>> first(n)).
>>>
>>> To my surprise, I found that this changed the job graph to
>>>
>>> source --[Hash Partition on 0]--> partition(noop) --[Forward]--> combine
>>> --[Hash Partition on 0, Sort]--> reduce
>>>
>>> while materializing and spilling the entire partitions at the
>>> partition(noop)-Operator!
>>>
>>> Is there any way I can partition the data on the way from source to
>>> combine without spilling? That is, can I get a job graph that looks like
>>>
>>>
>>> source --[Hash Partition on 0]--> combine --[Hash Partition on 0,
>>> Sort]--> reduce
>>>
>>> instead?
>>>
>>> Thanks,
>>> Urs
>>>
>>> --
>>> Urs Schönenberger - urs.schoenenberger@tngtech.com
>>>
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>
>>
>>
>

Mime
View raw message