hudi-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [incubator-hudi] adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
Date Fri, 14 Feb 2020 14:33:33 GMT
adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
URL: https://github.com/apache/incubator-hudi/issues/1325#issuecomment-586284798
 
 
   @vinothchandar I've managed to reproduce this with a simple `sc.parallelize()` example.
   
   test.scala
   ```
   import org.apache.spark._
   import org.apache.spark.sql._
   import org.apache.spark.sql.functions.{month, year, col, dayofmonth}
   import org.apache.spark.storage.StorageLevel
   import org.apache.spark.streaming.{Milliseconds, StreamingContext}
   import org.apache.spark.streaming.Duration
   import org.apache.spark.streaming.kinesis._
   import org.apache.spark.streaming.kinesis.KinesisInputDStream
   import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.hive.MultiPartKeysValueExtractor
   
   import org.apache.spark.sql.types._
   import org.apache.spark.sql.DataFrame
   
   
   case class Bar(id: Int, name: String)
   
   // Keep exactly one Foo definition active; it must match the dataFrame chosen in main()
   
   // START - Simple property included in array item
   // case class Foo(id: Int, bar: Bar) // foo with simple property
   // END - Simple property included in array item
   
   // START - Simple property not present in array item
   // missing the id: Int property
   case class Foo(bar: Bar) // foo without simple property
   // END - Simple property not present in array item
   
   
   case class Root(id: Int, foos: Array[Foo])
   
   object HudiScalaStreamHelloWorld {
       
       def main(args: Array[String]): Unit = {  
           val appName = "ScalaStreamExample"
           val batchInterval = Milliseconds(2000)
           val spark = SparkSession
               .builder()
               .appName(appName)
               .getOrCreate()
   
           val sparkContext = spark.sparkContext
           val streamingContext = new StreamingContext(sparkContext, batchInterval)
   
           import spark.implicits._
           val sc = sparkContext
   
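           // Bar is defined once at the top level; redefining it here would shadow
           // that definition and no longer match the Bar type used by Foo.
   
           // Uncomment the section that matches the active Foo definition above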
   
           // START - Simple object included in array item
           // with simple item on foo in array
           // val dataFrame = sc.parallelize(Seq(Root(1, Array(Foo(1, Bar(1, "OneBar")))))).toDF()
           // END - Simple object included in array item
   
           // START - Simple object not present in array item
           // without simple item on foo in array
           val dataFrame = sc.parallelize(Seq(Root(1, Array(Foo(Bar(1, "OneBar")))))).toDF()
           // END - Simple object not present in array item
   
           dataFrame.printSchema()
   
           val hudiTableName = "order"
           val hudiTablePath = "s3://xxx-xxxx/path/" + hudiTableName
   
           // Set up our Hudi Data Source Options
           val hudiOptions = Map[String,String](
               DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
               HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
               DataSourceWriteOptions.OPERATION_OPT_KEY ->
                   DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
               DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "id")
   
           dataFrame.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)
       }
   }
   ```
   
   deploy.sh
   ```
   sbt clean
   sbt package
   
   aws s3 cp ./target/scala-2.11/simple-project_2.11-1.0.jar s3://xxxx.xxxx/simple-project_2.11-1.0.jar
   
   aws emr add-steps --cluster-id j-AZQBZK81NAFT --steps Type=spark,Name=SimpleHudiTest,Args=[\
   --deploy-mode,cluster,\
   --master,yarn,\
   --packages,\'org.apache.hudi:hudi-spark-bundle:0.5.0-incubating,org.apache.spark:spark-avro_2.11:2.4.4\',\
   --conf,spark.yarn.submit.waitAppCompletion=false,\
   --conf,yarn.log-aggregation-enable=true,\
   --conf,spark.dynamicAllocation.enabled=true,\
   --conf,spark.cores.max=4,\
   --conf,spark.network.timeout=300,\
   --conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,\
   --conf,spark.sql.hive.convertMetastoreParquet=false,\
   --class,HudiScalaStreamHelloWorld,\
   s3://xxxx.xxx/simple-project_2.11-1.0.jar\
   ],ActionOnFailure=CONTINUE
   ```
   
   build.sbt
   ```
   name := "Simple Project"
   
   version := "1.0"
   
   scalaVersion := "2.11.12"
   
   libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"
   libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.4"
   libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.4"
   libraryDependencies += "org.apache.hudi" % "hudi-spark-bundle" % "0.5.0-incubating"
   
   
   scalacOptions := Seq("-unchecked", "-deprecation")
   
   ```
   
   An AWS Glue job then runs over the output S3 directory.
   
   From the Presto EMR instance, this is the result when a simple property is included on the array item:
   ```
   presto:schema> select * from default;
    _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path |                          _hoodie_file_name                          | id |               foos
   ---------------------+----------------------+--------------------+------------------------+---------------------------------------------------------------------+----+-----------------------------------
    20200214112552      | 20200214112552_0_1   | 1                  | default                | 90190f46-d064-4c8b-ab6a-89ecc9b3ced4-0_0-5-8_20200214112552.parquet |  1 | [{id=1, bar={id=1, name=OneBar}}]
   (1 row)
   
   Query 20200214_130009_00002_hej8h, FINISHED, 1 node
   http://xx-xx-xxx-xx-xxx.x-xxxxx.compute.amazonaws.com:8889/ui/query.html?20200214_130009_00002_hej8h
   Splits: 17 total, 17 done (100.00%)
   CPU Time: 0.0s total,    23 rows/s, 25.6KB/s, 16% active
   Per Node: 0.1 parallelism,     1 rows/s, 2.09KB/s
   Parallelism: 0.1
   Peak Memory: 0B
   0:01 [1 rows, 1.1KB] [1 rows/s, 2.09KB/s]
   ```
   
   Error message when running the same query against the output of the example without a simple property on the array item:
   ```
   presto:schema> select * from default;
   
   Query 20200214_131047_00005_hej8h, FAILED, 1 node
   http://xxx-xx-xxx-xx-xxx.xx-xxxx.compute.amazonaws.com:8889/ui/query.html?20200214_131047_00005_hej8h
   Splits: 17 total, 0 done (0.00%)
   CPU Time: 0.0s total,     0 rows/s,     0B/s, 0% active
   Per Node: 0.0 parallelism,     0 rows/s,     0B/s
   Parallelism: 0.0
   Peak Memory: 0B
   0:01 [0 rows, 0B] [0 rows/s, 0B/s]
   
   Query 20200214_131047_00005_hej8h failed: No value present
   java.util.NoSuchElementException: No value present
   	at java.util.Optional.get(Optional.java:135)
   	at com.facebook.presto.parquet.reader.ParquetReader.readArray(ParquetReader.java:156)
   	at com.facebook.presto.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:282)
   	at com.facebook.presto.parquet.reader.ParquetReader.readBlock(ParquetReader.java:268)
   	at com.facebook.presto.hive.parquet.ParquetPageSource$ParquetBlockLoader.load(ParquetPageSource.java:247)
   	at com.facebook.presto.hive.parquet.ParquetPageSource$ParquetBlockLoader.load(ParquetPageSource.java:225)
   	at com.facebook.presto.spi.block.LazyBlock.assureLoaded(LazyBlock.java:283)
   	at com.facebook.presto.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:274)
   	at com.facebook.presto.spi.Page.getLoadedPage(Page.java:261)
   	at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:254)
   	at com.facebook.presto.operator.Driver.processInternal(Driver.java:379)
   	at com.facebook.presto.operator.Driver.lambda$processFor$8(Driver.java:283)
   	at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:675)
   	at com.facebook.presto.operator.Driver.processFor(Driver.java:276)
   	at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1077)
   	at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
   	at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:483)
   	at com.facebook.presto.$gen.Presto_0_227____20200211_134743_1.run(Unknown Source)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   ```
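   
   For context, `No value present` is the message `java.util.Optional.get` throws when the `Optional` is empty, which matches the top frame of the trace. The trace alone does not show which value `ParquetReader.readArray` expected to find. The sketch below only reproduces the exception in plain Scala; `lookup` and `unsafeGet` are hypothetical helpers and no Presto classes are involved.
   
   ```scala
   import java.util.{NoSuchElementException, Optional}
   
   object OptionalGetDemo {
     // Hypothetical stand-in for a lookup that may find nothing; this is not Presto code.
     def lookup(fields: Map[String, String], name: String): Optional[String] =
       Optional.ofNullable(fields.get(name).orNull)
   
     // Calls Optional.get directly and reports the exception message instead of crashing.
     def unsafeGet(value: Optional[String]): String =
       try value.get()
       catch { case e: NoSuchElementException => s"failed: ${e.getMessage}" }
   
     def main(args: Array[String]): Unit = {
       val fields = Map("id" -> "1")
       println(unsafeGet(lookup(fields, "id")))      // value is present
       println(unsafeGet(lookup(fields, "missing"))) // Optional is empty, so get() throws
     }
   }
   ```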
   
   The difference between the two records can be seen in the following JSON:
   
   Success:
   ```
   {
     "id": 1,
     "foos": [
       {
         "id": 1, // <-- simple value on the array item
         "bar": {
           "id": 1,
           "name": "OneBar"
         }
       }
     ]
   }
   ```
   Failure:
   ```
   {
     "id": 1,
     "foos": [
       {
         // missing simple property --> "id": 1,
         "bar": {
           "id": 1,
           "name": "OneBar"
         }
       }
     ]
   }
   ```
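   
   The two shapes above differ only in whether the array item carries a field that is not itself a nested object. A minimal sketch of a check for that property, in plain Scala with no Spark dependency: `FooWithId`, `FooWithoutId` and `hasSimpleField` are hypothetical names used so both variants can coexist in one file. It inspects in-memory case class instances only and says nothing about how Presto reads the Parquet file.
   
   ```scala
   object ArrayItemShapeCheck {
     case class Bar(id: Int, name: String)
     case class FooWithId(id: Int, bar: Bar) // shape that queried successfully
     case class FooWithoutId(bar: Bar)       // shape that failed in Presto
   
     // True when at least one field of the item is not a Product (case class, tuple, ...).
     // Note: Option and non-empty List values are Products too, so they count as nested here.
     def hasSimpleField(item: Product): Boolean =
       item.productIterator.exists {
         case _: Product => false
         case _          => true
       }
   
     def main(args: Array[String]): Unit = {
       println(hasSimpleField(FooWithId(1, Bar(1, "OneBar"))))  // item has a plain Int field
       println(hasSimpleField(FooWithoutId(Bar(1, "OneBar")))) // item has only the nested Bar
     }
   }
   ```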
   
   
