Subject: Re: scaling flink
From: Stephan Ewen <sewen@apache.org>
To: user@flink.apache.org
Date: Fri, 5 Jun 2015 19:48:48 +0200

It was supposed to mean "please PING us" ;-)

On Fri, Jun 5, 2015 at 7:21 PM, Stephan Ewen wrote:

> Hi Bill!
>
> For the WordCount case, these numbers are not unexpected. Flink does not
> yet use a hash aggregator for the "reduce(v1, v2)" call, but uses a
> sort-based aggregation for that. Flink's sort aggregations are very
> reliable and very scalable compared to many hash aggregations, but often
> more expensive. Especially on low-key-cardinality data sets, hash
> aggregations outperform sort aggregations.
>
> It is on the roadmap to add a managed-memory hash aggregator that is
> reliable.
> For now, Flink's runtime has managed-memory sorts and hash-joins,
> so we stuck with reliability over performance.
>
> It is cool to see that you are doing an evaluation and we are very curious
> about your outcomes. Please let us know how it looks for other operations
> and patterns, like joins, iterations, ...
>
>
> Concerning performance tuning, here are a few pointers that may be
> interesting:
>
>   - You are using a lot of very small TaskManagers, each with one slot. It
> will most likely be faster if you use fewer TaskManagers with more slots,
> because then the network stack is shared between more tasks. This results
> in fewer TCP connections, which each carry more data. You could try "-yn
> $((111)) -ytm $((24*1024)) -yD taskmanager.numberOfTaskSlots=$((6))" for
> example.
>
>   - The example word-count implementation is not particularly tuned; I
> think one can do better there.
>
>   - Flink has a mode to reuse objects, which takes a bit of pressure off
> the garbage collector. Where objects are not cached by the user code, this
> may help reduce the pressure that user code imposes on the garbage collector.
>
>
> BTW: Are you including the YARN startup time, or are you measuring from
> when the program execution starts?
>
>
> Please pig us if you have more questions!
>
>
> Greetings,
> Stephan
>
>
> On Fri, Jun 5, 2015 at 5:16 PM, Bill Sparks <jsparks@cray.com> wrote:
>
>> Hi.
>>
>> I'm running some comparisons between Flink, MRv2, and Spark (1.3), using
>> the new Intel HiBench suite. I've started with the stock wordcount example
>> and I'm seeing some numbers which are not where I thought they'd be.
>>
>> So the question I have is: what are the configuration parameters which
>> can affect the performance? Is there a performance/tuning guide?
>>
>> What we have, hardware-wise, are 48 Haswell nodes with 32 physical/64 HT
>> cores and 128 GB each, FDR-connected. I'm parsing 2 TB of text, using the
>> following parameters.
>>
>> ./bin/flink run -m yarn-cluster \
>>   -yD fs.overwrite-files=true \
>>   -yD fs.output.always-create-directory=true \
>>   -yq \
>>   -yn $((666)) \
>>   -yD taskmanager.numberOfTaskSlots=$((1)) \
>>   -yD parallelization.degree.default=$((666)) \
>>   -ytm $((4*1024)) \
>>   -yjm $((4*1024)) \
>>   ./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar \
>>   hdfs:///user/jsparks/HiBench/Wordcount/Input \
>>   hdfs:///user/jsparks/HiBench/Wordcount/Output
>>
>> Any pointers would be greatly appreciated.
>>
>> Type                 Date        Time      Input_data_size  Duration(s)  Throughput(bytes/s)  Throughput/node
>> HadoopWordcount      2015-06-03  10:45:11  2052360935068    763.106      2689483420           2689483420
>> JavaSparkWordcount   2015-06-03  10:55:24  2052360935068    411.246      4990591847           4990591847
>> ScalaSparkWordcount  2015-06-03  11:06:24  2052360935068    342.777      5987452294           5987452294
>>
>> Type                 Date        Time      Input_data_size  Duration(s)  Throughput(bytes/s)  Throughput/node
>> flinkWordCount       2015-06-04  16:27:27  2052360935068    647.383      3170242244           66046713
>>
>>
>> --
>> Jonathan (Bill) Sparks
>> Software Architecture
>> Cray Inc.
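Stephan's sort- vs. hash-aggregation point can be illustrated with a small sketch (plain Python for illustration only, not Flink internals): the sort-based strategy orders all records by key and then reduces each adjacent run, while the hash-based one folds every record into a per-key table, which stays small when key cardinality is low.

```python
from itertools import groupby

def sort_based_wordcount(words):
    # Sort every record by key, then reduce each adjacent run of equal keys.
    return {key: sum(1 for _ in run) for key, run in groupby(sorted(words))}

def hash_based_wordcount(words):
    # Fold each record into a per-key hash table; no global sort needed.
    counts = {}
    for w in words:
        counts[w] = counts.get(w, 0) + 1
    return counts

words = ["to", "be", "or", "not", "to", "be"]
assert sort_based_wordcount(words) == hash_based_wordcount(words) == {
    "to": 2, "be": 2, "or": 1, "not": 1,
}
```

Both produce the same result; the difference is cost: sorting is O(n log n) regardless of key count, while the hash table does O(n) work and stays tiny on low-cardinality data like word counts.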
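A quick sanity check on the result tables above (assuming the 48 nodes from Bill's hardware description): Throughput(bytes/s) is Input_data_size divided by Duration(s), and the flink row's Throughput/node further divides by 48, whereas the Hadoop and Spark rows appear to repeat the total instead.

```python
# Reproduce the arithmetic behind the flinkWordCount row of the table above.
input_bytes = 2052360935068  # Input_data_size (same ~2 TB dataset for all runs)
duration_s = 647.383         # flinkWordCount Duration(s)
nodes = 48                   # cluster size from the hardware description

throughput = input_bytes / duration_s   # total bytes/s
per_node = throughput / nodes           # bytes/s per node

# Matches the table: ~3170242244 bytes/s total, ~66046713 bytes/s per node.
assert abs(throughput - 3170242244) < 10
assert abs(per_node - 66046713) < 10
```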