spark-commits mailing list archives

From lix...@apache.org
Subject spark git commit: [SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support writing to Hive table which uses Avro schema url 'avro.schema.url'
Date Wed, 22 Nov 2017 06:31:50 GMT
Repository: spark
Updated Branches:
  refs/heads/master 881c5c807 -> e0d7665ce


[SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support writing to Hive table which uses Avro schema url 'avro.schema.url'

## What changes were proposed in this pull request?
SPARK-19580 Support for avro.schema.url while writing to hive table
SPARK-19878 Add hive configuration when initialize hive serde in InsertIntoHiveTable.scala
SPARK-17920 HiveWriterContainer passes null configuration to serde.initialize, causing NullPointerException in AvroSerde when using avro.schema.url

Support writing to a Hive table that uses an Avro schema URL ('avro.schema.url').
For example:
create external table avro_in (a string) stored as avro location '/avro-in/'
tblproperties ('avro.schema.url'='/avro-schema/avro.avsc');

create external table avro_out (a string) stored as avro location '/avro-out/'
tblproperties ('avro.schema.url'='/avro-schema/avro.avsc');

insert overwrite table avro_out select * from avro_in;  -- fails with java.lang.NullPointerException

 WARN AvroSerDe: Encountered exception determining schema. Returning signal schema to indicate problem
java.lang.NullPointerException
	at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:182)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174)
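
For context, a minimal sketch of the failure path (illustrative only, not Hive's AvroSerDe source): resolving 'avro.schema.url' goes through the Hadoop FileSystem API, which dereferences the Configuration it receives, so a null configuration fails exactly as in the trace above.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative helper, not Hive code: reading a schema file referenced by
// 'avro.schema.url'. With conf == null, FileSystem.get fails with a
// NullPointerException inside FileSystem.getDefaultUri.
def readSchemaText(schemaUrl: String, conf: Configuration): String = {
  val path = new Path(schemaUrl)
  val fs = FileSystem.get(path.toUri, conf)
  val in = fs.open(path)
  try scala.io.Source.fromInputStream(in).mkString finally in.close()
}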

## Changes proposed in this fix
Currently a null value is passed to the serializer, which causes an NPE during the insert operation; this fix passes the Hadoop configuration object instead.
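
In code terms, the fix threads the writer's jobConf through both serde call sites in HiveOutputWriter (full context in the diff below):

  // Initialize the serializer with the live JobConf instead of null, so serdes
  // such as AvroSerDe can resolve 'avro.schema.url' through the Hadoop FS API.
  private val serializer = {
    val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
    serializer.initialize(jobConf, tableDesc.getProperties)  // was: initialize(null, ...)
    serializer
  }

  // Likewise, build the standard ObjectInspector from a deserializer that has
  // been given the JobConf, instead of using the no-argument getDeserializer.
  private val standardOI = ObjectInspectorUtils
    .getStandardObjectInspector(
      tableDesc.getDeserializer(jobConf).getObjectInspector,
      ObjectInspectorCopyOption.JAVA)
    .asInstanceOf[StructObjectInspector]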
## How was this patch tested?
Added new test case in VersionsSuite

Author: vinodkc <vinod.kc.in@gmail.com>

Closes #19779 from vinodkc/br_Fix_SPARK-17920.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0d7665c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0d7665c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0d7665c

Branch: refs/heads/master
Commit: e0d7665cec1e6954d640f422c79ebba4c273be7d
Parents: 881c5c8
Author: vinodkc <vinod.kc.in@gmail.com>
Authored: Tue Nov 21 22:31:46 2017 -0800
Committer: gatorsmile <gatorsmile@gmail.com>
Committed: Tue Nov 21 22:31:46 2017 -0800

----------------------------------------------------------------------
 .../sql/hive/execution/HiveFileFormat.scala     |  4 +-
 .../spark/sql/hive/client/VersionsSuite.scala   | 72 +++++++++++++++++++-
 2 files changed, 73 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e0d7665c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index ac735e8..4a7cd69 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -116,7 +116,7 @@ class HiveOutputWriter(
 
   private val serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
-    serializer.initialize(null, tableDesc.getProperties)
+    serializer.initialize(jobConf, tableDesc.getProperties)
     serializer
   }
 
@@ -130,7 +130,7 @@ class HiveOutputWriter(
 
   private val standardOI = ObjectInspectorUtils
     .getStandardObjectInspector(
-      tableDesc.getDeserializer.getObjectInspector,
+      tableDesc.getDeserializer(jobConf).getObjectInspector,
       ObjectInspectorCopyOption.JAVA)
     .asInstanceOf[StructObjectInspector]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e0d7665c/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 9ed39cc..fbf6877 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.client
 
-import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.io.{ByteArrayOutputStream, File, PrintStream, PrintWriter}
 import java.net.URI
 
 import org.apache.hadoop.conf.Configuration
@@ -841,6 +841,76 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }
 
+    test(s"$version: SPARK-17920: Insert into/overwrite avro table") {
+      withTempDir { dir =>
+        val path = dir.getAbsolutePath
+        val schemaPath = s"""$path${File.separator}avroschemadir"""
+
+        new File(schemaPath).mkdir()
+        val avroSchema =
+          """{
+            |  "name": "test_record",
+            |  "type": "record",
+            |  "fields": [ {
+            |    "name": "f0",
+            |    "type": [
+            |      "null",
+            |      {
+            |        "precision": 38,
+            |        "scale": 2,
+            |        "type": "bytes",
+            |        "logicalType": "decimal"
+            |      }
+            |    ]
+            |  } ]
+            |}
+          """.stripMargin
+        val schemaUrl = s"""$schemaPath${File.separator}avroDecimal.avsc"""
+        val schemaFile = new File(schemaPath, "avroDecimal.avsc")
+        val writer = new PrintWriter(schemaFile)
+        writer.write(avroSchema)
+        writer.close()
+
+        val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
+        val srcLocation = new File(url.getFile)
+        val destTableName = "tab1"
+        val srcTableName = "tab2"
+
+        withTable(srcTableName, destTableName) {
+          versionSpark.sql(
+            s"""
+               |CREATE EXTERNAL TABLE $srcTableName
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |LOCATION '$srcLocation'
+               |TBLPROPERTIES ('avro.schema.url' = '$schemaUrl')
+           """.stripMargin
+          )
+
+          versionSpark.sql(
+            s"""
+               |CREATE TABLE $destTableName
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |TBLPROPERTIES ('avro.schema.url' = '$schemaUrl')
+           """.stripMargin
+          )
+          versionSpark.sql(
+            s"""INSERT OVERWRITE TABLE $destTableName SELECT * FROM $srcTableName""")
+          val result = versionSpark.table(srcTableName).collect()
+          assert(versionSpark.table(destTableName).collect() === result)
+          versionSpark.sql(
+            s"""INSERT INTO TABLE $destTableName SELECT * FROM $srcTableName""")
+          assert(versionSpark.table(destTableName).collect().toSeq === result ++ result)
+        }
+      }
+    }
     // TODO: add more tests.
   }
 }



