flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Order groups by their keys
Date Wed, 15 Jul 2015 13:30:54 GMT
Hi Robert,

there are two issues involved here.

1) Flink does not support totally ordered parallel output out of the box.
Fully sorting data in parallel requires range partitioning, which in turn
requires some knowledge of the data (the distribution of the key values) to
produce balanced partitions. Flink does not feature statistics collection
to determine the key distribution, which is why it does not offer a range
partition operation yet. However, Flink supports sorting local partitions
and custom partitioners. If you know the value distribution of the key, you
can implement a custom partitioner and locally sort the partitions to
obtain a fully sorted result.
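
To illustrate the idea, here is a minimal plain-Java sketch of range
partitioning followed by a local sort. It does not call Flink at all; in the
DataSet API the corresponding operations would be partitionCustom(...) and
sortPartition(...). The class RangeSortSketch, the Row type and the split
point "b" are hypothetical and stand in for a key distribution you already
know.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class RangeSortSketch {

    /** Hypothetical stand-in for Tuple3<String, Integer, Integer>. */
    static final class Row {
        final String key;
        final int sub;
        final int sum;

        Row(String key, int sub, int sum) {
            this.key = key;
            this.sub = sub;
            this.sum = sum;
        }

        @Override
        public String toString() {
            return "(" + key + "," + sub + "," + sum + ")";
        }
    }

    /** Range partitioner: the partition index is the number of split points <= key. */
    static int rangePartition(String key, String[] splits) {
        int p = 0;
        while (p < splits.length && key.compareTo(splits[p]) >= 0) {
            p++;
        }
        return p;
    }

    /** Assigns every row to its range partition, then sorts each partition locally. */
    static List<List<Row>> partitionAndSort(List<Row> rows, String[] splits) {
        List<List<Row>> parts = new ArrayList<>();
        for (int i = 0; i <= splits.length; i++) {
            parts.add(new ArrayList<>());
        }
        for (Row r : rows) {
            parts.get(rangePartition(r.key, splits)).add(r);
        }
        Comparator<Row> byKeys =
            Comparator.comparing((Row r) -> r.key).thenComparingInt(r -> r.sub);
        for (List<Row> part : parts) {
            part.sort(byKeys);
        }
        return parts;
    }

    /** Reading the partitions in index order yields a totally ordered result. */
    static List<Row> concatInPartitionOrder(List<List<Row>> parts) {
        List<Row> out = new ArrayList<>();
        for (List<Row> part : parts) {
            out.addAll(part);
        }
        return out;
    }

    public static void main(String[] args) {
        // The aggregated rows from the toy example, in the unsorted order reported.
        List<Row> rows = Arrays.asList(
            new Row("a", 2, 8), new Row("b", 1, 6),
            new Row("b", 2, 14), new Row("a", 1, 8));
        // Assumed split point: keys below "b" go to partition 0, the rest to partition 1.
        String[] splits = {"b"};
        System.out.println(concatInPartitionOrder(partitionAndSort(rows, splits)));
        // prints [(a,1,8), (a,2,8), (b,1,6), (b,2,14)]
    }
}
```

The result is only totally ordered if the partitions are read back in
partition-index order, and the partitions are only balanced if the split
points match the real key distribution.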

2) print() collects all partitions in arbitrary order, so any order across
partitions is destroyed (the order within partitions should not be
affected).
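
The effect can be reproduced without Flink. The following plain-Java sketch
gathers two locally sorted partitions in two different arrival orders; the
class CollectOrderSketch and its methods are hypothetical, and the rows are
plain strings that happen to sort lexicographically for this toy data. For a
small result, sorting once more after collecting, as in the last step, is
one possible workaround.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class CollectOrderSketch {

    /** Concatenates partitions in the given arrival order, like a client gathering results. */
    static List<String> collect(List<List<String>> partitions, int[] arrivalOrder) {
        List<String> out = new ArrayList<>();
        for (int i : arrivalOrder) {
            out.addAll(partitions.get(i));
        }
        return out;
    }

    /** Checks whether the rows are in ascending lexicographic order. */
    static boolean isSorted(List<String> rows) {
        for (int i = 1; i < rows.size(); i++) {
            if (rows.get(i - 1).compareTo(rows.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Two range partitions, each already sorted locally.
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("(a,1,8)", "(a,2,8)"),
            Arrays.asList("(b,1,6)", "(b,2,14)"));

        List<String> inOrder = collect(partitions, new int[] {0, 1});
        List<String> swapped = collect(partitions, new int[] {1, 0});
        System.out.println(inOrder + " sorted=" + isSorted(inOrder));
        System.out.println(swapped + " sorted=" + isSorted(swapped));

        // Workaround for small results: sort again after collecting.
        List<String> fixed = new ArrayList<>(swapped);
        Collections.sort(fixed);
        System.out.println(fixed + " sorted=" + isSorted(fixed));
    }
}
```

In the swapped case each partition's rows still appear in their local order;
only the order between partitions is lost.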

Best, Fabian

2015-07-15 14:56 GMT+02:00 Robert Schmidtke <ro.schmidtke@gmail.com>:

> Hey everyone,
>
> I'm currently trying to implement TPC-H Q1 and that involves ordering of
> results. Now I'm not too familiar with the transformations yet, however for
> the life of me I cannot figure out how to get it to work. Consider the
> following toy example:
>
> final ExecutionEnvironment env =
>     ExecutionEnvironment.getExecutionEnvironment();
> DataSet<Tuple3<String, Integer, Integer>> elems = env.fromElements(
>     new Tuple3<String, Integer, Integer>("a", 2, 1),
>     new Tuple3<String, Integer, Integer>("b", 1, 2),
>     new Tuple3<String, Integer, Integer>("a", 1, 3),
>     new Tuple3<String, Integer, Integer>("b", 1, 4),
>     new Tuple3<String, Integer, Integer>("a", 1, 5),
>     new Tuple3<String, Integer, Integer>("b", 2, 6),
>     new Tuple3<String, Integer, Integer>("a", 2, 7),
>     new Tuple3<String, Integer, Integer>("b", 2, 8));
> elems.groupBy(0, 1).sum(2).print();
>
> I want the output to be:
> (a,1,8)
> (a,2,8)
> (b,1,6)
> (b,2,14)
>
> However the output is:
> (a,2,8)
> (b,1,6)
> (b,2,14)
> (a,1,8)
>
> No matter where I place sort-partition or sort-group transformations, I
> do not get this order. (Strangely enough, I just realized that when I
> don't add any ordering, the output is as expected; however, this is only
> the case in the toy example and not in my TPC-H Q1.) Is it currently not
> possible to achieve an ordered output in this case? Please bear with me if
> I overlooked the obvious, but I could not get a clear picture from the
> documentation.
>
> Btw. the code is right here:
> https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH1Benchmark.java#L137
> I verified the results with the provided data from TPC-H, apart from the
> sorting everything is fine.
>
> Thanks a bunch in advance,
>
> Cheers
> Robert
>
> --
> My GPG Key ID: 336E2680
>
