adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file
URL: https://github.com/apache/incubator-hudi/issues/1325#issuecomment-586284798

@vinothchandar I've managed to reproduce this with a simple spark.parallelize() example.

test.scala
```
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{month, year, col, dayofmonth}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis._
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame

object HudiScalaStreamHelloWorld {

  // Case classes are kept at the object level (not inside main) so that
  // Spark can derive the implicit encoders needed by toDF().
  case class Bar(id: Int, name: String)

  // Uncomment the following section based on the example.
  // START - simple value included in the array item
  // case class Foo(id: Int, bar: Bar) // foo with a simple value
  // END - simple value included in the array item

  // START - simple value not present in the array item
  // missing the id: Int property
  case class Foo(bar: Bar) // foo without a simple value
  // END - simple value not present in the array item

  case class Root(id: Int, foos: Array[Foo])

  def main(args: Array[String]): Unit = {
    val appName = "ScalaStreamExample"
    val batchInterval = Milliseconds(2000)
    val spark = SparkSession
      .builder()
      .appName(appName)
      .getOrCreate()

    val sparkContext = spark.sparkContext
    val streamingContext = new StreamingContext(sparkContext, batchInterval)

    import spark.implicits._
    val sc = sparkContext

    // Uncomment the following section based on the example.
    // START - simple value included in the array item
    // val dataFrame = sc.parallelize(Seq(Root(1, Array(Foo(1, Bar(1, "OneBar")))))).toDF()
    // END - simple value included in the array item

    // START - simple value not present in the array item
    val dataFrame = sc.parallelize(Seq(Root(1, Array(Foo(Bar(1, "OneBar")))))).toDF()
    // END - simple value not present in the array item

    dataFrame.printSchema()

    val hudiTableName = "order"
    val hudiTablePath = "s3://xxx-xxxx/path/" + hudiTableName

    // Set up our Hudi Data Source Options
    val hudiOptions = Map[String, String](
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
      HoodieWriteConfig.TABLE_NAME -> hudiTableName,
      DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
      DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "id")

    dataFrame.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)
  }
}
```
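For reference, this is roughly what printSchema() prints for the failing variant (a sketch reconstructed from the case classes above, not captured from the actual run). The point to note is that the foos array element struct carries only the nested bar struct and no primitive field:

```
root
 |-- id: integer (nullable = false)
 |-- foos: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- bar: struct (nullable = true)
 |    |    |    |-- id: integer (nullable = false)
 |    |    |    |-- name: string (nullable = true)
```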
"OneBar")))))).toDF() // END - Simple object not present in array item dataFrame.printSchema() val hudiTableName = "order" val hudiTablePath = "s3://xxx-xxxx/path/" + hudiTableName // Set up our Hudi Data Source Options val hudiOptions = Map[String,String]( DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", HoodieWriteConfig.TABLE_NAME -> hudiTableName, DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "id") dataFrame.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath) } } ``` deploy.sh ``` sbt clean sbt package aws s3 cp ./target/scala-2.11/simple-project_2.11-1.0.jar s3://xxxx.xxxx/simple-project_2.11-1.0.jar aws emr add-steps --cluster-id j-AZQBZK81NAFT --steps Type=spark,Name=SimpleHudiTest,Args=[\ --deploy-mode,cluster,\ --master,yarn,\ --packages,\'org.apache.hudi:hudi-spark-bundle:0.5.0-incubating,org.apache.spark:spark-avro_2.11:2.4.4\',\ --conf,spark.yarn.submit.waitAppCompletion=false,\ --conf,yarn.log-aggregation-enable=true,\ --conf,spark.dynamicAllocation.enabled=true,\ --conf,spark.cores.max=4,\ --conf,spark.network.timeout=300,\ --conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,\ --conf,spark.sql.hive.convertMetastoreParquet=false,\ --class,HudiScalaStreamHelloWorld,\ s3://xxxx.xxx/simple-project_2.11-1.0.jar\ ],ActionOnFailure=CONTINUE ``` build.sbt ``` name := "Simple Project" version := "1.0" scalaVersion := "2.11.12" libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.4" libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.4" libraryDependencies += "org.apache.hudi" % "hudi-spark-bundle" % "0.5.0-incubating" scalacOptions := Seq("-unchecked", "-deprecation") ``` AWS glue job runs over the output s3 directory. 
From the Presto EMR instance, the result when the simple value is included on the array item:

```
presto:schema> select * from default;
 _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path |                          _hoodie_file_name                          | id |               foos
---------------------+----------------------+--------------------+------------------------+---------------------------------------------------------------------+----+-----------------------------------
 20200214112552      | 20200214112552_0_1   | 1                  | default                | 90190f46-d064-4c8b-ab6a-89ecc9b3ced4-0_0-5-8_20200214112552.parquet | 1  | [{id=1, bar={id=1, name=OneBar}}]
(1 row)

Query 20200214_130009_00002_hej8h, FINISHED, 1 node
http://xx-xx-xxx-xx-xxx.x-xxxxx.compute.amazonaws.com:8889/ui/query.html?20200214_130009_00002_hej8h
Splits: 17 total, 17 done (100.00%)
CPU Time: 0.0s total, 23 rows/s, 25.6KB/s, 16% active
Per Node: 0.1 parallelism, 1 rows/s, 2.09KB/s
Parallelism: 0.1
Peak Memory: 0B
0:01 [1 rows, 1.1KB] [1 rows/s, 2.09KB/s]
```

And the error when running the same query against the variant without a simple property on the array item:

```
presto:schema> select * from default;

Query 20200214_131047_00005_hej8h, FAILED, 1 node
http://xxx-xx-xxx-xx-xxx.xx-xxxx.compute.amazonaws.com:8889/ui/query.html?20200214_131047_00005_hej8h
Splits: 17 total, 0 done (0.00%)
CPU Time: 0.0s total, 0 rows/s, 0B/s, 0% active
Per Node: 0.0 parallelism, 0 rows/s, 0B/s
Parallelism: 0.0
Peak Memory: 0B
0:01 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20200214_131047_00005_hej8h failed: No value present
java.util.NoSuchElementException: No value present
    at java.util.Optional.get(Optional.java:135)
    at com.facebook.presto.parquet.reader.ParquetReader.readArray(ParquetReader.java:156)
    at com.facebook.presto.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:282)
    at com.facebook.presto.parquet.reader.ParquetReader.readBlock(ParquetReader.java:268)
    at com.facebook.presto.hive.parquet.ParquetPageSource$ParquetBlockLoader.load(ParquetPageSource.java:247)
    at com.facebook.presto.hive.parquet.ParquetPageSource$ParquetBlockLoader.load(ParquetPageSource.java:225)
    at com.facebook.presto.spi.block.LazyBlock.assureLoaded(LazyBlock.java:283)
    at com.facebook.presto.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:274)
    at com.facebook.presto.spi.Page.getLoadedPage(Page.java:261)
    at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:254)
    at com.facebook.presto.operator.Driver.processInternal(Driver.java:379)
    at com.facebook.presto.operator.Driver.lambda$processFor$8(Driver.java:283)
    at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:675)
    at com.facebook.presto.operator.Driver.processFor(Driver.java:276)
    at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1077)
    at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
    at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:483)
    at com.facebook.presto.$gen.Presto_0_227____20200211_134743_1.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
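The failure comes from Presto's array read path (ParquetReader.readArray), so a projection that leaves the array column out should presumably not trigger it (an untested sketch, not a fix):

```
-- Untested sketch: project only scalar columns so the query never enters
-- the array read path for the foos column.
select _hoodie_record_key, id from default;
```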
--> "id": 1, "bar": { "id": 1, "name": "OneBar" } } ] } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services