flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Can't call getProducedType on Avro messages with array types
Date Thu, 04 Jan 2018 11:07:56 GMT
Hi,

I think you might be able to use AvroTypeInfo, which is available by including the flink-avro
dependency. Is that an option for you?

Best,
Aljoscha

> On 3. Jan 2018, at 21:34, Kyle Hamlin <hamlin.kn@gmail.com> wrote:
> 
> Hi,
> 
> It appears that Kryo can't properly extract/deserialize Avro array types. I have a very
simple Avro schema that has an array type and when I remove the array field the error is not
thrown. Is there any way around this without using a specific type?
> 
> Avro Schema:
> {
>     "type": "record",
>     "name": "Received",
>     "fields": [
>         {"name": "id", "type": "int"},
>         {"name": "time", "type": "long"},
>         {"name": "urls", "type": {"type": "array", "items": "string"}}
>     ]
> }
> 
> Deserializer:
> import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
> import org.apache.avro.generic.{GenericData, GenericRecord}
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.api.java.typeutils.TypeExtractor
> import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
> 
> import scala.collection.JavaConverters._
> import scala.reflect.ClassTag
> 
> class AvroDeserializer[T <: GenericRecord : ClassTag](schemaRegistryUrl: String) extends
KeyedDeserializationSchema[T] {
> 
>   @transient lazy val keyDeserializer: KafkaAvroDeserializer = {
>     val deserializer = new KafkaAvroDeserializer()
>     deserializer.configure(
>       Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
>       true)
>     deserializer
>   }
> 
>   // Flink needs the serializer to be serializable => this "@transient lazy val" does
the trick
>   @transient lazy val valueDeserializer: KafkaAvroDeserializer = {
>     val deserializer = new KafkaAvroDeserializer()
>     deserializer.configure(
>       // other schema-registry configuration parameters can be passed, see the configure()
code
>       // for details (among other things, schema cache size)
>       Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
>       false)
>     deserializer
>   }
> 
>   override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>                            topic: String, partition: Int, offset: Long): T = {
>     valueDeserializer.deserialize(topic, message).asInstanceOf[T]
>   }
> 
>   override def isEndOfStream(nextElement: T): Boolean = false
> 
>   override def getProducedType: TypeInformation[T] = {
>     TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
>   }
> 
> }
> Stacktrace:
> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
<http://127.0.0.1:6123/>
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8082 <http://localhost:8082/>
> Starting execution of program
> Submitting job with JobID: d9ed8f58fceaae253b84fc86e4b6fa3a. Waiting for job completion.
> Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259]
with leader session id 00000000-0000-0000-0000-000000000000.
> 01/03/2018 15:19:57	Job execution switched to status RUNNING.
> 01/03/2018 15:19:57	Source: Kafka -> Sink: Unnamed(1/1) switched to SCHEDULED
> 01/03/2018 15:19:57	Source: Kafka -> Sink: Unnamed(1/1) switched to DEPLOYING
> 01/03/2018 15:19:57	Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
> 01/03/2018 15:19:59	Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> values (org.apache.avro.generic.GenericData$Record)
> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> 	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> 	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
> 	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
> 	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> 	at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
> 	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
> 	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
> 	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> 	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
> 	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
> 	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> 	... 20 more
> 
> 01/03/2018 15:19:59	Job execution switched to status FAILING.


Mime
View raw message