From: Daniel Darabos <daniel.darabos@lynxanalytics.com>
To: user@spark.apache.org
Date: Wed, 30 Apr 2014 09:39:03 +0200
Subject: Re: Shuffle Spill Issue

Whoops, you are right. Sorry for the misinformation. Indeed reduceByKey just calls combineByKey:

    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
      combineByKey[V]((v: V) => v, func, func, partitioner)
    }

(I think I confused reduceByKey with groupByKey.)
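To make the distinction concrete, here is a minimal sketch. The tiny dataset and the SparkContext "sc" are just illustrative, not taken from Raymond's job:

    import org.apache.spark.SparkContext._  // pair-RDD operations on older Spark versions

    // Minimal sketch, assuming a SparkContext "sc" is already available.
    val pairs = sc.parallelize(Seq("a", "b", "a")).map(word => (word, 1))

    // reduceByKey combines values per key on the map side before the shuffle,
    // so at most one record per distinct key leaves each partition.
    val withCombine = pairs.reduceByKey(_ + _)

    // groupByKey ships every (word, 1) record through the shuffle unreduced,
    // which is the behavior I was (wrongly) worried about here.
    val withoutCombine = pairs.groupByKey().mapValues(_.sum)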
On Wed, Apr 30, 2014 at 2:55 AM, Liu, Raymond <raymond.liu@intel.com> wrote:

> Hi Daniel
>
> Thanks for your reply. I think reduceByKey will also do a map-side combine, so the result is the same: for each partition, one entry per distinct word. In my case with the Java serializer, the 240MB dataset yields around 70MB of shuffle data. Only the Shuffle Spill (Memory) figure is abnormal, and it seems to me the spill should not be triggered at all. And by the way, this behavior only occurs on the map output side; on the reduce / shuffle fetch side this strange behavior doesn't happen.
>
> Best Regards,
> Raymond Liu
>
> From: Daniel Darabos [mailto:daniel.darabos@lynxanalytics.com]
>
> I have no idea why the shuffle spill is so large. But this might make it smaller:
>
>     val addition = (a: Int, b: Int) => a + b
>     val wordsCount = wordsPair.combineByKey(identity, addition, addition)
>
> This way only one entry per distinct word will end up in the shuffle for each partition, instead of one entry per word occurrence.
>
> On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond <raymond.liu@intel.com> wrote:
>
> Hi Patrick
>
> I am just doing a simple word count; the data is generated by the Hadoop random text writer.
>
> This does not seem related to compression. If I turn off compression on the shuffle, the metrics look like the following for the smaller 240MB dataset:
>
> Executor ID  Address      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
> 10           sr437:48527  35 s       8            0             8                0.0 B         2.5 MB         2.2 GB                  1291.2 KB
> 12           sr437:46077  34 s       8            0             8                0.0 B         2.5 MB         1822.6 MB               1073.3 KB
> 13           sr434:37896  31 s       8            0             8                0.0 B         2.4 MB         1099.2 MB               621.2 KB
> 15           sr438:52819  31 s       8            0             8                0.0 B         2.5 MB         1898.8 MB               1072.6 KB
> 16           sr434:37103  32 s       8            0             8                0.0 B         2.4 MB         1638.0 MB               1044.6 KB
>
> And the program is pretty simple:
>
>     val files = sc.textFile(args(1))
>     val words = files.flatMap(_.split(" "))
>     val wordsPair = words.map(x => (x, 1))
>
>     val wordsCount = wordsPair.reduceByKey(_ + _)
>     val count = wordsCount.count()
>
>     println("Number of words = " + count)
>
> Best Regards,
> Raymond Liu
>
> From: Patrick Wendell [mailto:pwendell@gmail.com]
>
> Could you explain more about what your job is doing and what data types you are using? These numbers alone don't necessarily indicate something is wrong. The relationship between the in-memory and on-disk shuffle amounts is definitely a bit strange: the data gets compressed when written to disk, but unless you have a weird dataset (e.g. all zeros) I wouldn't expect it to compress _that_ much.
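(An aside on Patrick's compression point: a quick way to rule it in or out is to toggle the shuffle compression settings and compare the metrics. A minimal sketch, assuming you build the job's SparkConf yourself; these property names existed at the time, but check the docs for your Spark version:)

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: run the same word count with shuffle compression disabled.
    val conf = new SparkConf()
      .setAppName("WordCountNoShuffleCompression")
      .set("spark.shuffle.compress", "false")        // don't compress map output files
      .set("spark.shuffle.spill.compress", "false")  // don't compress data spilled during shuffles
    val sc = new SparkContext(conf)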
> On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond <raymond.liu@intel.com> wrote:
>
> Hi
>
> I am running a simple word count program on a Spark standalone cluster. The cluster is made up of 6 nodes, each running 4 workers, and each worker owns 10G of memory and 16 cores, for a total of 96 cores and 240G of memory. (It was also previously configured as 1 worker with 40G of memory on each node.)
>
> I ran a very small data set (2.4GB on HDFS in total) to confirm the problem. As you can see from this part of the task metrics, the shuffle spill figures indicate that something is wrong:
>
> Executor ID  Address      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
> 0            sr437:42139  29 s       4            0             4                0.0 B         4.3 MB         23.6 GB                 4.3 MB
> 1            sr433:46935  1.1 min    4            0             4                0.0 B         4.2 MB         19.0 GB                 3.4 MB
> 10           sr436:53277  26 s       4            0             4                0.0 B         4.3 MB         25.6 GB                 4.6 MB
> 11           sr437:58872  32 s       4            0             4                0.0 B         4.3 MB         25.0 GB                 4.4 MB
> 12           sr435:48358  27 s       4            0             4                0.0 B         4.3 MB         25.1 GB                 4.4 MB
>
> You can see that Shuffle Spill (Memory) is pretty high, almost 5000x the actual shuffle data and Shuffle Spill (Disk), and it also seems to me that the spill should not be triggered at all, since memory is not used up.
>
> To verify that, I further reduced the data size to 240MB in total. Here is the result:
>
> Executor ID  Address      Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
> 0            sr437:50895  15 s       4            0             4                0.0 B         703.0 KB       80.0 MB                 43.2 KB
> 1            sr433:50207  17 s       4            0             4                0.0 B         704.7 KB       389.5 MB                90.2 KB
> 10           sr436:56352  16 s       4            0             4                0.0 B         700.9 KB       814.9 MB                181.6 KB
> 11           sr437:53099  15 s       4            0             4                0.0 B         689.7 KB       0.0 B                   0.0 B
> 12           sr435:48318  15 s       4            0             4                0.0 B         702.1 KB       427.4 MB                90.7 KB
> 13           sr433:59294  17 s       4            0             4                0.0 B         704.8 KB       779.9 MB                180.3 KB
>
> Nothing prevents the spill from happening.
>
> It seems to me that there must be something wrong with the spill trigger code.
>
> Has anyone else encountered this issue? By the way, I am using the latest trunk code.
>
> Best Regards,
> Raymond Liu
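(A closing aside on the spill trigger itself: the map side only spills when its aggregation buffer believes it has hit its memory budget, which at the time was governed by a couple of settings. A minimal sketch of how one might probe that; the property names are the ones Spark documented back then, and the values are illustrative rather than recommended defaults:)

    import org.apache.spark.SparkConf

    // Sketch only: see whether the huge "Shuffle Spill (Memory)" figures go away
    // when spilling is disabled or the shuffle aggregation budget is enlarged.
    val conf = new SparkConf()
      .set("spark.shuffle.spill", "false")          // never spill map-side aggregation to disk
      .set("spark.shuffle.memoryFraction", "0.4")   // fraction of the heap usable for shuffle aggregation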