hudi-commits mailing list archives

From "Vinoth Chandar (Jira)" <j...@apache.org>
Subject [jira] [Commented] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records
Date Fri, 21 Feb 2020 09:07:00 GMT

    [ https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041691#comment-17041691
] 

Vinoth Chandar commented on HUDI-625:
-------------------------------------

Wondering if it's sufficient to just keep the payload/data part of the incoming HoodieRecord
in the ExternalSpillableMap.. the other fields like partition path, record location etc. are
the same for all records coming in... so we can just add them lazily on demand? This would
cut the per-entry size down by a lot and make deserialization simpler..
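A rough sketch of that idea (all class and method names here are illustrative, not actual Hudi APIs): keep only the per-record payload in the spillable map, and reattach the shared partition path / location when the record is read back.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: spill only the payload, reattach shared fields on read.
// SharedContext / LazyRecordSketch are illustrative names, not Hudi classes.
public class LazyRecordSketch {
    // Fields identical across all incoming records; stored once, not per entry.
    static final class SharedContext {
        final String partitionPath;
        final String currentInstant;
        SharedContext(String partitionPath, String currentInstant) {
            this.partitionPath = partitionPath;
            this.currentInstant = currentInstant;
        }
    }

    // Only the per-record payload bytes live in the (spillable) map.
    private final Map<String, byte[]> payloadOnly = new HashMap<>();
    private final SharedContext shared;

    LazyRecordSketch(SharedContext shared) { this.shared = shared; }

    void put(String recordKey, byte[] payload) { payloadOnly.put(recordKey, payload); }

    // Reconstruct the full record lazily: payload from the map, the rest from shared state.
    String getFull(String recordKey) {
        byte[] payload = payloadOnly.get(recordKey);
        if (payload == null) return null;
        return "HoodieRecord{key=" + recordKey
            + ", partitionPath=" + shared.partitionPath
            + ", payloadBytes=" + payload.length + "}";
    }

    public static void main(String[] args) {
        LazyRecordSketch m = new LazyRecordSketch(new SharedContext("default", "20200220225748"));
        m.put("3675605", new byte[]{1, 2, 3});
        System.out.println(m.getFull("3675605"));
    }
}
```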

> Address performance concerns on DiskBasedMap.get() during upsert of thin records
> --------------------------------------------------------------------------------
>
>                 Key: HUDI-625
>                 URL: https://issues.apache.org/jira/browse/HUDI-625
>             Project: Apache Hudi (incubating)
>          Issue Type: Improvement
>          Components: Performance, Writer Core
>            Reporter: Vinoth Chandar
>            Assignee: Vinoth Chandar
>            Priority: Major
>             Fix For: 0.6.0
>
>         Attachments: image-2020-02-20-23-34-24-155.png, image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>  
>  So what's going on here is that each entry (a single data field) is estimated to be around 500-750 bytes in memory, and things spill a lot...
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 partitionPath=default}, currentLocation='HoodieRecordLocation {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', newLocation='HoodieRecordLocation {instantTime=20200220225921, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'}
> {code}
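As a sanity check on those numbers (simple arithmetic, not Hudi code): at the logged estimate of ~760 bytes per entry, 4 million records would need roughly 2.9 GB just for the map, which explains the heavy spilling on a 6 GB executor.

```java
// Back-of-envelope: map footprint implied by the log line above.
public class SpillEstimate {
    public static void main(String[] args) {
        long records = 4_000_000L;       // record count from the reproduction steps
        long bytesPerEntry = 760L;       // "Estimated Payload size => 760" from the log
        long totalBytes = records * bytesPerEntry;
        System.out.println("estimated map footprint: " + totalBytes / (1024 * 1024) + " MB");
    }
}
```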
>  
> h2. Reproduce steps
>  
> {code:bash}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
>     --executor-memory 6G \
>     --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
>     --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>  
> {code:scala}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
>
> val config = Map(
>   "table_name" -> "example_table",
>   "target" -> "file:///tmp/example_table/",
>   "primary_key" -> "id",
>   "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"
>
> val json_data = (1 to 4000000).map(i => "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
>
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
>
> // Runs very slow
> df1.limit(3000000).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
>
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> {code}
>  
>  
>  
> h2. *Analysis*
> h3. *Upsert (4000000 entries)*
> {code:java}
> WARN HoodieMergeHandle: 
> Number of entries in MemoryBasedMap => 150875 
> Total size in bytes of MemoryBasedMap => 83886580 
> Number of entries in DiskBasedMap => 3849125 
> Size of file spilled to disk => 1443046132
> {code}
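Dividing those counters out (plain arithmetic on the log values above): the in-memory entries average ~556 bytes, the spilled file averages ~374 bytes per entry, and about 96% of the 4M entries end up on disk, so nearly every get() pays the disk-read-plus-deserialize cost.

```java
// Per-entry sizes and spill ratio derived from the HoodieMergeHandle log above.
public class MergeStats {
    public static void main(String[] args) {
        long memEntries = 150_875L,    memBytes  = 83_886_580L;
        long diskEntries = 3_849_125L, diskBytes = 1_443_046_132L;
        System.out.println("avg in-memory entry: " + memBytes / memEntries + " bytes");
        System.out.println("avg spilled entry:   " + diskBytes / diskEntries + " bytes");
        System.out.println("spilled: " + 100 * diskEntries / (memEntries + diskEntries) + "%");
    }
}
```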
> h3. Hang stack trace (DiskBasedMap#get)
>  
> {code:java}
> "pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
>     at java.util.zip.ZipFile.getEntry(Native Method)
>     at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
>     -  locked java.util.jar.JarFile@1fc27ed4
>     at java.util.jar.JarFile.getEntry(JarFile.java:240)
>     at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
>     at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
>     at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     -  locked java.lang.Object@28f65251
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
>     -  locked scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@a353dff
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
>     -  locked com.esotericsoftware.reflectasm.AccessClassLoader@2c7122e2
>     at com.esotericsoftware.reflectasm.AccessClassLoader.loadClass(AccessClassLoader.java:92)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at com.esotericsoftware.reflectasm.ConstructorAccess.get(ConstructorAccess.java:59)
>     -  locked com.esotericsoftware.reflectasm.AccessClassLoader@2c7122e2
>     at org.apache.hudi.common.util.SerializationUtils$KryoInstantiator$KryoBase.lambda$newInstantiator$0(SerializationUtils.java:151)
>     at org.apache.hudi.common.util.SerializationUtils$KryoInstantiator$KryoBase$$Lambda$265/1458915834.newInstance(Unknown Source)
>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1139)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:562)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:538)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
>     at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:112)
>     at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:86)
>     at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:217)
>     at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:211)
>     at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:207)
>     at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:173)
>     at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:55)
>     at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:280)
>     at org.apache.hudi.table.HoodieCopyOnWriteTable$UpdateHandler.consumeOneRecord(HoodieCopyOnWriteTable.java:434)
>     at org.apache.hudi.table.HoodieCopyOnWriteTable$UpdateHandler.consumeOneRecord(HoodieCopyOnWriteTable.java:424)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor$$Lambda$76/1412692041.call(Unknown Source)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
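The hot path above spends its time in ConstructorAccess.get() and class-loader lookups on every single deserialize. One direction, sketched below with plain JDK reflection instead of Kryo/reflectasm (so the class name and caching strategy are illustrative, not Hudi's), is to resolve the no-arg constructor once per class and cache the resulting factory:

```java
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Illustrative sketch: avoid repeated constructor/classloader resolution on the
// deserialization hot path by caching one factory per class.
public class InstantiatorCache {
    private static final Map<Class<?>, Supplier<?>> CACHE = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> clazz) {
        Supplier<?> factory = CACHE.computeIfAbsent(clazz, c -> {
            try {
                Constructor<?> ctor = c.getDeclaredConstructor(); // resolved once per class
                ctor.setAccessible(true);
                return () -> {
                    try {
                        return ctor.newInstance();
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException(e);
                    }
                };
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(c + " needs a no-arg constructor", e);
            }
        });
        return (T) factory.get();
    }

    public static void main(String[] args) {
        StringBuilder a = newInstance(StringBuilder.class);
        StringBuilder b = newInstance(StringBuilder.class); // cache hit, no re-resolution
        System.out.println(a != b && CACHE.size() == 1);
    }
}
```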
>  
> h3. Average time of {{DiskBasedMap#get}}
>  
> {code:java}
> $ monitor *DiskBasedMap get -c 12
> Affect(class-cnt:1 , method-cnt:4) cost in 221 ms.
>  timestamp            class         method  total  success  fail  avg-rt(ms)  fail-rate
> ----------------------------------------------------------------------------------------
>  2020-02-20 18:13:36  DiskBasedMap  get     5814   5814     0     6.12        0.00%
>  2020-02-20 18:13:48  DiskBasedMap  get     9117   9117     0     3.89        0.00%
>  2020-02-20 18:14:16  DiskBasedMap  get     8490   8490     0     4.10        0.00%
> {code}
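To put those latencies in context (rough, single-threaded arithmetic combining the monitor samples with the spill counters from the merge stats above): at ~4 ms per DiskBasedMap#get, 3.85M spilled entries cost on the order of four hours of lookup time.

```java
// Rough total cost of per-get latency across all spilled entries.
public class LookupCost {
    public static void main(String[] args) {
        long spilledEntries = 3_849_125L; // from the HoodieMergeHandle stats
        double avgGetMs = 4.0;            // mid-range of the monitor samples above
        double hours = spilledEntries * avgGetMs / 1000.0 / 3600.0;
        System.out.printf("approx total lookup time: %.1f hours%n", hours);
    }
}
```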
>  
> h3. Call-time trace
> {code:java}
> thread-2;id=194;is_daemon=false;priority=5;TCCL=org.apache.spark.repl.ExecutorClassLoader@7a47bc29
>     `---[4.361707ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>         +---[0.001704ms] java.util.Map:get()
>         `---[4.344261ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>             `---[4.328981ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>                 +---[0.00122ms] org.apache.hudi.common.util.collection.DiskBasedMap:getRandomAccessFile()
>                 `---[4.313586ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>                     `---[4.283509ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>                         +---[0.001169ms] org.apache.hudi.common.util.collection.DiskBasedMap$ValueMetadata:getOffsetOfValue()
>                         +---[7.1E-4ms] java.lang.Long:longValue()
>                         +---[6.97E-4ms] org.apache.hudi.common.util.collection.DiskBasedMap$ValueMetadata:getSizeOfValue()
>                         +---[0.036483ms] org.apache.hudi.common.util.SpillableMapUtils:readBytesFromDisk()
>                         `---[4.201996ms] org.apache.hudi.common.util.SerializationUtils:deserialize()
> {code}
> h3. Kryo deserialize performance test
>  
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.hudi.common.util.SerializationUtils;
>
> import java.util.LinkedList;
> import java.util.List;
> import java.util.Random;
>
> /**
>  * Micro-benchmark for SerializationUtils (Kryo) deserialization.
>  */
> public class TestSerializationUtils {
>
>     public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
>             + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
>             + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
>             + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
>             + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
>             + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
>             + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
>             + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
>
>     public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
>
>     public static GenericRecord generateGenericRecord() {
>         Random rand = new Random(46474747);
>         GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
>         rec.put("_row_key", "rowKey");
>         rec.put("timestamp", rand.nextDouble()); // schema declares a double
>         rec.put("rider", "riderName");
>         rec.put("driver", "driverName");
>         rec.put("begin_lat", rand.nextDouble());
>         rec.put("begin_lon", rand.nextDouble());
>         rec.put("end_lat", rand.nextDouble());
>         rec.put("end_lon", rand.nextDouble());
>         rec.put("_hoodie_is_deleted", false);
>         return rec;
>     }
>
>     public static void main(String[] args) throws Exception {
>         GenericRecord genericRecord = generateGenericRecord();
>         byte[] serializedObject = SerializationUtils.serialize(genericRecord);
>         List<Object> results = new LinkedList<>();
>         long t1 = System.currentTimeMillis();
>         for (int i = 0; i < 1000; i++) {
>             results.add(SerializationUtils.<GenericRecord>deserialize(serializedObject));
>         }
>         long t2 = System.currentTimeMillis();
>         System.out.println("deserialize count: " + results.size());
>         System.out.println("deserialize cost: " + (t2 - t1) + "ms");
>     }
> }
> {code}
>  
> !image-2020-02-21-15-35-56-637.png|width=404,height=165!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
