From: Flavio Pompermaier
To: user@flink.apache.org
Cc: eastcirclek@postech.ac.kr
Date: Thu, 2 Jul 2015 21:14:51 +0200
Subject: Re: TeraSort on Flink and Spark

Hi Stephan,
If I understood correctly, you are substituting the Text key with a more efficient version (OptimizedText).
Just one question: you set the maximum length of the key to 10. Do you know that a priori?
Is this implementation of the key much more efficient than just using String?
Is there any comparison between the two?

Best,
Flavio

On 2 Jul 2015 20:29, "Stephan Ewen" <sewen@apache.org> wrote:
Hello Dongwon Kim!

Thank you for sharing these numbers with us.

I have gone through your implementation, and there are two things you could try:

1)

I see that you sort Hadoop's Text data type with Flink. I think this may be less efficient than sorting String or a Flink-specific data type.

For efficient byte operations on managed memory, Flink needs to understand the binary representation of the data type. Flink understands that for "String" and many other types, but not for "Text".

There are two things you can do:
  - First, try what happens if you map the Hadoop Text type to a Java String (only for the tera key).
  - Second, you can try what happens if you wrap the Hadoop Text type in a Flink type that supports optimized binary sorting. I have pasted code for that at the bottom of this email.
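To make the point about binary representations concrete, here is a minimal stand-alone Java sketch. It is not Flink code, and the class and method names are hypothetical. It assumes fixed-length 10-byte keys stored back to back in one byte[] as a stand-in for a managed memory segment, and it orders them by comparing the raw bytes in place as unsigned values, so no key object is ever created.

```java
import java.util.Arrays;

// Hypothetical sketch, not Flink code: fixed-length keys laid out back to back in one
// byte[] (standing in for a managed memory segment) are ordered by comparing their raw
// bytes in place, without turning them into key objects.
final class InPlaceKeySort {
    static final int KEY_LEN = 10; // assumption: fixed 10-byte keys, as in TeraSort

    // Compares the keys starting at offA and offB, treating each byte as unsigned (0..255).
    static int compareAt(byte[] buf, int offA, int offB) {
        for (int i = 0; i < KEY_LEN; i++) {
            int a = buf[offA + i] & 0xFF;
            int b = buf[offB + i] & 0xFF;
            if (a != b) {
                return a - b;
            }
        }
        return 0;
    }

    // Returns the key offsets in sorted key order; the buffer itself is left untouched.
    static Integer[] sortedOffsets(byte[] buf) {
        int n = buf.length / KEY_LEN;
        Integer[] offsets = new Integer[n];
        for (int i = 0; i < n; i++) {
            offsets[i] = i * KEY_LEN;
        }
        Arrays.sort(offsets, (x, y) -> compareAt(buf, x, y));
        return offsets;
    }

    public static void main(String[] args) {
        byte[] buf = new byte[3 * KEY_LEN];
        Arrays.fill(buf, 0, KEY_LEN, (byte) 'j');
        Arrays.fill(buf, KEY_LEN, 2 * KEY_LEN, (byte) 'a');
        Arrays.fill(buf, 2 * KEY_LEN, 3 * KEY_LEN, (byte) 0x80); // negative as a signed Java byte
        System.out.println(Arrays.toString(sortedOffsets(buf))); // prints [10, 0, 20]
    }
}
```

The 0x80 key sorts last because bytes are compared as unsigned values; a plain signed comparison of Java bytes would put it first.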

2)

You can check whether enabling object re-use in Flink helps performance. You can do this on the ExecutionEnvironment via "env.getConfig().enableObjectReuse()". Flink then tries to reuse the same objects repeatedly when they are mutable.
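As a rough illustration of what object re-use means for user code, here is a plain-Java sketch; it does not use Flink, and the class and method names are hypothetical. One mutable record is allocated once and refilled for every element, which saves allocations, but a function that keeps references to its input objects then sees them all change. Copying a record before keeping it avoids that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java sketch of object re-use (no Flink APIs involved).
final class ObjectReuseDemo {
    // A minimal mutable record, standing in for a mutable type such as Hadoop's Text.
    static final class MutableRecord {
        long value;

        MutableRecord copy() {
            MutableRecord c = new MutableRecord();
            c.value = value;
            return c;
        }
    }

    // Hands every input value to the consumer through ONE record instance that is
    // allocated once and refilled per element, instead of one new object per element.
    static void emitReusing(long[] input, Consumer<MutableRecord> consumer) {
        MutableRecord reused = new MutableRecord();
        for (long v : input) {
            reused.value = v;
            consumer.accept(reused);
        }
    }

    public static void main(String[] args) {
        long[] input = {1, 2, 3};

        // Unsafe under re-use: all three list entries are the same instance.
        List<MutableRecord> kept = new ArrayList<>();
        emitReusing(input, kept::add);

        // Safe under re-use: copy a record before holding on to it.
        List<MutableRecord> copied = new ArrayList<>();
        emitReusing(input, r -> copied.add(r.copy()));

        System.out.println(kept.get(0).value + " " + copied.get(0).value); // prints "3 1"
    }
}
```

Functions that only read each value before the next one arrives are unaffected; only retaining the objects across calls needs the copy.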


Can you try these options out and see how they affect Flink's runtime?


Greetings,
Stephan

---------------------------------------------------------
Code for optimized sortable (Java):

import java.io.IOException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.NormalizableKey;
import org.apache.hadoop.io.Text;

// Wraps Hadoop's Text in a Flink key type that supports normalized (binary) sorting.
public final class OptimizedText implements NormalizableKey<OptimizedText> {
    private final Text text;

    public OptimizedText() {
        this.text = new Text();
    }

    public OptimizedText(Text from) {
        this.text = from;
    }

    public Text getText() {
        return text;
    }

    // Length of the binary key prefix Flink compares; a TeraSort key is 10 bytes long.
    @Override
    public int getMaxNormalizedKeyLen() {
        return 10;
    }

    // Copies at most 10 raw key bytes into Flink's memory segment for byte-wise comparison.
    @Override
    public void copyNormalizedKey(MemorySegment memory, int offset, int len) {
        memory.put(offset, text.getBytes(), 0, Math.min(text.getLength(), Math.min(10, len)));
    }

    // Serialization delegates to Text's own Writable format.
    @Override
    public void write(DataOutputView out) throws IOException {
        text.write(out);
    }

    @Override
    public void read(DataInputView in) throws IOException {
        text.readFields(in);
    }

    @Override
    public int compareTo(OptimizedText o) {
        return this.text.compareTo(o.text);
    }
}
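On the fixed length of 10: as I understand it, a normalized key is a fixed-length byte prefix that the sorter compares as unsigned bytes before (or instead of) calling compareTo. TeraSort keys are always 10 bytes long, so a 10-byte prefix is the whole key there; for longer keys, records with equal prefixes still need a full comparison. The plain-Java sketch below (hypothetical names, not Flink's implementation) shows that fallback. It zero-pads short keys, whereas the OptimizedText code above copies at most the key's own length, so it presumably relies on every key being exactly 10 bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of sorting on a fixed-length normalized key with a full-key
// fallback; names and structure are illustrative, not Flink's implementation.
final class NormalizedKeySketch {
    static final int NORM_LEN = 10; // mirrors getMaxNormalizedKeyLen() in OptimizedText

    // First NORM_LEN bytes of the key; Arrays.copyOf zero-pads keys shorter than that.
    static byte[] normalizedKey(byte[] key) {
        return Arrays.copyOf(key, NORM_LEN);
    }

    // Lexicographic comparison of unsigned bytes; a key that is a prefix of another sorts first.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    }

    // Decide on the normalized key when possible; fall back to the full key on a tie.
    static int compare(byte[] a, byte[] b) {
        int c = compareUnsigned(normalizedKey(a), normalizedKey(b));
        return c != 0 ? c : compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        byte[] a = "0123456789AB".getBytes(StandardCharsets.US_ASCII); // 12-byte keys that
        byte[] b = "0123456789AA".getBytes(StandardCharsets.US_ASCII); // share a 10-byte prefix
        System.out.println(compareUnsigned(normalizedKey(a), normalizedKey(b))); // prints 0
        System.out.println(compare(a, b) > 0); // prints true
    }
}
```

With keys of exactly 10 bytes, equal normalized keys mean equal keys, so the fallback can only return 0 there.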

---------------------------------------------------------
Converting Text to OptimizedText (Java code)

map(new MapFunction<Tuple2<Text, Text>, Tuple2<OptimizedText, Text>>() {
    @Override
    public Tuple2<OptimizedText, Text> map(Tuple2<Text, Text> value) {
        return new Tuple2<OptimizedText, Text>(new OptimizedText(value.f0), value.f1);
    }
})



On Thu, Jul 2, 2015 at 6:47 PM, Dongwon Kim <eastcirclek@postech.ac.kr> wrote:
Hello,

I'd like to share my code for TeraSort on Flink and Spark, which uses the same range partitioner as Hadoop's TeraSort:
https://github.com/eastcirclek/terasort
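For readers unfamiliar with range partitioning: sorted split points (normally chosen by sampling the input) cut the key space into contiguous ranges, one per partition, so sorting each partition locally and concatenating the partitions in order yields a total order. The Java sketch below is a hypothetical illustration that uses String keys and a plain binary search; it does not reproduce the partitioner in Hadoop's TeraSort or in the linked repository.

```java
import java.util.Arrays;

// Hypothetical range partitioner sketch: n sorted split points define n + 1 partitions.
final class RangePartitionerSketch {
    private final String[] splitPoints; // assumed distinct; sorted in the constructor

    RangePartitionerSketch(String[] splitPoints) {
        this.splitPoints = splitPoints.clone();
        Arrays.sort(this.splitPoints);
    }

    int numPartitions() {
        return splitPoints.length + 1;
    }

    // Partition i receives keys k with splitPoints[i - 1] <= k < splitPoints[i].
    int partitionFor(String key) {
        int pos = Arrays.binarySearch(splitPoints, key);
        // binarySearch returns the match index, or -(insertionPoint) - 1 if absent.
        return pos >= 0 ? pos + 1 : -pos - 1;
    }

    public static void main(String[] args) {
        RangePartitionerSketch p = new RangePartitionerSketch(new String[] {"g", "p"});
        for (String key : new String[] {"apple", "grape", "melon", "pear", "zucchini"}) {
            System.out.println(key + " -> partition " + p.partitionFor(key));
        }
    }
}
```

This prints partitions 0, 1, 1, 2 and 2 for the five keys, so every key in partition 0 sorts before every key in partition 1, and so on.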

I also wrote a short report on it:
http://eastcirclek.blogspot.kr/2015/06/terasort-for-spark-and-flink-with-range.html
In the blog post, I make a simple performance comparison between
Flink, Spark, Tez, and MapReduce.

I hope it will be helpful to you guys!
Thanks.

Dongwon Kim
Postdoctoral Researcher @ Postech
