spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gurwls...@apache.org
Subject spark git commit: [SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType
Date Wed, 29 Aug 2018 01:25:57 GMT
Repository: spark
Updated Branches:
  refs/heads/master 32c8a3d7b -> 68ec207a3


[SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType

## What changes were proposed in this pull request?

`toAvroType` converts a Spark data type to an Avro schema. It always appends the record name to the
namespace, so it's impossible to have an Avro namespace independent of the record name.

When invoked with a spark data type like,

```scala
val sparkSchema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("address", StructType(Seq(
        StructField("city", StringType, nullable = false),
        StructField("state", StringType, nullable = false))),
    nullable = false)))

// map it to an Avro schema with record name "employee" and top-level namespace "foo.bar"
val avroSchema = SchemaConverters.toAvroType(sparkSchema,  false, "employee", "foo.bar")

// result is
// avroSchema.getName = employee
// avroSchema.getNamespace = foo.bar.employee
// avroSchema.getFullname = foo.bar.employee.employee
```
The patch proposes to fix this so that the result is

```
avroSchema.getName = employee
avroSchema.getNamespace = foo.bar
avroSchema.getFullname = foo.bar.employee
```
## How was this patch tested?

New and existing unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #22251 from arunmahadevan/avro-fix.

Authored-by: Arun Mahadevan <arunm@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68ec207a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68ec207a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68ec207a

Branch: refs/heads/master
Commit: 68ec207a320bd50ca61e820c9ff559f799c2ab0a
Parents: 32c8a3d
Author: Arun Mahadevan <arunm@apache.org>
Authored: Wed Aug 29 09:25:49 2018 +0800
Committer: hyukjinkwon <gurwls223@apache.org>
Committed: Wed Aug 29 09:25:49 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/avro/SchemaConverters.scala       | 18 ++++-----
 .../org/apache/spark/sql/avro/AvroSuite.scala   | 42 +++++++++++++++++++-
 2 files changed, 48 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/68ec207a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index 3a15e8d..bd15765 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -123,7 +123,7 @@ object SchemaConverters {
       catalystType: DataType,
       nullable: Boolean = false,
       recordName: String = "topLevelRecord",
-      prevNameSpace: String = "")
+      nameSpace: String = "")
     : Schema = {
     val builder = SchemaBuilder.builder()
 
@@ -143,29 +143,25 @@ object SchemaConverters {
         val avroType = LogicalTypes.decimal(d.precision, d.scale)
         val fixedSize = minBytesForPrecision(d.precision)
         // Need to avoid naming conflict for the fixed fields
-        val name = prevNameSpace match {
+        val name = nameSpace match {
           case "" => s"$recordName.fixed"
-          case _ => s"$prevNameSpace.$recordName.fixed"
+          case _ => s"$nameSpace.$recordName.fixed"
         }
         avroType.addToSchema(SchemaBuilder.fixed(name).size(fixedSize))
 
       case BinaryType => builder.bytesType()
       case ArrayType(et, containsNull) =>
         builder.array()
-          .items(toAvroType(et, containsNull, recordName, prevNameSpace))
+          .items(toAvroType(et, containsNull, recordName, nameSpace))
       case MapType(StringType, vt, valueContainsNull) =>
         builder.map()
-          .values(toAvroType(vt, valueContainsNull, recordName, prevNameSpace))
+          .values(toAvroType(vt, valueContainsNull, recordName, nameSpace))
       case st: StructType =>
-        val nameSpace = prevNameSpace match {
-          case "" => recordName
-          case _ => s"$prevNameSpace.$recordName"
-        }
-
+        val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" else recordName
         val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields()
         st.foreach { f =>
           val fieldAvroType =
-            toAvroType(f.dataType, f.nullable, f.name, nameSpace)
+            toAvroType(f.dataType, f.nullable, f.name, childNameSpace)
           fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault()
         }
         fieldsAssembler.endRecord()

http://git-wip-us.apache.org/repos/asf/spark/blob/68ec207a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 72bef9e..9ad4388 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -1082,7 +1082,6 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       val schema = getAvroSchemaStringFromFiles(dir.toString)
       assert(schema.contains("\"namespace\":\"topLevelRecord\""))
       assert(schema.contains("\"namespace\":\"topLevelRecord.data\""))
-      assert(schema.contains("\"namespace\":\"topLevelRecord.data.data\""))
     }
   }
 
@@ -1099,6 +1098,47 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
+  test("check namespace - toAvroType") {
+    val sparkSchema = StructType(Seq(
+      StructField("name", StringType, nullable = false),
+      StructField("address", StructType(Seq(
+        StructField("city", StringType, nullable = false),
+        StructField("state", StringType, nullable = false))),
+        nullable = false)))
+    val employeeType = SchemaConverters.toAvroType(sparkSchema,
+      recordName = "employee",
+      nameSpace = "foo.bar")
+
+    assert(employeeType.getFullName == "foo.bar.employee")
+    assert(employeeType.getName == "employee")
+    assert(employeeType.getNamespace == "foo.bar")
+
+    val addressType = employeeType.getField("address").schema()
+    assert(addressType.getFullName == "foo.bar.employee.address")
+    assert(addressType.getName == "address")
+    assert(addressType.getNamespace == "foo.bar.employee")
+  }
+
+  test("check empty namespace - toAvroType") {
+    val sparkSchema = StructType(Seq(
+      StructField("name", StringType, nullable = false),
+      StructField("address", StructType(Seq(
+        StructField("city", StringType, nullable = false),
+        StructField("state", StringType, nullable = false))),
+        nullable = false)))
+    val employeeType = SchemaConverters.toAvroType(sparkSchema,
+      recordName = "employee")
+
+    assert(employeeType.getFullName == "employee")
+    assert(employeeType.getName == "employee")
+    assert(employeeType.getNamespace == null)
+
+    val addressType = employeeType.getField("address").schema()
+    assert(addressType.getFullName == "employee.address")
+    assert(addressType.getName == "address")
+    assert(addressType.getNamespace == "employee")
+  }
+
   case class NestedMiddleArray(id: Int, data: Array[NestedBottom])
 
   case class NestedTopArray(id: Int, data: NestedMiddleArray)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message