From: "lamber-ken (Jira)"
To: commits@hudi.apache.org
Reply-To: dev@hudi.apache.org
Date: Sat, 22 Feb 2020 03:51:00 +0000 (UTC)
Subject: [jira] [Commented] (HUDI-625) Address performance concerns on DiskBasedMap.get() during upsert of thin records

    [ https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17042356#comment-17042356 ]

lamber-ken commented on HUDI-625:
---------------------------------

Hi [~vinoth], I cached the Class info, and deserialization is now roughly 100x faster.

Before:
dese times: 10000
dese cost: 48859ms

Now:
dese times: 10000
dese cost: 494ms

{code:java}
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.SerializationUtils;
import org.objenesis.instantiator.ObjectInstantiator;

import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class KryoTest3 {

  public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
      + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
      + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
      + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
      + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
      + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
      + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
      + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";

  public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

  public static GenericRecord generateGenericRecord() {
    Random RAND = new Random(46474747);
    GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
    rec.put("_row_key", "rowKey");
    rec.put("timestamp", "timestamp");
    rec.put("rider", "riderName");
    rec.put("driver", "driverName");
    rec.put("begin_lat", RAND.nextDouble());
    rec.put("begin_lon", RAND.nextDouble());
    rec.put("end_lat", RAND.nextDouble());
    rec.put("end_lon", RAND.nextDouble());
    rec.put("_hoodie_is_deleted", false);
    return rec;
  }

  public static void main(String[] args) throws Exception {
    GenericRecord genericRecord = generateGenericRecord();
    Kryo kryo = new KryoInstantiator().newKryo();

    // store data bytes
    // note: the stored (and later deserialized) bytes come from SerializationUtils.serialize;
    // the Output above only exercises kryo.writeClassAndObject and is discarded.
    List<byte[]> objectDatas = new LinkedList<>();
    for (int i = 0; i < 10000; i++) {
      Output output = new Output(1, 4024);
      kryo.writeClassAndObject(output, genericRecord);
      output.close();
      objectDatas.add(SerializationUtils.serialize(genericRecord));
    }

    long t1 = System.currentTimeMillis();
    System.out.println("starting deserialize");

    // deserialize
    List<Object> datas = new LinkedList<>();
    for (byte[] data : objectDatas) {
      datas.add(kryo.readClassAndObject(new Input(data)));
    }
    long t2 = System.currentTimeMillis();
    System.err.println("dese times: " + datas.size());
    System.err.println("dese cost: " + (t2 - t1) + "ms");
  }

  private static class KryoInstantiator implements Serializable {

    public Kryo newKryo() {
      Kryo kryo = new KryoBase();
      // ensure that kryo doesn't fail if classes are not registered with kryo.
      kryo.setRegistrationRequired(false);
      // This would be used for object initialization if nothing else works out.
      kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
      // Handle cases where we may have an odd classloader setup like with libjars for hadoop.
      kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
      return kryo;
    }

    private static class KryoBase extends Kryo {
      @Override
      protected Serializer newDefaultSerializer(Class type) {
        final Serializer serializer = super.newDefaultSerializer(type);
        if (serializer instanceof FieldSerializer) {
          final FieldSerializer fieldSerializer = (FieldSerializer) serializer;
          fieldSerializer.setIgnoreSyntheticFields(true);
        }
        return serializer;
      }

      // cache of already-instantiated objects keyed by class; the same cached
      // instance is handed back for every subsequent record of that class.
      Map<Class<?>, Object> cache = new HashMap<>();

      @Override
      protected ObjectInstantiator newInstantiator(Class type) {
        return () -> {
          if (cache.get(type) != null) {
            return cache.get(type);
          }
          // fall back to java based instantiation.
          try {
            final Constructor constructor = type.getConstructor();
            constructor.setAccessible(true);
            Object o = constructor.newInstance();
            cache.put(o.getClass(), o);
            return o;
          } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
            // ignore this exception. we will fall back to default instantiation strategy.
          }
          Object o = super.getInstantiatorStrategy().newInstantiatorOf(type).newInstance();
          cache.put(o.getClass(), o);
          return o;
        };
      }
    }
  }
}
{code}

> Address performance concerns on DiskBasedMap.get() during upsert of thin records
> ---------------------------------------------------------------------------------
>
>                 Key: HUDI-625
>                 URL: https://issues.apache.org/jira/browse/HUDI-625
>             Project: Apache Hudi (incubating)
>          Issue Type: Improvement
>          Components: Performance, Writer Core
>            Reporter: Vinoth Chandar
>            Assignee: Vinoth Chandar
>            Priority: Major
>             Fix For: 0.6.0
>
>         Attachments: image-2020-02-20-23-34-24-155.png, image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>
> So what's going on here is that each entry (a single data field) is estimated to be around 500-750 bytes in memory, so things spill a lot...
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 partitionPath=default}, currentLocation='HoodieRecordLocation {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', newLocation='HoodieRecordLocation {instantTime=20200220225921, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'}
> {code}
>
> {code:java}
> INFO HoodieMergeHandle: Number of entries in MemoryBasedMap => 150875
> Total size in bytes of MemoryBasedMap => 83886580
> Number of entries in DiskBasedMap => 2849125
> Size of file spilled to disk => 1067101739
> {code}
> h2. Reproduce steps
>
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
>     --executor-memory 6G \
>     --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
>     --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
>
> val config = Map(
>   "table_name" -> "example_table",
>   "target" -> "file:///tmp/example_table/",
>   "primary_key" -> "id",
>   "sort_key" -> "id"
> )
>
> val readPath = config("target") + "/*"
>
> val json_data = (1 to 4000000).map(i => "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
>
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
>   option(BULK_INSERT_PARALLELISM, 1).
>   mode("Overwrite").
>   save(config("target"))
>
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
>
> // Runs very slow
> df1.limit(3000000).write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
>
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
>   option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
>   option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
>   option(TABLE_NAME, config("table_name")).
>   option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
>   option(UPSERT_PARALLELISM, 20).
>   mode("Append").
>   save(config("target"))
>
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> {code}
>
> h2. *Analysis*
> h3. *Upsert (4000000 entries)*
> {code:java}
> WARN HoodieMergeHandle:
> Number of entries in MemoryBasedMap => 150875
> Total size in bytes of MemoryBasedMap => 83886580
> Number of entries in DiskBasedMap => 3849125
> Size of file spilled to disk => 1443046132
> {code}
> h3. Hang stacktrace (DiskBasedMap#get)
>
> {code:java}
> "pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
>     at java.util.zip.ZipFile.getEntry(Native Method)
>     at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
>     -  locked java.util.jar.JarFile@1fc27ed4
>     at java.util.jar.JarFile.getEntry(JarFile.java:240)
>     at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
>     at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
>     at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     -  locked java.lang.Object@28f65251
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
>     -  locked scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@a353dff
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
>     -  locked com.esotericsoftware.reflectasm.AccessClassLoader@2c7122e2
>     at com.esotericsoftware.reflectasm.AccessClassLoader.loadClass(AccessClassLoader.java:92)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at com.esotericsoftware.reflectasm.ConstructorAccess.get(ConstructorAccess.java:59)
>     -  locked com.esotericsoftware.reflectasm.AccessClassLoader@2c7122e2
>     at org.apache.hudi.common.util.SerializationUtils$KryoInstantiator$KryoBase.lambda$newInstantiator$0(SerializationUtils.java:151)
>     at org.apache.hudi.common.util.SerializationUtils$KryoInstantiator$KryoBase$$Lambda$265/1458915834.newInstance(Unknown Source)
>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1139)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:562)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:538)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
>     at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:112)
>     at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:86)
>     at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:217)
>     at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:211)
>     at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:207)
>     at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:173)
>     at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:55)
>     at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:280)
>     at org.apache.hudi.table.HoodieCopyOnWriteTable$UpdateHandler.consumeOneRecord(HoodieCopyOnWriteTable.java:434)
>     at org.apache.hudi.table.HoodieCopyOnWriteTable$UpdateHandler.consumeOneRecord(HoodieCopyOnWriteTable.java:424)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor$$Lambda$76/1412692041.call(Unknown Source)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
>
> h3. Average time of {{DiskBasedMap#get}}
>
> {code:java}
> $ monitor *DiskBasedMap get -c 12
> Affect(class-cnt:1 , method-cnt:4) cost in 221 ms.
>  timestamp            class         method  total  success  fail  avg-rt(ms)  fail-rate
> -----------------------------------------------------------------------------------------
>  2020-02-20 18:13:36  DiskBasedMap  get     5814   5814     0     6.12        0.00%
>
>  timestamp            class         method  total  success  fail  avg-rt(ms)  fail-rate
> -----------------------------------------------------------------------------------------
>  2020-02-20 18:13:48  DiskBasedMap  get     9117   9117     0     3.89        0.00%
>
>  timestamp            class         method  total  success  fail  avg-rt(ms)  fail-rate
> -----------------------------------------------------------------------------------------
>  2020-02-20 18:14:16  DiskBasedMap  get     8490   8490     0     4.10        0.00%
> {code}
>
> h3. Call time trace:
> {code:java}
> thread-2;id=194;is_daemon=false;priority=5;TCCL=org.apache.spark.repl.ExecutorClassLoader@7a47bc29
> `---[4.361707ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>     +---[0.001704ms] java.util.Map:get()
>     `---[4.344261ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>         `---[4.328981ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>             +---[0.00122ms] org.apache.hudi.common.util.collection.DiskBasedMap:getRandomAccessFile()
>             `---[4.313586ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>                 `---[4.283509ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>                     +---[0.001169ms] org.apache.hudi.common.util.collection.DiskBasedMap$ValueMetadata:getOffsetOfValue()
>                     +---[7.1E-4ms] java.lang.Long:longValue()
>                     +---[6.97E-4ms] org.apache.hudi.common.util.collection.DiskBasedMap$ValueMetadata:getSizeOfValue()
>                     +---[0.036483ms] org.apache.hudi.common.util.SpillableMapUtils:readBytesFromDisk()
>                     `---[4.201996ms] org.apache.hudi.common.util.SerializationUtils:deserialize()
> {code}
> h3. Kryo deserialize performance test
>
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.hudi.common.util.SerializationUtils;
>
> import java.util.LinkedList;
> import java.util.List;
> import java.util.Random;
>
> /**
>  * Test serialization.
>  */
> public class TestSerializationUtils {
>
>   public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
>       + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
>       + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
>       + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
>       + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
>       + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
>       + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
>       + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
>
>   public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
>
>   public static GenericRecord generateGenericRecord() {
>     Random RAND = new Random(46474747);
>     GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
>     rec.put("_row_key", "rowKey");
>     rec.put("timestamp", "timestamp");
>     rec.put("rider", "riderName");
>     rec.put("driver", "driverName");
>     rec.put("begin_lat", RAND.nextDouble());
>     rec.put("begin_lon", RAND.nextDouble());
>     rec.put("end_lat", RAND.nextDouble());
>     rec.put("end_lon", RAND.nextDouble());
>     rec.put("_hoodie_is_deleted", false);
>     return rec;
>   }
>
>   public static void main(String[] args) throws Exception {
>     GenericRecord genericRecord = generateGenericRecord();
>     byte[] serializedObject = SerializationUtils.serialize(genericRecord);
>
>     List<Object> datas = new LinkedList<>();
>     long t1 = System.currentTimeMillis();
>     // deserialize the same payload repeatedly to time SerializationUtils.deserialize
>     for (int i = 0; i < 1000; i++) {
>       datas.add(SerializationUtils.deserialize(serializedObject));
>     }
>     long t2 = System.currentTimeMillis();
>     System.out.println("dese times: " + datas.size());
>     System.out.println("dese cost: " + (t2 - t1) + "ms");
>   }
> }
> {code}
>
> !image-2020-02-21-15-35-56-637.png|width=404,height=165!
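Some rough back-of-the-envelope arithmetic tying the spill statistics and the measured DiskBasedMap#get latency above together (my own derived numbers, not figures reported in the issue; the class name is hypothetical):

{code:java}
public class SpillCostEstimate {
  public static void main(String[] args) {
    // Figures copied from the "Upsert (4000000 entries)" log above.
    long inMemoryEntries = 150_875L;
    long inMemoryBytes = 83_886_580L;
    long spilledEntries = 3_849_125L;
    long spilledBytes = 1_443_046_132L;
    // Approximate average latency of DiskBasedMap#get from the monitor output above.
    double avgGetMillis = 4.0;

    System.out.println("avg payload size in memory : " + inMemoryBytes / inMemoryEntries + " bytes"); // ~556
    System.out.println("avg payload size on disk   : " + spilledBytes / spilledEntries + " bytes");   // ~374
    System.out.println("est. time for spilled gets : "
        + String.format("%.0f", spilledEntries * avgGetMillis / 1000 / 60) + " minutes");             // ~257
  }
}
{code}

The ~556 bytes per in-memory entry lines up with the 500-750 byte estimate in the description, and at ~4 ms per disk-backed lookup the ~3.8M spilled records alone account for roughly four hours of merge time.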