flink-user mailing list archives

From Dongwon Kim <eastcirc...@postech.ac.kr>
Subject Re: TeraSort on Flink and Spark
Date Fri, 10 Jul 2015 07:35:36 GMT
Hi Stephan,

I just pushed changes to my github: https://github.com/eastcirclek/terasort.
I've modified the TeraSort program so that (A) it can reuse objects
and (B) it can exploit OptimizedText as you suggested.

I've also conducted a few experiments, and the results are as follows:
ORIGINAL : 1714
ORIGINAL+A : 1671
ORIGINAL+B : 1467
ORIGINAL+A+B : 1427
Your advice works as shown above :-)

The datasets are now defined as follows:
- val inputFile = env.readHadoopFile(teraInputFormat, classOf[Text], classOf[Text], inputPath)
- val optimizedText = inputFile.map(tp => (new OptimizedText(tp._1), tp._2))
- val sortedPartitioned = optimizedText.partitionCustom(partitioner, 0).sortPartition(0, Order.ASCENDING)
- sortedPartitioned.map(tp => (tp._1.getText, tp._2)).output(hadoopOF)
Note the two map transformations, one before and one after the
partitionCustom(...).sortPartition(...) chain.
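For completeness, optimization (A) is just a one-line switch on the
environment. A minimal sketch (the code in my repository may differ
slightly):

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
env.getConfig.enableObjectReuse()  // (A): let Flink reuse mutable objects across function calls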

Here is a question regarding the performance improvement.
Please see the attached Ganglia image files.
- ORIGINAL-[cpu, disks, network].png are for ORIGINAL.
- BEST-[cpu, disks, network].png are for ORIGINAL+A+B.
Compared to ORIGINAL, BEST shows better disk and network utilization
and lower CPU utilization.
Is this because OptimizedText objects are serialized into Flink's managed memory layout?
What happens when keys are represented as plain Text rather than
OptimizedText? Is there a separate memory area to hold such objects, or
are they serialized anyway, just less efficiently?
If the latter, is the CPU utilization in ORIGINAL high because the CPUs
work hard to serialize Text objects through the Java serialization
mechanism with DataInput and DataOutput?
If so, that would explain the higher network and disk throughput in ORIGINAL+A+B.
However, I observed similar performance when I map not only the
10-byte keys but also the 90-byte values, which the above conjecture
cannot explain.
Could you clarify this? I would really appreciate it ;-)

I'm also wondering whether the two map transformations,
(Text, Text) to (OptimizedText, Text) and (OptimizedText, Text) to
(Text, Text),
prevent Flink from performing much better.
I don't have time to modify TeraInputFormat and TeraOutputFormat to
read and write (String, String) pairs directly from and to HDFS.
Do you think one could get a better TeraSort result with a new
implementation of FileInputFormat<String, String>?
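To make the idea concrete, here is a rough, untested sketch of the kind
of input format I have in mind (Scala; record-boundary alignment across
splits and error handling are ignored, and the class name is made up):

import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.io.FileInputFormat
import org.apache.flink.core.fs.FileInputSplit

class TeraStringInputFormat extends FileInputFormat[(String, String)] {
  private val buffer = new Array[Byte](100)  // one TeraSort record: 10-byte key + 90-byte value
  private var bytesConsumed = 0L

  override def open(split: FileInputSplit): Unit = {
    super.open(split)       // seeks the underlying stream to the start of the split
    bytesConsumed = 0L
  }

  override def reachedEnd(): Boolean = bytesConsumed >= splitLength

  override def nextRecord(reuse: (String, String)): (String, String) = {
    // read exactly one 100-byte record; read() may return fewer bytes than requested
    var off = 0
    while (off < buffer.length) {
      val n = stream.read(buffer, off, buffer.length - off)
      if (n < 0) return null  // premature end of stream
      off += n
    }
    bytesConsumed += buffer.length
    (new String(buffer, 0, 10, StandardCharsets.US_ASCII),
     new String(buffer, 10, 90, StandardCharsets.US_ASCII))
  }
}

Something like this could presumably be plugged in via env.createInput
after setting the file path, but I haven't tried it.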

Regards,

Dongwon Kim

2015-07-03 3:29 GMT+09:00 Stephan Ewen <sewen@apache.org>:
> Hello Dongwon Kim!
>
> Thank you for sharing these numbers with us.
>
> I have gone through your implementation and there are two things you could
> try:
>
> 1)
>
> I see that you sort Hadoop's Text data type with Flink. I think this may be
> less efficient than if you sort String, or a Flink-specific data type.
>
> For efficient byte operations on managed memory, Flink needs to understand
> the binary representation of the data type. Flink understands that for
> "String" and many other types, but not for "Text".
>
> There are two things you can do:
>   - First, try what happens if you map the Hadoop Text type to a Java String
> (only for the tera key).
>   - Second, you can try what happens if you wrap the Hadoop Text type in a
> Flink type that supports optimized binary sorting. I have pasted code for
> that at the bottom of this email.
>
> 2)
>
> You can see if it helps performance if you enable object re-use in Flink.
> You can do this on the ExecutionEnvironment via
> "env.getConfig().enableObjectReuse()". Then Flink tries to use the same
> objects repeatedly, in case they are mutable.
>
>
> Can you try these options out and see how they affect Flink's runtime?
>
>
> Greetings,
> Stephan
>
> ---------------------------------------------------------
> Code for optimized sortable (Java):
>
> import java.io.IOException;
>
> import org.apache.flink.core.memory.DataInputView;
> import org.apache.flink.core.memory.DataOutputView;
> import org.apache.flink.core.memory.MemorySegment;
> import org.apache.flink.types.NormalizableKey;
> import org.apache.hadoop.io.Text;
>
> public final class OptimizedText implements NormalizableKey<OptimizedText> {
>
>     private final Text text;
>
>     public OptimizedText() {
>         this.text = new Text();
>     }
>
>     public OptimizedText(Text from) {
>         this.text = from;
>     }
>
>     public Text getText() {
>         return text;
>     }
>
>     @Override
>     public int getMaxNormalizedKeyLen() {
>         return 10;
>     }
>
>     @Override
>     public void copyNormalizedKey(MemorySegment memory, int offset, int len) {
>         memory.put(offset, text.getBytes(), 0, Math.min(text.getLength(), Math.min(10, len)));
>     }
>
>     @Override
>     public void write(DataOutputView out) throws IOException {
>         text.write(out);
>     }
>
>     @Override
>     public void read(DataInputView in) throws IOException {
>         text.readFields(in);
>     }
>
>     @Override
>     public int compareTo(OptimizedText o) {
>         return this.text.compareTo(o.text);
>     }
> }
>
> ---------------------------------------------------------
> Converting Text to OptimizedText (Java code)
>
> map(new MapFunction<Tuple2<Text, Text>, Tuple2<OptimizedText, Text>>() {
>     @Override
>     public Tuple2<OptimizedText, Text> map(Tuple2<Text, Text> value) {
>         return new Tuple2<OptimizedText, Text>(new OptimizedText(value.f0), value.f1);
>     }
> })
>
>
>
>
> On Thu, Jul 2, 2015 at 6:47 PM, Dongwon Kim <eastcirclek@postech.ac.kr>
> wrote:
>>
>> Hello,
>>
>> I'd like to share my code for TeraSort on Flink and Spark which uses
>> the same range partitioner as Hadoop TeraSort:
>> https://github.com/eastcirclek/terasort
>>
>> I also wrote a short report on it:
>>
>> http://eastcirclek.blogspot.kr/2015/06/terasort-for-spark-and-flink-with-range.html
>> In the blog post, I make a simple performance comparison between
>> Flink, Spark, Tez, and MapReduce.
>>
>> I hope it will be helpful to you guys!
>> Thanks.
>>
>> Dongwon Kim
>> Postdoctoral Researcher @ Postech
>
>
