hudi-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [incubator-hudi] adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
Date Fri, 14 Feb 2020 14:55:58 GMT
adamjoneill edited a comment on issue #1325: presto - querying nested object in parquet file created by hudi
URL: https://github.com/apache/incubator-hudi/issues/1325#issuecomment-586284798
 
 
   @vinothchandar I've managed to reproduce this with a simple `sc.parallelize()` example.
   
   test.scala
   ```
   import org.apache.spark._
   import org.apache.spark.sql._
   import org.apache.spark.sql.functions.{month, year, col, dayofmonth}
   import org.apache.spark.storage.StorageLevel
   import org.apache.spark.streaming.{Milliseconds, StreamingContext}
   import org.apache.spark.streaming.Duration
   import org.apache.spark.streaming.kinesis._
   import org.apache.spark.streaming.kinesis.KinesisInputDStream
   import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.hive.MultiPartKeysValueExtractor
   
   import org.apache.spark.sql.types._
   import org.apache.spark.sql.DataFrame
   
   case class Bar(id: Int, name: String)
   
   // choose one of the following
   case class Foo(id: Int, bar: Bar) // with simple
   // case class Foo(bar: Bar) // withOUT simple
   
   case class Root(id: Int, foos: Array[Foo])
   
   object HudiScalaStreamHelloWorld {
       
       def main(args: Array[String]): Unit = {  
           val appName = "ScalaStreamExample"
           val batchInterval = Milliseconds(2000)
           val spark = SparkSession
               .builder()
               .appName(appName)
               .getOrCreate()
   
           val sparkContext = spark.sparkContext
           val streamingContext = new StreamingContext(sparkContext, batchInterval)
   
           import spark.implicits._
           val sc = sparkContext
           
   
           // choose one of the following
           val dataFrame = sc.parallelize(Seq(Root(1, Array(Foo(1, Bar(1, "OneBar")))))).toDF() // with simple
           // val dataFrame = sc.parallelize(Seq(Root(1, Array(Foo(Bar(1, "OneBar")))))).toDF() // withOUT simple
   
           dataFrame.printSchema()
   
           val hudiTableName = "order"
           val hudiTablePath = "s3://xxxxxxx/path/" + hudiTableName
   
           // Set up our Hudi Data Source Options
           val hudiOptions = Map[String,String](
               DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
               HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
               DataSourceWriteOptions.OPERATION_OPT_KEY ->
                   DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
               DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "id")
               
           dataFrame.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)
       }
   }
       
   ```
   
   deploy.sh
   ```
   sbt clean
   sbt package
   
   aws s3 cp ./target/scala-2.11/simple-project_2.11-1.0.jar s3://xxxx.xxxx/simple-project_2.11-1.0.jar
   
   aws emr add-steps --cluster-id j-AZQBZK81NAFT --steps Type=spark,Name=SimpleHudiTest,Args=[\
   --deploy-mode,cluster,\
   --master,yarn,\
   --packages,\'org.apache.hudi:hudi-spark-bundle:0.5.0-incubating,org.apache.spark:spark-avro_2.11:2.4.4\',\
   --conf,spark.yarn.submit.waitAppCompletion=false,\
   --conf,yarn.log-aggregation-enable=true,\
   --conf,spark.dynamicAllocation.enabled=true,\
   --conf,spark.cores.max=4,\
   --conf,spark.network.timeout=300,\
   --conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,\
   --conf,spark.sql.hive.convertMetastoreParquet=false,\
   --class,HudiScalaStreamHelloWorld,\
   s3://xxxx.xxx/simple-project_2.11-1.0.jar\
   ],ActionOnFailure=CONTINUE
   ```
   
   build.sbt
   ```
   name := "Simple Project"
   
   version := "1.0"
   
   scalaVersion := "2.11.12"
   
   libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"
   libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.4"
   libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.4"
   libraryDependencies += "org.apache.hudi" % "hudi-spark-bundle" % "0.5.0-incubating"
   
   
   scalacOptions := Seq("-unchecked", "-deprecation")
   
   ```
   
   An AWS Glue job then runs over the output S3 directory.
   
   From the Presto EMR instance, this is the result when a simple property is included on the array item:
   ```
   presto:schema> select * from default;
    _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path |                          _hoodie_file_name                          | id |               foos
   ---------------------+----------------------+--------------------+------------------------+---------------------------------------------------------------------+----+-----------------------------------
    20200214112552      | 20200214112552_0_1   | 1                  | default                | 90190f46-d064-4c8b-ab6a-89ecc9b3ced4-0_0-5-8_20200214112552.parquet |  1 | [{id=1, bar={id=1, name=OneBar}}]
   (1 row)
   
   Query 20200214_130009_00002_hej8h, FINISHED, 1 node
   http://xx-xx-xxx-xx-xxx.x-xxxxx.compute.amazonaws.com:8889/ui/query.html?20200214_130009_00002_hej8h
   Splits: 17 total, 17 done (100.00%)
   CPU Time: 0.0s total,    23 rows/s, 25.6KB/s, 16% active
   Per Node: 0.1 parallelism,     1 rows/s, 2.09KB/s
   Parallelism: 0.1
   Peak Memory: 0B
   0:01 [1 rows, 1.1KB] [1 rows/s, 2.09KB/s]
   ```
   
   Error message when running the same query against the example code without a simple property on the array item:
   ```
   presto:schema> select * from default;
   
   Query 20200214_131047_00005_hej8h, FAILED, 1 node
   http://xxx-xx-xxx-xx-xxx.xx-xxxx.compute.amazonaws.com:8889/ui/query.html?20200214_131047_00005_hej8h
   Splits: 17 total, 0 done (0.00%)
   CPU Time: 0.0s total,     0 rows/s,     0B/s, 0% active
   Per Node: 0.0 parallelism,     0 rows/s,     0B/s
   Parallelism: 0.0
   Peak Memory: 0B
   0:01 [0 rows, 0B] [0 rows/s, 0B/s]
   
   Query 20200214_131047_00005_hej8h failed: No value present
   java.util.NoSuchElementException: No value present
   	at java.util.Optional.get(Optional.java:135)
   	at com.facebook.presto.parquet.reader.ParquetReader.readArray(ParquetReader.java:156)
   	at com.facebook.presto.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:282)
   	at com.facebook.presto.parquet.reader.ParquetReader.readBlock(ParquetReader.java:268)
   	at com.facebook.presto.hive.parquet.ParquetPageSource$ParquetBlockLoader.load(ParquetPageSource.java:247)
   	at com.facebook.presto.hive.parquet.ParquetPageSource$ParquetBlockLoader.load(ParquetPageSource.java:225)
   	at com.facebook.presto.spi.block.LazyBlock.assureLoaded(LazyBlock.java:283)
   	at com.facebook.presto.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:274)
   	at com.facebook.presto.spi.Page.getLoadedPage(Page.java:261)
   	at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:254)
   	at com.facebook.presto.operator.Driver.processInternal(Driver.java:379)
   	at com.facebook.presto.operator.Driver.lambda$processFor$8(Driver.java:283)
   	at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:675)
   	at com.facebook.presto.operator.Driver.processFor(Driver.java:276)
   	at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1077)
   	at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
   	at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:483)
   	at com.facebook.presto.$gen.Presto_0_227____20200211_134743_1.run(Unknown Source)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   ```
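   
   The top two frames show the exception being raised by `java.util.Optional.get`, called from `ParquetReader.readArray`. As a small illustration that is independent of Presto, the sketch below shows that calling `get` on an empty `Optional` produces exactly this `NoSuchElementException: No value present`. The object name `OptionalGetDemo` and the helper `describe` are hypothetical, and the sketch makes no claim about which lookup inside `readArray` comes back empty.
   
   ```scala
   import java.util.{NoSuchElementException, Optional}
   
   // Hypothetical demo object, not part of Presto or Hudi.
   object OptionalGetDemo {
     // Returns the wrapped value, or a description of the failure
     // when the Optional is empty.
     def describe(opt: Optional[String]): String =
       try opt.get()
       catch {
         case e: NoSuchElementException => s"failed: ${e.getMessage}"
       }
   
     def main(args: Array[String]): Unit = {
       // A present value is returned as-is.
       println(describe(Optional.of("field")))
       // An empty Optional throws from get(), as in the stack trace.
       println(describe(Optional.empty[String]()))
     }
   }
   ```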
   
   The difference between the two records is highlighted in the following JSON:
   
   Success:
   ```
   {
     "id": 1,
     "foos": [
       {
         "id": 1, // <-- simple value on the array item
         "bar": {
           "id": 1,
           "name": "OneBar"
         }
       }
     ]
   }
   ```
   Failure:
   ```
   {
     "id": 1,
     "foos": [
       {
         // missing simple property --> "id": 1,
         "bar": {
           "id": 1,
           "name": "OneBar"
         }
       }
     ]
   }
   ```
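   
   The same structural difference can be seen in the Spark schemas derived from the case classes, without writing any data. The following is a minimal sketch, assuming `spark-sql` 2.4.4 is on the classpath as in `build.sbt`. The names `FooWithId`, `FooWithoutId`, `RootWithId`, `RootWithoutId`, `SchemaCompare` and `elementHasOnlyStructFields` are hypothetical and stand in for the two variants of `Foo` and `Root` from `test.scala`. It only makes the shape difference visible; it does not establish the root cause of the Presto failure.
   
   ```scala
   import org.apache.spark.sql.Encoders
   import org.apache.spark.sql.types.{ArrayType, StructType}
   
   case class Bar(id: Int, name: String)
   
   // Hypothetical names for the two variants of Foo and Root in test.scala.
   case class FooWithId(id: Int, bar: Bar)        // with simple
   case class FooWithoutId(bar: Bar)              // withOUT simple
   case class RootWithId(id: Int, foos: Array[FooWithId])
   case class RootWithoutId(id: Int, foos: Array[FooWithoutId])
   
   object SchemaCompare {
     // True when every field of the `foos` array element is itself a struct,
     // i.e. the element carries no simple (primitive) property.
     def elementHasOnlyStructFields(schema: StructType): Boolean =
       schema("foos").dataType match {
         case ArrayType(element: StructType, _) =>
           element.fields.forall(_.dataType.isInstanceOf[StructType])
         case _ => false
       }
   
     def main(args: Array[String]): Unit = {
       // Encoders.product derives the schema from the case class alone,
       // so no SparkSession or cluster is needed.
       val withId = Encoders.product[RootWithId].schema
       val withoutId = Encoders.product[RootWithoutId].schema
   
       println(withId.treeString)
       println(withoutId.treeString)
   
       println(s"with simple: only struct fields = ${elementHasOnlyStructFields(withId)}")
       println(s"withOUT simple: only struct fields = ${elementHasOnlyStructFields(withoutId)}")
     }
   }
   ```
   
   The variant that succeeds in Presto has an `id` field next to `bar` in the array element, while the failing variant's element contains only the nested `bar` struct.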
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
