spark-user mailing list archives

From Shyam P <shyamabigd...@gmail.com>
Subject Re: spark df.write.partitionBy run very slow
Date Thu, 14 Mar 2019 06:26:26 GMT
Cool.

On Tue, Mar 12, 2019 at 9:08 AM JF Chen <darouwan@gmail.com> wrote:

> Hi,
> I finally found the reason.
> It was caused by long GC pauses on some datanodes. After receiving the data
> from the executors, a datanode stuck in a long GC cannot report its blocks
> to the namenode, so the write takes a long time.
> I have decommissioned the broken datanodes, and now my Spark job runs
> well.
> I am trying to increase the datanode heap size to check whether that
> resolves the problem.
>
> Regards,
> Junfeng Chen
>
>
> On Fri, Mar 8, 2019 at 8:54 PM Shyam P <shyamabigdata@gmail.com> wrote:
>
>> Did you check this: how many partitions does it show, and what is the
>> record count of each?
>>
>> // count records per partition id
>> import org.apache.spark.sql.functions.spark_partition_id
>> df.groupBy(spark_partition_id()).count().show()
>>
>> Are you getting the same number of parquet files?
>>
>> Try gradually increasing the sample size.
>>
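The per-partition counts returned by the query above are easier to judge as a single number. A minimal plain-Scala sketch (no Spark needed), assuming the counts have already been collected to the driver as a `Map[Int, Long]` keyed by partition id; `PartitionSkew`, `skewRatio`, `heavyPartitions` and the 2x threshold are hypothetical choices, not part of any Spark API:

```scala
object PartitionSkew {
  // counts: partition id -> record count, e.g. collected in Spark with
  // df.groupBy(spark_partition_id()).count().collect().map(r => r.getInt(0) -> r.getLong(1)).toMap
  // Returns max / mean; 1.0 means perfectly even partitions.
  def skewRatio(counts: Map[Int, Long]): Double = {
    require(counts.nonEmpty, "need at least one partition")
    val values = counts.values
    val mean = values.sum.toDouble / values.size
    if (mean == 0.0) 1.0 else values.max / mean
  }

  // Partition ids holding more than `factor` times the mean record count.
  // The default factor of 2.0 is an arbitrary threshold for this sketch.
  def heavyPartitions(counts: Map[Int, Long], factor: Double = 2.0): Seq[Int] = {
    val mean = counts.values.sum.toDouble / counts.size
    counts.collect { case (id, n) if n > factor * mean => id }.toSeq.sorted
  }
}
```

A ratio close to 1.0 supports the "data is not skewed" reading; a partition several times the mean is a candidate for the slow tasks.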
>> On Fri, 8 Mar 2019, 14:17 JF Chen, <darouwan@gmail.com> wrote:
>>
>>> I checked my partitionBy call again: it is partitionBy(appname, year,
>>> month, day, hour), and appname has many more distinct values than year,
>>> month, day, and hour. My Spark Streaming app runs every 5 minutes, so
>>> year, month, day, and hour are the same most of the time.
>>> So will the number of appname partitions affect write efficiency?
>>>
>>> Regards,
>>> Junfeng Chen
>>>
>>>
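One way to reason about the appname question is a simplified model (an assumption, not Spark's exact writer code): each write task emits one file per distinct partition-key combination it holds. If the rows of A appnames are spread over all T tasks, one batch produces about A x T small files; repartitioning by the partitionBy columns first (in Spark, for example `df.repartition(col("appname"), col("year"), col("month"), col("day"), col("hour"))`) puts each combination in a single task and yields about A files. The plain-Scala sketch below counts files for both layouts; `Rec`, `roundRobin` and `groupedByKey` are hypothetical names, and `hashCode` stands in for Spark's Murmur3 hash.

```scala
object PartitionedWriteModel {
  // Simplified model: a task writes one file per distinct partition-key
  // combination it holds (ignores spark.sql.files.maxRecordsPerFile splits).
  final case class Rec(appname: String, year: Int, month: Int, day: Int, hour: Int)

  def partitionKey(r: Rec): (String, Int, Int, Int, Int) =
    (r.appname, r.year, r.month, r.day, r.hour)

  // Total files = sum over tasks of the distinct keys in that task.
  def filesWritten(tasks: Seq[Seq[Rec]]): Int =
    tasks.map(_.map(partitionKey).distinct.size).sum

  // Round-robin placement: every appname can end up in every task.
  def roundRobin(recs: Seq[Rec], numTasks: Int): Seq[Seq[Rec]] =
    (0 until numTasks).map(t => recs.zipWithIndex.collect { case (r, i) if i % numTasks == t => r })

  // Placement by key hash: all rows of one key land in exactly one task,
  // which is what repartitioning by the partitionBy columns aims for.
  def groupedByKey(recs: Seq[Rec], numTasks: Int): Seq[Seq[Rec]] =
    (0 until numTasks).map(t => recs.filter(r => Math.floorMod(partitionKey(r).hashCode, numTasks) == t))
}
```

With 10 appnames in the same hour spread over 8 tasks, the round-robin layout gives 80 files and the key-grouped layout gives 10. Whether file count explains the slowness in this thread is a separate question.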
>>> On Thu, Mar 7, 2019 at 4:21 PM JF Chen <darouwan@gmail.com> wrote:
>>>
>>>> Yes, I agree.
>>>>
>>>> From the Spark UI I can confirm the data is not skewed. Each task
>>>> writes only about 100MB; most tasks take several seconds to write their
>>>> data to HDFS, but some take minutes.
>>>>
>>>> Regards,
>>>> Junfeng Chen
>>>>
>>>>
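The observation that tasks with similar data volume take very different times can be checked numerically by comparing each task's duration with the median and computing its throughput. A minimal plain-Scala sketch, assuming the bytes written and duration of each task have been copied from the stage page of the Spark UI; `TaskStat`, `slowTasks` and the 5x threshold are hypothetical choices:

```scala
object Stragglers {
  // One write task, e.g. copied by hand from the Spark UI stage page.
  final case class TaskStat(id: Int, bytesWritten: Long, durationMs: Long)

  private def median(xs: Seq[Long]): Double = {
    val s = xs.sorted
    val n = s.size
    if (n % 2 == 1) s(n / 2).toDouble else (s(n / 2 - 1) + s(n / 2)) / 2.0
  }

  // Tasks slower than `factor` times the median duration.
  def slowTasks(tasks: Seq[TaskStat], factor: Double = 5.0): Seq[TaskStat] = {
    require(tasks.nonEmpty, "no tasks")
    val med = median(tasks.map(_.durationMs))
    tasks.filter(_.durationMs > factor * med)
  }

  // Throughput in MB/s (1 MB = 10^6 bytes here).
  def mbPerSecond(t: TaskStat): Double =
    (t.bytesWritten / 1e6) / (t.durationMs / 1000.0)
}
```

A task that writes the same ~100MB at a small fraction of the usual MB/s points at the storage side rather than at data skew, which is consistent with the datanode GC problem reported at the top of the thread.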
>>>> On Wed, Mar 6, 2019 at 2:39 PM Shyam P <shyamabigdata@gmail.com> wrote:
>>>>
>>>>> Hi JF,
>>>>> Yes, first we should know the actual number of partitions the
>>>>> DataFrame has and the record count of each. Then we should try to
>>>>> spread the data evenly across all partitions.
>>>>> It is generally better to make the number of partitions N * the number
>>>>> of executors.
>>>>>
>>>>>
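The rule of thumb above (partitions = N * executors) can be written down directly, together with a size-based variant. A minimal sketch; `PartitionCount`, `bySize` and the target-bytes-per-partition parameter are hypothetical, and the result would be passed to `df.repartition(n)` before the write:

```scala
object PartitionCount {
  // Rule of thumb from the thread: number of partitions = N * number of executors.
  def byExecutors(numExecutors: Int, n: Int): Int = {
    require(numExecutors > 0 && n > 0)
    numExecutors * n
  }

  // Hypothetical variant: derive the partition count from data volume
  // (ceiling division), then round up to a multiple of the executor count
  // so the result still has the N * executors form.
  def bySize(totalBytes: Long, targetBytesPerPartition: Long, numExecutors: Int): Int = {
    require(targetBytesPerPartition > 0 && numExecutors > 0)
    val raw = math.max(1L, (totalBytes + targetBytesPerPartition - 1) / targetBytesPerPartition)
    val rounded = ((raw + numExecutors - 1) / numExecutors) * numExecutors
    rounded.toInt
  }
}
```

For example, 1,000,000,000 bytes at 100,000,000 bytes per partition gives 10 partitions, rounded up to 12 for 4 executors.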
>>>>> "But the order of columns in partitionBy decides the directory
>>>>> hierarchy structure. I hope the order of columns does not change":
>>>>> this is correct.
>>>>> Hence it is sometimes better to put the column with the larger number
>>>>> of partitions first and the smaller ones after it, i.e. more parent
>>>>> directories and fewer child directories. Tweak it and try.
>>>>>
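How the column order shapes the output can be modeled with Hive-style paths of the form `col=value/col=value/...`, which is how partitionBy lays out directories. A minimal plain-Scala sketch that ignores Spark's escaping of special characters; `PartitionDirs` and `dirsPerLevel` are hypothetical names. Under this model the number of leaf directories equals the number of distinct value combinations whatever the order; the order only changes how many intermediate directories exist.

```scala
object PartitionDirs {
  // Hive-style partition path for one row, e.g. "appname=a1/hour=0".
  def path(cols: Seq[String], row: Map[String, String]): String =
    cols.map(c => s"$c=${row(c)}").mkString("/")

  // Number of distinct directories at each depth of the hierarchy.
  def dirsPerLevel(cols: Seq[String], rows: Seq[Map[String, String]]): Seq[Int] =
    (1 to cols.size).map(depth => rows.map(r => path(cols.take(depth), r)).distinct.size)
}
```

With 3 appnames and 2 hours, ordering (appname, hour) gives 3 then 6 directories per level, while (hour, appname) gives 2 then 6: same leaves, different parents.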
>>>>> "some tasks in the HDFS write stage take much more time than others":
>>>>> maybe the data is skewed; it needs to be distributed evenly across all
>>>>> partitions.
>>>>>
>>>>> ~Shyam
>>>>>
>>>>> On Wed, Mar 6, 2019 at 8:33 AM JF Chen <darouwan@gmail.com> wrote:
>>>>>
>>>>>> Hi Shyam,
>>>>>> Thanks for your reply.
>>>>>> You mean that after finding the number of partitions of column_a,
>>>>>> column_b, and column_c, the columns in partitionBy should be ordered
>>>>>> by those partition counts?
>>>>>> But the order of columns in partitionBy decides the directory
>>>>>> hierarchy structure. I hope the order of columns does not change.
>>>>>>
>>>>>> And I found one more strange thing: some tasks in the HDFS write
>>>>>> stage take much more time than others, even though the amount of data
>>>>>> they write is similar. How can I solve it?
>>>>>>
>>>>>> Regards,
>>>>>> Junfeng Chen
>>>>>>
>>>>>>
>>>>>> On Tue, Mar 5, 2019 at 3:05 PM Shyam P <shyamabigdata@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi JF,
>>>>>>> Try running this before df.write...:
>>>>>>>
>>>>>>> // count records per partition id
>>>>>>> import org.apache.spark.sql.functions.spark_partition_id
>>>>>>> df.groupBy(spark_partition_id()).count().show()
>>>>>>>
>>>>>>> It shows how the data is distributed across the partitions of df.
>>>>>>>
>>>>>>> A small trick we can apply here with partitionBy(column_a, column_b,
>>>>>>> column_c): make sure that (number of column_a partitions) >
>>>>>>> (number of column_b partitions) > (number of column_c partitions).
>>>>>>>
>>>>>>> Try this.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Shyam
>>>>>>>
>>>>>>> On Mon, Mar 4, 2019 at 4:09 PM JF Chen <darouwan@gmail.com> wrote:
>>>>>>>
>>>>>>>> I am trying to write a dataset to HDFS via
>>>>>>>> df.write.partitionBy(column_a, column_b, column_c).parquet(output_path).
>>>>>>>> However, it takes several minutes to write only a few hundred MB of
>>>>>>>> data to HDFS.
>>>>>>>> According to this article
>>>>>>>> <https://stackoverflow.com/questions/45269658/spark-df-write-partitionby-run-very-slow>,
>>>>>>>> adding a repartition call before the write should help. But if the
>>>>>>>> data is skewed, some tasks may take much longer than average, which
>>>>>>>> still costs a lot of time.
>>>>>>>> How can I solve this problem? Thanks in advance!
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Junfeng Chen
>>>>>>>>
>>>>>>>
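The skew concern in the original question follows from how repartitioning by columns works: every row with the same key goes to the same task, so one hot key still means one large task. A common mitigation is salting, i.e. adding a small bucket number to the repartition key (in Spark, for example a column computed as `(rand() * k).cast("int")`), so a hot key is split over up to k tasks at the cost of up to k files in its output directory. The plain-Scala model below is a sketch under stated assumptions: it uses `hashCode` instead of Spark's Murmur3 hash and a deterministic index-based salt instead of `rand()`; `partitionSizes` and `salted` are hypothetical names.

```scala
object KeyRepartitionModel {
  // Stand-in for repartition by a key column: equal keys always land in the
  // same partition (Spark hashes with Murmur3; hashCode is used here).
  def partitionSizes[K](keys: Seq[K], numPartitions: Int): Seq[Int] = {
    val sizes = Array.fill(numPartitions)(0)
    keys.foreach(k => sizes(Math.floorMod(k.hashCode, numPartitions)) += 1)
    sizes.toSeq
  }

  // Salting: split each key into `saltBuckets` sub-keys (key, salt).
  // Deterministic here (record index mod saltBuckets) to keep the model testable.
  def salted[K](keys: Seq[K], saltBuckets: Int): Seq[(K, Int)] =
    keys.zipWithIndex.map { case (k, i) => (k, i % saltBuckets) }
}
```

With 900 rows of one hot key plus 100 distinct keys, the unsalted layout puts at least 900 rows in a single partition, while salting with 10 buckets caps each (key, salt) group at 90 rows.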
