flink-user mailing list archives

From Nitish Pant <nitishpant...@gmail.com>
Subject Re: Schema registry deserialization: Kryo NPE error
Date Mon, 02 Mar 2020 10:01:17 GMT

So I am building a data pipeline that takes input from sensors via an MQTT broker and passes
it to Kafka. Before the data goes to Kafka, I filter it, serialize the filtered data into
Avro format, and keep the schema in the registry. Now I want to get that data into Flink to
process it with some algorithms. So, at the FlinkKafkaConsumer end, I currently don’t have
the schemas for my data. One workaround would be to fetch the schema corresponding to the
data that I’ll be getting from a topic separately from the registry and then work forward,
but I was hoping there would be a way to avoid this and integrate the schema registry with my
consumer in some way, like kafka-connect does. This is why I was trying this solution.

Do you think I should go with the workaround instead, since working with a GenericRecord
would be more of an overhead in the long run?


> On 02-Mar-2020, at 3:11 PM, Arvid Heise <arvid@ververica.com> wrote:
> Could you please give more background on your use case? It's hard to give any advice
> with the little information you gave us.
> Usually, the consumer should know the schema, or else it's hard to do meaningful processing.
> If it's something completely generic, then there is no way around it, but that should
> be the last resort. This is where the recommendations from my first response come into play.
> If they are not working for you for some reason, please let me know why and I can try to
> come up with a solution.
> On Mon, Mar 2, 2020 at 10:27 AM Nitish Pant <nitishpant123@gmail.com> wrote:
> Hi,
> Thanks for the replies. I get that it is not wise to use GenericRecord and that this is
> what causes the Kryo fallback, but if not this, how should I go about writing an
> AvroSchemaRegistrySchema when I don’t know the schema? Without knowing the schema, I can’t
> create a class. Can you suggest a way of getting around that?
> Thanks!
>> On 02-Mar-2020, at 2:14 PM, Dawid Wysakowicz <dwysakowicz@apache.org> wrote:
>> Hi Nitish,
>> Just to slightly extend Arvid's reply: as Arvid said, the Kryo serializer comes from the
>> call to TypeExtractor.getForClass(classOf[GenericRecord]). As GenericRecord is not a POJO,
>> this call produces a GenericTypeInfo, which uses Kryo serialization.
>> For a reference example, I would recommend having a look at AvroDeserializationSchema.
>> There we use GenericRecordAvroTypeInfo for working with GenericRecords. One important note:
>> GenericRecords are not the best candidates for data objects in Flink. The reason is that if
>> you apply any transformation to a GenericRecord, e.g. map/flatMap, the input type
>> information cannot be forwarded, as the transformation is a black box from Flink's
>> perspective. Therefore you would need to provide the type information for every step of
>> the pipeline:
>> TypeInformation<?> info = ...
>> sEnv.addSource(...) // produces info
>>     .map(...)
>>     // must be provided again: the map transformation is a black box and
>>     // might produce a completely different record
>>     .returns(info)
>> Hope that helps a bit.
>> Best,
>> Dawid
>> On 02/03/2020 09:04, Arvid Heise wrote:
>>> Hi Nitish,
>>> Kryo is the fallback serializer of Flink when everything else fails. In general,
>>> performance suffers quite a bit, and it's not always applicable, as in your case.
>>> Especially in production code, it's best to avoid it completely.
>>> In your case, the issue is that the type information you provide is completely
>>> meaningless. getProducedType does not provide any actual type information, just a
>>> reference to a generic skeleton. Flink uses the type information to reason about the
>>> value structures, which it cannot do in your case.
>>> If you really need to resort to a completely generic serializer (which is usually
>>> not needed), then you have a few options:
>>> * Easiest: stick to byte[] and convert in a downstream UDF. If it's that generic,
>>>   you probably have only a simple transformation before outputting it into some generic
>>>   Kafka sink. So your UDF deserializes, does some generic stuff, and immediately turns it
>>>   back into byte[].
>>> * Implement your own generic TypeInformation with a serializer. WritableTypeInfo [1] is
>>>   a generic example of how to do it. This will automatically convert byte[] back and
>>>   forth to GenericRecord. That would be the recommended way when you have multiple
>>>   transformations between source and sink.
>>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
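
As a rough illustration of the byte[] option above: if the messages were written by Confluent's KafkaAvroSerializer (an assumption based on the KafkaAvroDeserializer used in the original post), each Kafka value starts with a 5-byte header, namely a zero magic byte followed by a 4-byte big-endian schema ID, and the Avro binary payload comes after it. A downstream UDF that receives the raw byte[] could split the message as sketched below. The class `ConfluentWireFormat` and its method names are hypothetical helpers and not part of Flink or the Confluent client; looking up the schema by ID and decoding the payload are left out.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper for illustration; not part of Flink or the Confluent client. */
final class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    private ConfluentWireFormat() {}

    /** Reads the big-endian schema ID that follows the magic byte. */
    static int schemaId(byte[] message) {
        checkHeader(message);
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    /** Returns a copy of the Avro binary payload that follows the 5-byte header. */
    static byte[] avroPayload(byte[] message) {
        checkHeader(message);
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    /** Rejects messages that are too short or do not start with the magic byte. */
    private static void checkHeader(byte[] message) {
        if (message == null || message.length < HEADER_SIZE || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("not a Confluent-framed Avro message");
        }
    }

    public static void main(String[] args) {
        // Header: magic byte 0, schema ID 42; the last two bytes stand in for an Avro payload.
        byte[] framed = {0, 0, 0, 0, 42, 2, 6};
        System.out.println("schema id: " + schemaId(framed));
        System.out.println("payload bytes: " + avroPayload(framed).length);
    }
}
```

With the schema ID in hand, the UDF could fetch the writer schema from the registry once per ID, cache it, and decode only the payload, so the stream type between operators stays byte[].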
>>> On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant <nitishpant123@gmail.com> wrote:
>>> Hi all,
>>> I am trying to use Flink to read Avro data from Kafka, for which the schemas are stored
>>> in the Kafka schema registry. Since the producer for Kafka is a totally different service
>>> (an MQTT consumer sinking to Kafka), I can’t have the schema with me at the consumer end.
>>> I read around and arrived at the following implementation of KeyedDeserializationSchema,
>>> but I cannot understand why it’s throwing a
>>> `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException`:
>>> class AvroDeserializationSchema(schemaRegistryUrl: String) extends KeyedDeserializationSchema[GenericRecord] {
>>>   // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
>>>   @transient lazy val valueDeserializer = {
>>>     val deserializer = new KafkaAvroDeserializer(new CachedSchemaRegistryClient(schemaRegistryUrl,
>>>     deserializer.configure(
>>>       Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
>>>         KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false).asJava,
>>>       false)
>>>     deserializer
>>>   }
>>>   override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>>>   override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>>>       topic: String, partition: Int, offset: Long): GenericRecord = {
>>>     // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
>>>     val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
>>>     value
>>>   }
>>>   override def getProducedType: TypeInformation[GenericRecord] =
>>>     TypeExtractor.getForClass(classOf[GenericRecord])
>>> }
>>> I have no clue how to go about solving this. I saw a lot of people trying to
>>> implement the same. If someone can guide me, it’d be really helpful.
>>> Thanks!
>>> Nitish
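
The `@transient lazy val` comment in the original code refers to a general pattern: Flink ships the schema object to the workers using Java serialization, so a field holding a non-serializable client has to be skipped during serialization and rebuilt on first use. A minimal JDK-only sketch of the same idea in Java is shown below. `LazyClientHolder` and its nested `Client` are hypothetical stand-ins for the deserialization schema and the registry client, not real Flink or Confluent classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a schema object that holds a non-serializable client. */
final class LazyClientHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Stand-in for a client that cannot be serialized (it does not implement Serializable). */
    static final class Client {
        final String url;
        Client(String url) { this.url = url; }
    }

    private final String registryUrl; // plain configuration, serialized with the object
    private transient Client client;  // skipped by Java serialization

    LazyClientHolder(String registryUrl) { this.registryUrl = registryUrl; }

    /** True once the client has been built on this particular instance. */
    boolean isInitialized() { return client != null; }

    /** Builds the client on first use; a deserialized copy starts with a null field again. */
    Client client() {
        if (client == null) {
            client = new Client(registryUrl);
        }
        return client;
    }

    /** Serializes and deserializes the holder, similar to shipping it to a worker. */
    static LazyClientHolder roundTrip(LazyClientHolder holder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(holder);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyClientHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LazyClientHolder original = new LazyClientHolder("http://localhost:8081");
        original.client(); // client built on the sending side
        LazyClientHolder copy = roundTrip(original);
        System.out.println("copy initialized before use: " + copy.isInitialized());
        System.out.println("copy client url: " + copy.client().url);
    }
}
```

This pattern only makes the schema object itself serializable. It does not affect how Flink serializes the records flowing between operators, which is decided by the type information returned from getProducedType.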
