hudi-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [incubator-hudi] vinothchandar commented on issue #1328: Hudi upsert hangs
Date Wed, 19 Feb 2020 17:25:55 GMT
vinothchandar commented on issue #1328: Hudi upsert hangs
URL: https://github.com/apache/incubator-hudi/issues/1328#issuecomment-588341138
 
 
   I ported your code to Scala and am looking into the issue now. Will keep you posted.
   
   ```
   val HUDI_FORMAT = "org.apache.hudi"
   val TABLE_NAME = "hoodie.table.name"
   val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
   val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
   val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
   val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
   val UPSERT_OPERATION_OPT_VAL = "upsert"
   val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
   val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
   val config = Map(
     "table_name" -> "example_table",
     "target" -> "file:///tmp/example_table/",
     "primary_key" -> "id",
     "sort_key" -> "id"
   )
   val readPath = config("target") + "/*"
   
   val json_data = (1 to 4000000).map(i => s"""{"id":$i}""")
   val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
   val df1 = spark.read.json(jsonRDD)
   println(s"${df1.count()} records in source 1")
   
   df1.printSchema
   
   // Initial load: bulk_insert of all 4,000,000 records. Runs quickly.
   df1.write.format(HUDI_FORMAT)
     .option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key"))
     .option(RECORDKEY_FIELD_OPT_KEY, config("primary_key"))
     .option(TABLE_NAME, config("table_name"))
     .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
     .option(BULK_INSERT_PARALLELISM, 1)
     .mode("Overwrite")
     .save(config("target"))
   
   
   println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
   
   // Upsert of 3,000,000 records taken from the same source. Runs quickly.
   df1.limit(3000000).write.format(HUDI_FORMAT)
     .option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key"))
     .option(RECORDKEY_FIELD_OPT_KEY, config("primary_key"))
     .option(TABLE_NAME, config("table_name"))
     .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
     .option(UPSERT_PARALLELISM, 20)
     .mode("Append")
     .save(config("target"))
   
   println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
   
   // Upsert of all 4,000,000 records. Runs very slowly.
   df1.write.format(HUDI_FORMAT)
     .option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key"))
     .option(RECORDKEY_FIELD_OPT_KEY, config("primary_key"))
     .option(TABLE_NAME, config("table_name"))
     .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
     .option(UPSERT_PARALLELISM, 20)
     .mode("Append")
     .save(config("target"))
   
   println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
   
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
