flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: TeraSort on Flink and Spark
Date Fri, 10 Jul 2015 08:33:22 GMT
Hi Dongwon Kim!

Thank you for trying out these changes.

The OptimizedText can be sorted more efficiently, because it generates a
binary key prefix. That way, the sorting needs to serialize/deserialize
less and saves on CPU.

In parts of the program, the CPU is then less of a bottleneck and the disks
and the network can unfold their bandwidth better.

Greetings,
Stephan



On Fri, Jul 10, 2015 at 9:35 AM, Dongwon Kim <eastcirclek@postech.ac.kr>
wrote:

> Hi Stephan,
>
> I just pushed changes to my github:
> https://github.com/eastcirclek/terasort.
> I've modified the TeraSort program so that (A) it can reuse objects
> and (B) it can exploit OptimizedText as you suggested.
>
> I've also conducted few experiments and the results are as follows:
> ORIGINAL : 1714
> ORIGINAL+A : 1671
> ORIGINAL+B : 1467
> ORIGINAL+A+B : 1427
> Your advice works as shown above :-)
>
> Datasets are now defined as below:
> - val inputFile = env.readHadoopFile(teraInputFormat, classOf[Text],
> classOf[Text], inputPath)
> - val optimizedText = inputFile.map(tp => (new OptimizedText(tp._1),
> tp._2))
> - val sortedPartitioned = optimizedText.partitionCustom(partitioner,
> 0).sortPartition(0, Order.ASCENDING)
> - sortedPartitioned.map(tp => (tp._1.getText, tp._2)).output(hadoopOF)
> You can see the two map transformations before and after the function
> composition partitionCustom.sortPartition.
>
> Here is a question regarding the performance improvement.
> Please see the attached Ganglia image files.
> - ORIGINAL-[cpu, disks, network].png are for ORIGINAL.
> - BEST-[cpu, disks, network].png are for ORIGINAL+A+B.
> Compared to ORIGINAL, BEST shows better utilization of disks and
> network and shows lower CPU utilization.
> Is this because OptimizedText objects are serialized into Flink memory
> layout?
> What happens when keys are represented in just Text, not
> OptimziedText? Are there another memory area to hold such objects? or
> are they serialized anyway but in an inefficient way?
> If latter, is the CPU utilization in ORIGINAL high because CPUs work
> hard to serialize Text objects using Java serialization mechanism with
> DataInput and DataOutput?
> If true, I can explain the high throughput of network and disks in
> ORIGINAL+A+B.
> I, however, observed the similar performance when I do mapping not
> only on 10-byte keys but also on 90-byte values, which cannot be
> explained by the above conjecture.
> Could you make things clear? If so, I would be very appreciated ;-)
>
> I'm also wondering whether the two map transformations,
> (Text, Text) to (OptimizedText, Text) and  (OptimizedText, Text) to
> (Text, Text),
> can prevent Flink from performing a lot better.
> I don't have time to modify TeraInputFormat and TeraOutputFormat to
> read (String, String) pairs from HDFS and write (String, String) pairs
> to HDFS.
> Do you see that one can get a better TeraSort result using an new
> implementation of FileInputFormat<String,String>?
>
> Regards,
>
> Dongwon Kim
>
> 2015-07-03 3:29 GMT+09:00 Stephan Ewen <sewen@apache.org>:
> > Hello Dongwon Kim!
> >
> > Thanks you for sharing these numbers with us.
> >
> > I have gone through your implementation and there are two things you
> could
> > try:
> >
> > 1)
> >
> > I see that you sort Hadoop's Text data type with Flink. I think this may
> be
> > less efficient than if you sort String, or a Flink specific data type.
> >
> > For efficient byte operations on managed memory, Flink needs to
> understand
> > the binary representation of the data type. Flink understands that for
> > "String" and many other types, but not for "Text".
> >
> > There are two things you can do:
> >   - First, try what happens if you map the Hadoop Text type to a Java
> String
> > (only for the tera key).
> >   - Second, you can try what happens if you wrap the Hadoop Text type in
> a
> > Flink type that supports optimized binary sorting. I have pasted code for
> > that at the bottom of this email.
> >
> > 2)
> >
> > You can see if it helps performance if you enable object re-use in Flink.
> > You can do this on the ExecutionEnvironment via
> > "env.getConfig().enableObjectReuse()". Then Flink tries to use the same
> > objects repeatedly, in case they are mutable.
> >
> >
> > Can you try these options out and see how they affect Flink's runtime?
> >
> >
> > Greetings,
> > Stephan
> >
> > ---------------------------------------------------------
> > Code for optimized sortable (Java):
> >
> > public final class OptimizedText implements
> NormalizableKey<OptimizedText >
> > {
> > private final Text text;
> > public OptimizedText () {
> > this.text = new Text();
> > }
> > public OptimizedText (Text from) {
> > this.text = from;
> > }
> >
> > public Text getText() {
> > return text;
> > }
> >
> > @Override
> > public int getMaxNormalizedKeyLen() {
> > return 10;
> > }
> >
> > @Override
> > public void copyNormalizedKey(MemorySegment memory, int offset, int len)
> {
> > memory.put(offset, text.getBytes(), 0, Math.min(text.getLength(),
> > Math.min(10, len)));
> > }
> >
> > @Override
> > public void write(DataOutputView out) throws IOException {
> > text.write(out);
> > }
> >
> > @Override
> > public void read(DataInputView in) throws IOException {
> > text.readFields(in);
> > }
> >
> > @Override
> > public int compareTo(OptimizedText o) {
> > return this.text.compareTo(o.text);
> > }
> > }
> >
> > ---------------------------------------------------------
> > Converting Text to OptimizedText (Java code)
> >
> > map(new MapFunction<Tuple2<Text, Text>, Tuple2<OptimizedText, Text>>()
{
> > @Override
> > public Tuple2<OptimizedText, Text> map(Tuple2<Text, Text> value) {
> > return new Tuple2<OptimizedText, Text>(new OptimizedText(value.f0),
> > value.f1);
> > }
> > })
> >
> >
> >
> >
> > On Thu, Jul 2, 2015 at 6:47 PM, Dongwon Kim <eastcirclek@postech.ac.kr>
> > wrote:
> >>
> >> Hello,
> >>
> >> I'd like to share my code for TeraSort on Flink and Spark which uses
> >> the same range partitioner as Hadoop TeraSort:
> >> https://github.com/eastcirclek/terasort
> >>
> >> I also write a short report on it:
> >>
> >>
> http://eastcirclek.blogspot.kr/2015/06/terasort-for-spark-and-flink-with-range.html
> >> In the blog post, I make a simple performance comparison between
> >> Flink, Spark, Tez, and MapReduce.
> >>
> >> I hope it will be helpful to you guys!
> >> Thanks.
> >>
> >> Dongwon Kim
> >> Postdoctoral Researcher @ Postech
> >
> >
>

Mime
View raw message