avro-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ben Vogan <...@shopkick.com>
Subject NullPointerException trying to write a union of null & enum
Date Thu, 19 Jan 2017 16:25:30 GMT
Hi all,

I am trying to write a component in spark that will serialize a DataFrame
to Avro and load it into Kafka.  I have been using code from spark-avro (
https://github.com/databricks/spark-avro/blob/branch-3.1/src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala#L82)
to do the translation, but I have run into a problem with enums.
Spark-Avro translates avro enums into strings when reading Avro, but does
not handle translating strings to enums when writing Avro.

In my code, before calling into spark-avro I check for the StringType and
create a converter:

def getEnumType(avroSchema: Schema, fieldName: String) : Option[Schema] = {
  val field = avroSchema.getField(fieldName)
  val fieldSchema = field.schema()
  if(fieldSchema.getType == Schema.Type.UNION) {
    for (t: Schema <- fieldSchema.getTypes) {
      if (t.getType.equals(Schema.Type.ENUM)) {
        return Some(t)
      }
    }
  } else if(fieldSchema.getType == Schema.Type.ENUM) {
    return Some(fieldSchema)
  }
  return None
}

val fieldConverters = sparkSchema.fields.map(field => {
  if(field.dataType == StringType) {
    // Try to auto convert strings to enums
    val enumType = getEnumType(avroSchema, field.name)
    if(enumType.isDefined) {
      // Return a function that translates the string into the enum
      (item: Any) => {
        if(item == null) {
          null
        } else {
          new GenericData.EnumSymbol(enumType.get, item.toString)
        }
      }
    } else {
      // Not an enum so just use the default string converter
      AvroSchemaConverter.createConverterToAvro(field.dataType,
field.name, avroSchema.getNamespace)
    }
  } else {
    AvroSchemaConverter.createConverterToAvro(field.dataType,
field.name, avroSchema.getNamespace)
  }
})

This works for some cases, but not for others and I do not understand the
difference.  When it fails I get the following stack, which seems to
indicate that Avro failed to resolve the union of "null" and the enum to
the null type, and then tried to write out the null using the enum schema:

Caused by: java.lang.NullPointerException: null of
com.shopkick.data.user_registration_type_enum of union in field
user_registration_type of com.shopkick.data.EventLog
at org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:132)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:126)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:60)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:87)
at io.confluent.kafka.serializers.KafkaAvroEncoder.toBytes(KafkaAvroEncoder.java:47)
at com.shopkick.data.pipeline.loader.KafkaAvroLoader$$anonfun$3$$anonfun$apply$1.apply(KafkaAvroLoader.scala:120)
at com.shopkick.data.pipeline.loader.KafkaAvroLoader$$anonfun$3$$anonfun$apply$1.apply(KafkaAvroLoader.scala:117)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at com.github.benfradet.spark.kafka010.writer.RDDKafkaWriter$$anonfun$writeToKafka$1.apply(RDDKafkaWriter.scala:50)
at com.github.benfradet.spark.kafka010.writer.RDDKafkaWriter$$anonfun$writeToKafka$1.apply(RDDKafkaWriter.scala:46)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.avro.Schema$EnumSchema.getEnumOrdinal(Schema.java:749)
at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:163)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:106)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:112)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:153)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:105)
... 22 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
at com.github.benfradet.spark.kafka010.writer.RDDKafkaWriter.writeToKafka(RDDKafkaWriter.scala:46)
at com.shopkick.data.pipeline.loader.KafkaAvroLoader.load(KafkaAvroLoader.scala:151)
at com.shopkick.data.pipeline.loader.TLoader.transform(TLoader.scala:22)
at com.shopkick.data.pipeline.transformer.CompositeTransformer$$anonfun$transform$1.apply(CompositeTransformer.scala:86)
at com.shopkick.data.pipeline.transformer.CompositeTransformer$$anonfun$transform$1.apply(CompositeTransformer.scala:85)
at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:60)
at scala.collection.mutable.MutableList.foreach(MutableList.scala:30)
at com.shopkick.data.pipeline.transformer.CompositeTransformer.transform(CompositeTransformer.scala:85)
at com.memsql.spark.interface.DefaultPipelineMonitor$$anonfun$stepPipeline$4.apply(PipelineMonitor.scala:338)
at com.memsql.spark.interface.DefaultPipelineMonitor$$anonfun$stepPipeline$4.apply(PipelineMonitor.scala:318)
at com.memsql.spark.interface.DefaultPipelineMonitor.runPhase(PipelineMonitor.scala:410)
at com.memsql.spark.interface.DefaultPipelineMonitor.stepPipeline(PipelineMonitor.scala:318)
at com.memsql.spark.interface.DefaultPipelineMonitor.runPipeline(PipelineMonitor.scala:175)
at com.memsql.spark.interface.DefaultPipelineMonitor$$anon$1.run(PipelineMonitor.scala:129)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException:
Error serializing Avro message
Caused by: java.lang.NullPointerException: null of
com.shopkick.data.user_registration_type_enum of union in field
user_registration_type of com.shopkick.data.EventLog
at org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:132)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:126)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:60)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:87)
at io.confluent.kafka.serializers.KafkaAvroEncoder.toBytes(KafkaAvroEncoder.java:47)
at com.shopkick.data.pipeline.loader.KafkaAvroLoader$$anonfun$3$$anonfun$apply$1.apply(KafkaAvroLoader.scala:120)
at com.shopkick.data.pipeline.loader.KafkaAvroLoader$$anonfun$3$$anonfun$apply$1.apply(KafkaAvroLoader.scala:117)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at com.github.benfradet.spark.kafka010.writer.RDDKafkaWriter$$anonfun$writeToKafka$1.apply(RDDKafkaWriter.scala:50)
at com.github.benfradet.spark.kafka010.writer.RDDKafkaWriter$$anonfun$writeToKafka$1.apply(RDDKafkaWriter.scala:46)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.avro.Schema$EnumSchema.getEnumOrdinal(Schema.java:749)
at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:163)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:106)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:112)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:153)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:143)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:105)
... 22 more


The enum is defined as:

{
   "name":"user_registration_type",
   "type":[
      "null",
      {
         "type":"enum",
         "name":"user_registration_type_enum",
         "symbols":[
            "cold",
            "warm",
            "paid"
         ]
      }
   ],
   "doc":"An enumeration for segmenting users by their registration type.
E.g. hot, warm, cold.",
   "default":null
}

Any insight would be greatly appreciated.  Thanks,

-- 
<http://shopkick.com/>
*BENJAMIN VOGAN* | Data Platform Team Lead
shopkick <http://www.shopkick.com/>
<http://facebook.com/shopkick> <http://instagram.com/shopkick>
<http://pinterest.com/shopkick> <http://twitter.com/shopkick>
<https://www.linkedin.com/company/831240?trk=tyah&trkInfo=clickedVertical%3Acompany%2CentityType%3AentityHistoryName%2CclickedEntityId%3Acompany_831240%2Cidx%3A0>

The indispensable app that rewards you for shopping.

Mime
View raw message