spark-dev mailing list archives

From 周千昊 <qhz...@apache.org>
Subject Re: repartitionAndSortWithinPartitions task shuffle phase is very slow
Date Fri, 23 Oct 2015 02:24:23 GMT
+kylin dev list

On Fri, Oct 23, 2015 at 10:20 AM, 周千昊 <qhzhou@apache.org> wrote:

> Hi, Reynold
>       We use glom() because it makes it easy to reuse the calculation logic
> already implemented in MR. And to be clear, we are still in POC.
>       Since the results show almost no difference between this
> glom stage and the MR mapper, using glom here might not be the issue.
>       I monitored the network traffic while the repartition
> happens: the traffic peaks at about 200-300MB/s, but it
> stays at about 3-4MB/s for a long time. Do you have any
> idea about it?
>
> On Fri, Oct 23, 2015 at 2:43 AM, Reynold Xin <rxin@databricks.com> wrote:
>
>> Why do you do a glom? It seems unnecessarily expensive to materialize
>> each partition in memory.
>>
>>
>> On Thu, Oct 22, 2015 at 2:02 AM, 周千昊 <qhzhou@apache.org> wrote:
>>
>>> Hi, spark community
>>>       I have an application that I am trying to migrate from MR to Spark.
>>>       It does some calculations on data from Hive and writes the output to
>>> HFiles, which are then bulk loaded into an HBase table. Details as follows:
>>>
>>>      Rdd<Element> input = getSourceInputFromHive()
>>>      Rdd<Tuple2<byte[], byte[]>> mapSideResult =
>>>          input.glom().mapPartitions(/* some calculation, equivalent to MR mapper */)
>>>      // PS: the result in each partition has already been sorted
>>>      // in lexicographical order during the calculation
>>>      mapSideResult
>>>          .repartitionAndSortWithinPartitions(/* partition with byte[][] which is
>>>              the HTable split keys, equivalent to MR shuffle */)
>>>          .map(/* transform Tuple2<byte[], byte[]> to
>>>              Tuple2<ImmutableBytesWritable, KeyValue>, equivalent to MR reducer
>>>              without output */)
>>>          .saveAsNewAPIHadoopFile(/* write to hfile */)
>>>
>>>       This all works fine on a small dataset, where Spark outruns MR by
>>> about 10%. However, when I apply it to a dataset of 150 million records, MR
>>> is about twice as fast as Spark (*MR 25min, Spark 50min*).
>>>        The application UI shows that the
>>> repartitionAndSortWithinPartitions stage is very slow: in the shuffle
>>> phase, a 6GB shuffle takes about 18min, which is quite unreasonable.
>>>        *Can anyone help with this issue and give me some advice on
>>> this? It's not iterative processing, but I believe Spark should be
>>> at least as fast as MR.*
>>>
>>>       Here is the cluster info:
>>>           vm: 8 nodes * (128G mem + 64 core)
>>>           hadoop cluster: hdp 2.2.6
>>>           spark running mode: yarn-client
>>>           spark version: 1.5.1
>>>
>>>
>>
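
The original post partitions the shuffle by a byte[][] of HTable split keys. As a rough illustration of that one step only, the sketch below shows how a key could be mapped to a region index in plain Java, without any Spark or HBase dependency. The class name SplitKeyPartitioner and the sample split keys are hypothetical, not taken from the thread; the code assumes the split keys are sorted in unsigned lexicographic byte order and that each region covers [startKey, endKey).

```java
public class SplitKeyPartitioner {
    // Hypothetical helper: split keys must be sorted ascending in
    // unsigned lexicographic byte order (an assumption of this sketch).
    private final byte[][] splitKeys;

    public SplitKeyPartitioner(byte[][] splitKeys) {
        this.splitKeys = splitKeys.clone();
    }

    /** Unsigned lexicographic comparison of two byte arrays. */
    public static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        // On a common prefix, the shorter array sorts first.
        return a.length - b.length;
    }

    /** N split keys delimit N + 1 key ranges. */
    public int numPartitions() {
        return splitKeys.length + 1;
    }

    /**
     * Returns the number of split keys that are <= key, found by binary
     * search. A key equal to a split key lands in the later range, since
     * each range is treated as start-inclusive and end-exclusive.
     */
    public int getPartition(byte[] key) {
        int lo = 0;
        int hi = splitKeys.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (compareBytes(splitKeys[mid], key) <= 0) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    public static void main(String[] args) {
        // Made-up split keys for illustration only.
        byte[][] splits = { {0x10}, {0x40}, {(byte) 0x80} };
        SplitKeyPartitioner p = new SplitKeyPartitioner(splits);
        System.out.println(p.getPartition(new byte[] {0x05}));
        System.out.println(p.getPartition(new byte[] {0x40, 0x01}));
    }
}
```

In a Spark job, logic like this would presumably sit inside a subclass of org.apache.spark.Partitioner, and because Java byte[] keys have no natural ordering, a serializable comparator equivalent to compareBytes would likely have to be passed to repartitionAndSortWithinPartitions as well.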
