spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "yzheng616 (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-22137) Failed to insert VectorUDT to hive table with DataFrameWriter.insertInto(tableName: String)
Date Fri, 29 Sep 2017 09:04:01 GMT

    [ https://issues.apache.org/jira/browse/SPARK-22137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16185535#comment-16185535
] 

yzheng616 commented on SPARK-22137:
-----------------------------------

Have you tried to use DataFrameWriter.insertInto(tableName: String) API to insert data to
the table? 

> Failed to insert VectorUDT to hive table with DataFrameWriter.insertInto(tableName: String)
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22137
>                 URL: https://issues.apache.org/jira/browse/SPARK-22137
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.1
>            Reporter: yzheng616
>
> Failed to insert VectorUDT to hive table with DataFrameWriter.insertInto(tableName: String).
The issue seems similar with SPARK-17765 which have been resolved in 2.1.0. 
> Error message: 
> {color:red}Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot
resolve '`features`' due to data type mismatch: cannot cast org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7
to StructType(StructField(type,ByteType,true), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,true),true),
StructField(values,ArrayType(DoubleType,true),true));;
> 'InsertIntoTable Relation[id#21,features#22] parquet, OverwriteOptions(false,Map()),
false
> +- 'Project [cast(id#13L as int) AS id#27, cast(features#14 as struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
AS features#28]
>    +- LogicalRDD [id#13L, features#14]{color}
> Reproduce code:
> {code:java}
> import scala.annotation.varargs
> import org.apache.spark.ml.linalg.SQLDataTypes
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.types.LongType
> import org.apache.spark.sql.types.StructField
> import org.apache.spark.sql.types.StructType
> case class UDT(`id`: Long, `features`: org.apache.spark.ml.linalg.Vector)
> object UDTTest {
>   def main(args: Array[String]): Unit = {
>     val tb = "table_udt"
>     val spark = SparkSession.builder().master("local[4]").appName("UDTInsertInto").enableHiveSupport().getOrCreate()
>     spark.sql("drop table if exists " + tb)
>     
>     /*
>      * VectorUDT sql type definition:
>      * 
>      *   override def sqlType: StructType = {
>      *   StructType(Seq(
>      *   	StructField("type", ByteType, nullable = false),
>      *   	StructField("size", IntegerType, nullable = true),
>      *   	StructField("indices", ArrayType(IntegerType, containsNull = false), nullable
= true),
>      *   	StructField("values", ArrayType(DoubleType, containsNull = false), nullable
= true)))
>      *   }
>     */
>     
>     //Create Hive table base on VectorUDT sql type
>     spark.sql("create table if not exists "+tb+"(id int, features struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)"
+
>       " row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'"+
>       " stored as"+
>         " inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'"+
>         " outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'")
>     var seq = new scala.collection.mutable.ArrayBuffer[UDT]()
>     for (x <- 1 to 2) {
>       seq += (new UDT(x, org.apache.spark.ml.linalg.Vectors.dense(0.2, 0.21, 0.44)))
>     }
>     val rowRDD = (spark.sparkContext.makeRDD[UDT](seq)).map { x => Row.fromSeq(Seq(x.id,x.features))
}
>     val schema = StructType(Array(StructField("id", LongType,false),StructField("features",
SQLDataTypes.VectorType,false)))
>     val df = spark.createDataFrame(rowRDD, schema)
>      
>     //insert into hive table
>     df.write.insertInto(tb)
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message