spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hemant Bhanawat <hemant9...@gmail.com>
Subject Re: Optimal way to re-partition from a single partition
Date Wed, 10 Feb 2016 05:00:48 GMT
Ohk. I was comparing groupBy with orderBy and now I realize that they are
using different partitioning schemes.

Thanks Takeshi.



On Tue, Feb 9, 2016 at 9:09 PM, Takeshi Yamamuro <linguin.m.s@gmail.com>
wrote:

> Hi,
>
> DataFrame#sort() uses `RangePartitioning` in `Exchange` instead of
> `HashPartitioning`.
> `RangePartitioning` roughly samples input data and internally computes
> partition bounds
> to split given rows into `spark.sql.shuffle.partitions` partitions.
> Therefore, when sort keys are highly skewed, I think some partitions could
> end up being empty
> (that is, # of result partitions is lower than `spark.sql.shuffle.partitions`
> .
>
>
> On Tue, Feb 9, 2016 at 9:35 PM, Hemant Bhanawat <hemant9379@gmail.com>
> wrote:
>
>> For sql shuffle operations like groupby, the number of output partitions
>> is controlled by spark.sql.shuffle.partitions. But, it seems orderBy does
>> not honour this.
>>
>> In my small test, I could see that the number of partitions  in DF
>> returned by orderBy was equal to the total number of distinct keys. Are you
>> observing the same, I mean do you have a single value for all rows in the
>> column on which you are running orderBy? If yes, you are better off not
>> running the orderBy clause.
>>
>> May be someone from spark sql team could answer that how should the
>> partitioning of the output DF be handled when doing an orderBy?
>>
>> Hemant
>> www.snappydata.io
>> https://github.com/SnappyDataInc/snappydata
>>
>>
>>
>>
>> On Tue, Feb 9, 2016 at 4:00 AM, Cesar Flores <cesar7@gmail.com> wrote:
>>
>>>
>>> I have a data frame which I sort using orderBy function. This operation
>>> causes my data frame to go to a single partition. After using those
>>> results, I would like to re-partition to a larger number of partitions.
>>> Currently I am just doing:
>>>
>>> val rdd = df.rdd.coalesce(100, true) //df is a dataframe with a single
>>> partition and around 14 million records
>>> val newDF =  hc.createDataFrame(rdd, df.schema)
>>>
>>> This process is really slow. Is there any other way of achieving this
>>> task, or to optimize it (perhaps tweaking a spark configuration parameter)?
>>>
>>>
>>> Thanks a lot
>>> --
>>> Cesar Flores
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>

Mime
View raw message