hudi-commits mailing list archives

From "lamber-ken (Jira)" <j...@apache.org>
Subject [jira] [Comment Edited] (HUDI-494) [DEBUGGING] Huge amount of tasks when writing files into HDFS
Date Sat, 04 Jan 2020 23:34:00 GMT

    [ https://issues.apache.org/jira/browse/HUDI-494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17008189#comment-17008189 ]

lamber-ken edited comment on HUDI-494 at 1/4/20 11:33 PM:
----------------------------------------------------------

Hi [~garyli1019], I suspect your dataset contains many distinct partition paths. Please check the *PARTITIONPATH_FIELD_OPT_KEY* option (hoodie.datasource.write.partitionpath.field).

Hi [~vinoth], WDYT?
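A quick way to test this hypothesis is to count the distinct values of the partition path field before writing. The sketch below is plain Scala: PartitionEstimate.distinctPartitions is a hypothetical helper (not a Hudi or Spark API), and each record is modelled as a Map. It assumes, without verifying it for this release, that every partition path receiving inserts gets at least one file group and therefore at least one write task, so the distinct count is only a rough lower bound on the number of tasks and files.

```scala
// Hypothetical helper for illustration only; this is not part of the Hudi API.
object PartitionEstimate {
  // Each record is modelled as a Map from field name to string value.
  // Returns the number of distinct values of `partitionField`;
  // records that lack the field are ignored.
  def distinctPartitions(records: Seq[Map[String, String]], partitionField: String): Int =
    records.flatMap(_.get(partitionField)).distinct.size
}

// Mirrors the second write in the reproduction steps: 30,000 records whose
// "location" (the partition path field) is unique per record.
val batch: Seq[Map[String, String]] =
  List.tabulate(30000)(i => Map("name" -> "kenken", "location" -> s"latitude$i"))

println(PartitionEstimate.distinctPartitions(batch, "location")) // 30000
println(PartitionEstimate.distinctPartitions(batch, "name"))     // 1
```

In spark-shell, the equivalent check on a DataFrame is df.select("location").distinct().count(); for the second write in the reproduction steps it returns 30000.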

 

*Reproduction steps*

1. Set up Hudi
{code:java}
${SPARK_HOME}/bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
{code}
2. Insert data
{code:java}
import org.apache.spark.sql.SaveMode._

val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_table"

// First write: a single record in the partition "latitude"
var datas = List("""{ "name": "kenken", "ts": 1574297893836, "age": "12a", "location": "latitude"}""")
val df = spark.read.json(spark.sparkContext.parallelize(datas, 2))
df.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    mode(Overwrite).
    save(basePath)

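// Second write: 30,000 records, each with a distinct "location" value,
// so every record lands in its own partition path
var datas = List.tabulate(30000)(i => s"""{ "name": "kenken", "ts": "zasz", "age": 123, "location": "latitude${i}"}""")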
val df = spark.read.json(spark.sparkContext.parallelize(datas, 2))
df.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    mode(Append).
    save(basePath)
{code}
3. The second write generates many tasks and temp files under
{code:java}
/tmp/hudi_mor_table/.hoodie/.temp/20200105071257/*
{code}
4. Spark UI:

!image-2020-01-05-07-30-53-567.png!  
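To put a number on the temp files from step 3, the following plain-Scala sketch walks a local directory with java.nio.file and counts the regular files and how many of them are 0 bytes, the symptom described in the issue. TempFileCount is a hypothetical helper, not part of Hudi, and it only works for locally mounted paths such as the file:// table used here. The instant directory 20200105071257 is specific to the run above; substitute the one from your own run.

```scala
import java.nio.file.{Files, Path, Paths}

// Hypothetical helper for illustration only; not part of Hudi.
object TempFileCount {
  // Walks `dir` recursively and returns
  // (number of regular files, number of those files that are 0 bytes).
  def countFiles(dir: Path): (Int, Int) = {
    val stream = Files.walk(dir)
    try {
      val it = stream.iterator()
      var total = 0
      var empty = 0
      while (it.hasNext) {
        val p = it.next()
        if (Files.isRegularFile(p)) {
          total += 1
          if (Files.size(p) == 0L) empty += 1
        }
      }
      (total, empty)
    } finally stream.close() // Files.walk holds open directory handles
  }
}

// The instant directory below comes from the run in step 3; substitute your own.
val tempDir = Paths.get("/tmp/hudi_mor_table/.hoodie/.temp/20200105071257")
if (Files.isDirectory(tempDir)) {
  val (total, empty) = TempFileCount.countFiles(tempDir)
  println(s"$total files under $tempDir, $empty of them empty")
}
```

For a table on HDFS, hdfs dfs -count <dir> reports the directory count, file count and content size for the same directory without any code.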

 



> [DEBUGGING] Huge amount of tasks when writing files into HDFS
> -------------------------------------------------------------
>
>                 Key: HUDI-494
>                 URL: https://issues.apache.org/jira/browse/HUDI-494
>             Project: Apache Hudi (incubating)
>          Issue Type: Test
>            Reporter: Yanjia Gary Li
>            Assignee: Vinoth Chandar
>            Priority: Major
>         Attachments: Screen Shot 2020-01-02 at 8.53.24 PM.png, Screen Shot 2020-01-02 at 8.53.44 PM.png, image-2020-01-05-07-30-53-567.png
>
>
> I am using a manual build of master after the [https://github.com/apache/incubator-hudi/commit/36b3b6f5dd913d3f1c9aa116aff8daf6540fed65] commit.
> EDIT: I tried the latest master but got the same result.
> I am seeing 3 million tasks when the Hudi Spark job writes the files into HDFS. The task count seems to be related to the input size: with 7.7 GB of input there were 3.2 million tasks, and with 9 GB of input there were 3.7 million. Both runs used a parallelism of 10.
> I am seeing a huge number of 0-byte files being written into the .hoodie/.temp/ folder in my HDFS. In the Spark UI, each task writes fewer than 10 records in
> {code:java}
> count at HoodieSparkSqlWriter
> {code}
> All the stages before this seem normal. Any idea what happened here? My first guess would be something related to the bloom filter index. Maybe something triggers repartitioning with the bloom filter index? But I am not really familiar with that part of the code.
> Thanks
>  
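The two runs in the quoted report can be compared directly by dividing task count by input size. A minimal Scala sketch of that arithmetic, using only the figures from the report, gives roughly 415,584 and 411,111 tasks per GB, about 1.1% apart, which supports the reporter's observation that the task count grows roughly in proportion to input size. One hypothesis consistent with this, though not confirmed in this thread, is that the number of distinct partition-path values also grows with the input, as lamber-ken's comment suggests.

```scala
// Tasks per GB of input for the two runs quoted in the report.
object TaskRate {
  def tasksPerGb(tasks: Double, inputGb: Double): Double = tasks / inputGb
}

val run77 = TaskRate.tasksPerGb(3.2e6, 7.7) // about 415,584 tasks per GB
val run90 = TaskRate.tasksPerGb(3.7e6, 9.0) // about 411,111 tasks per GB

// Relative difference between the two rates (about 1.1%).
val relDiff = math.abs(run77 - run90) / run90

println(f"7.7 GB: $run77%.0f tasks/GB, 9 GB: $run90%.0f tasks/GB, diff ${relDiff * 100}%.1f%%")
```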



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
