spark-reviews mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [spark] xkrogen commented on a change in pull request #32969: [SPARK-35817][SQL] Restore performance of queries against wide Avro tables
Date Tue, 22 Jun 2021 07:41:10 GMT

xkrogen commented on a change in pull request #32969:
URL: https://github.com/apache/spark/pull/32969#discussion_r655548950



##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
##########
@@ -202,34 +203,40 @@ private[sql] object AvroUtils extends Logging {
     }
   }
 
-  /**
-   * Extract a single field from `avroSchema` which has the desired field name,
-   * performing the matching with proper case sensitivity according to [[SQLConf.resolver]].
-   *
-   * @param avroSchema The schema in which to search for the field. Must be of type RECORD.
-   * @param name The name of the field to search for.
-   * @param avroPath The seq of parent field names leading to `avroSchema`.
-   * @return `Some(match)` if a matching Avro field is found, otherwise `None`.
-   * @throws IncompatibleSchemaException if `avroSchema` is not a RECORD or contains multiple
-   *                                     fields matching `name` (i.e., case-insensitive matching
-   *                                     is used and `avroSchema` has two or more fields that have
-   *                                     the same name with difference case).
-   */
-  private[avro] def getAvroFieldByName(
-      avroSchema: Schema,
-      name: String,
-      avroPath: Seq[String]): Option[Schema.Field] = {
+  class AvroSchemaHelper(avroSchema: Schema, avroPath: Seq[String]) {
     if (avroSchema.getType != Schema.Type.RECORD) {
       throw new IncompatibleSchemaException(
         s"Attempting to treat ${avroSchema.getName} as a RECORD, but it was: ${avroSchema.getType}")
     }
-    avroSchema.getFields.asScala.filter(f => SQLConf.get.resolver(f.name(), name)).toSeq match {
-      case Seq(avroField) => Some(avroField)
-      case Seq() => None
-      case matches => throw new IncompatibleSchemaException(s"Searching for '$name' in Avro " +
+
+    val schemaMap = avroSchema.getFields.asScala.groupBy { f =>

Review comment:
       Minor nit: make this a `private[this] val`.
   Also, maybe name it `fieldMap` instead of `schemaMap`?

##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
##########
@@ -250,11 +250,12 @@ private[sql] class AvroSerializer(
         s"Avro $avroPathStr schema length (${avroFields.size}) doesn't match " +
         s"SQL ${toFieldStr(catalystPath)} schema length (${catalystStruct.length})")
     }
+    val avroSchemaHelper = new AvroUtils.AvroSchemaHelper(avroStruct, avroPath)
 
     val (avroIndices: Array[Int], fieldConverters: Array[Converter]) =
       catalystStruct.map { catalystField =>
-        val avroField = AvroUtils
-            .getAvroFieldByName(avroStruct, catalystField.name, avroPath) match {
+        val avroField = avroSchemaHelper
+            .getFieldByName(catalystField.name) match {

Review comment:
       Minor nit: you can combine these two lines back into one now that the method call is shorter.

##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
##########
@@ -202,34 +203,40 @@ private[sql] object AvroUtils extends Logging {
     }
   }
 
-  /**
-   * Extract a single field from `avroSchema` which has the desired field name,
-   * performing the matching with proper case sensitivity according to [[SQLConf.resolver]].
-   *
-   * @param avroSchema The schema in which to search for the field. Must be of type RECORD.
-   * @param name The name of the field to search for.
-   * @param avroPath The seq of parent field names leading to `avroSchema`.
-   * @return `Some(match)` if a matching Avro field is found, otherwise `None`.
-   * @throws IncompatibleSchemaException if `avroSchema` is not a RECORD or contains multiple
-   *                                     fields matching `name` (i.e., case-insensitive matching
-   *                                     is used and `avroSchema` has two or more fields that have
-   *                                     the same name with difference case).
-   */
-  private[avro] def getAvroFieldByName(
-      avroSchema: Schema,
-      name: String,
-      avroPath: Seq[String]): Option[Schema.Field] = {
+  class AvroSchemaHelper(avroSchema: Schema, avroPath: Seq[String]) {

Review comment:
       Can you keep the Scaladoc for `avroSchema` and `avroPath` from the previous version?

##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
##########
@@ -202,34 +203,40 @@ private[sql] object AvroUtils extends Logging {
     }
   }
 
-  /**
-   * Extract a single field from `avroSchema` which has the desired field name,
-   * performing the matching with proper case sensitivity according to [[SQLConf.resolver]].
-   *
-   * @param avroSchema The schema in which to search for the field. Must be of type RECORD.
-   * @param name The name of the field to search for.
-   * @param avroPath The seq of parent field names leading to `avroSchema`.
-   * @return `Some(match)` if a matching Avro field is found, otherwise `None`.
-   * @throws IncompatibleSchemaException if `avroSchema` is not a RECORD or contains multiple
-   *                                     fields matching `name` (i.e., case-insensitive matching
-   *                                     is used and `avroSchema` has two or more fields that have
-   *                                     the same name with difference case).
-   */
-  private[avro] def getAvroFieldByName(
-      avroSchema: Schema,
-      name: String,
-      avroPath: Seq[String]): Option[Schema.Field] = {
+  class AvroSchemaHelper(avroSchema: Schema, avroPath: Seq[String]) {
     if (avroSchema.getType != Schema.Type.RECORD) {
       throw new IncompatibleSchemaException(
         s"Attempting to treat ${avroSchema.getName} as a RECORD, but it was: ${avroSchema.getType}")
     }
-    avroSchema.getFields.asScala.filter(f => SQLConf.get.resolver(f.name(), name)).toSeq match {
-      case Seq(avroField) => Some(avroField)
-      case Seq() => None
-      case matches => throw new IncompatibleSchemaException(s"Searching for '$name' in Avro " +
+
+    val schemaMap = avroSchema.getFields.asScala.groupBy { f =>
+        f.name.toLowerCase(Locale.ROOT)
+      }.map { case (k, v) =>
+      (k, v.toSeq) // needed for scala 2.13
+    }
+
+    /**
+     * Extract a single field from the contained avro schema which has the desired field name,
+     * performing the matching with proper case sensitivity according to SQLConf.resolver.
+     *
+     * @param name The name of the field to search for.
+     * @return `Some(match)` if a matching Avro field is found, otherwise `None`.
+     */
+    def getFieldByName(name: String): Option[Schema.Field] = {
+
+      // get candidates, ignoring case of field name
+      val candidates = schemaMap.get(name.toLowerCase(Locale.ROOT))
+        .getOrElse(Seq.empty[Schema.Field])
+
+      // search candidates, taking into account case sensitivity settings
+      candidates.filter(f => SQLConf.get.resolver(f.name(), name)) match {
+        case Seq(avroField) => Some(avroField)

Review comment:
       Can you provide any commentary on why you chose to always lowercase and then resolve
later based on case sensitivity, vs. storing the map keys as case insensitive or sensitive
based on the SQL conf? Is it so that we can make use of `SQLConf#resolver` to perform the
resolution instead of checking the value of the case sensitive conf ourselves?
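
For reference, the two-stage lookup being asked about can be sketched roughly as below. This is a minimal, self-contained illustration and not the PR's code: `Field`, `Resolvers`, and `FieldLookup` are hypothetical stand-ins for `Schema.Field`, `SQLConf.get.resolver`, and `AvroSchemaHelper`, and the exception type is simplified. The map is keyed on the lowercased name regardless of configuration, and the resolver alone decides which candidates count as a match.

```scala
import java.util.Locale

// Hypothetical stand-in for org.apache.avro.Schema.Field; only the name matters here.
final case class Field(name: String)

// Hypothetical stand-ins for the two behaviours SQLConf.get.resolver can have.
object Resolvers {
  val caseSensitive: (String, String) => Boolean = (a, b) => a == b
  val caseInsensitive: (String, String) => Boolean = (a, b) => a.equalsIgnoreCase(b)
}

final class FieldLookup(fields: Seq[Field], resolver: (String, String) => Boolean) {
  // Built once per schema: lowercased name -> every field sharing that lowercased name.
  private val fieldMap: Map[String, Seq[Field]] =
    fields.groupBy(_.name.toLowerCase(Locale.ROOT))

  def getFieldByName(name: String): Option[Field] = {
    // Stage 1: cheap map lookup that ignores case.
    val candidates = fieldMap.getOrElse(name.toLowerCase(Locale.ROOT), Seq.empty)
    // Stage 2: the resolver applies the configured case sensitivity to the few candidates.
    candidates.filter(f => resolver(f.name, name)) match {
      case Seq(field) => Some(field)
      case Seq() => None
      case matches => throw new IllegalArgumentException(
        s"Searching for '$name' found ${matches.size} matching fields: " +
          matches.map(_.name).mkString(", "))
    }
  }
}
```

With this shape the per-field cost is a hash lookup plus a filter over the (usually single-element) candidate list, rather than a scan over every field in a wide schema, and the case-sensitivity decision stays in one place.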

##########
File path: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
##########
@@ -202,34 +203,40 @@ private[sql] object AvroUtils extends Logging {
     }
   }
 
-  /**
-   * Extract a single field from `avroSchema` which has the desired field name,
-   * performing the matching with proper case sensitivity according to [[SQLConf.resolver]].
-   *
-   * @param avroSchema The schema in which to search for the field. Must be of type RECORD.
-   * @param name The name of the field to search for.
-   * @param avroPath The seq of parent field names leading to `avroSchema`.
-   * @return `Some(match)` if a matching Avro field is found, otherwise `None`.
-   * @throws IncompatibleSchemaException if `avroSchema` is not a RECORD or contains multiple
-   *                                     fields matching `name` (i.e., case-insensitive matching
-   *                                     is used and `avroSchema` has two or more fields that have
-   *                                     the same name with difference case).
-   */
-  private[avro] def getAvroFieldByName(
-      avroSchema: Schema,
-      name: String,
-      avroPath: Seq[String]): Option[Schema.Field] = {
+  class AvroSchemaHelper(avroSchema: Schema, avroPath: Seq[String]) {
     if (avroSchema.getType != Schema.Type.RECORD) {
       throw new IncompatibleSchemaException(
         s"Attempting to treat ${avroSchema.getName} as a RECORD, but it was: ${avroSchema.getType}")
     }
-    avroSchema.getFields.asScala.filter(f => SQLConf.get.resolver(f.name(), name)).toSeq match {
-      case Seq(avroField) => Some(avroField)
-      case Seq() => None
-      case matches => throw new IncompatibleSchemaException(s"Searching for '$name' in Avro " +
+
+    val schemaMap = avroSchema.getFields.asScala.groupBy { f =>
+        f.name.toLowerCase(Locale.ROOT)
+      }.map { case (k, v) =>
+      (k, v.toSeq) // needed for scala 2.13
+    }

Review comment:
       Formatting here looks a little odd to me. Maybe we can do:
   ```
   avroSchema.getFields.asScala
     .groupBy(_.name.toLowerCase(Locale.ROOT))
     .mapValues(_.toSeq) // toSeq needed for scala 2.13
   ```
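
One thing to weigh with the chained form: `mapValues` returns a lazy view on both Scala 2.12 and 2.13 (and is deprecated on 2.13), so the `toSeq` would be re-applied on every lookup rather than once. If that matters for a map that is queried per field, a strict alternative might look like the sketch below. `Field` and `FieldMaps` are hypothetical names used only for illustration, not part of the PR.

```scala
import java.util.Locale

// Hypothetical stand-in for org.apache.avro.Schema.Field.
final case class Field(name: String)

object FieldMaps {
  // Strict variant: groups once and materializes each group's Seq up front,
  // so later lookups do no further conversion work.
  def strict(fields: Iterable[Field]): Map[String, Seq[Field]] =
    fields
      .groupBy(_.name.toLowerCase(Locale.ROOT))
      .map { case (key, group) => key -> group.toSeq }
}
```

This keeps the chained layout while behaving the same on both Scala versions.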



