From "Alam, Zeeshan" <Zeeshan.A...@fmr.com>
Subject RE: What is the recommended way to read AVRO data from Kafka using flink.
Date Tue, 02 Aug 2016 14:53:19 GMT
Hi Stephan,

I went through one of the old mail threads: http://mail-archives.apache.org/mod_mbox/flink-user/201510.mbox/%3CCANC1h_vq-TVjTNhXyYLoVso7GRGkdGWioM5Ppg%3DGoQPjvigqYg%40mail.gmail.com%3E

It mentions that when reading from Kafka you are expected to define a DeserializationSchema.
There is no out-of-the-box (de)serializer for Flink with Kafka, but it should not be very
hard to add.

I have some questions:

1. As per FLINK-3691 you are adding GenericDatumReader, so I suppose I need to use
it instead of DatumReader in my DeserializationSchema, which is required to read data from
Kafka?

2. What is the recommended way to read AVRO binary data from Kafka if I have the AVRO schema
file [*.avsc] with me? Is there a better, more efficient approach?

3. Can AvroInputFormat be used to read Kafka data, or is a DeserializationSchema a must
for reading data from Kafka? Also, AvroInputFormat doesn't have any Javadoc.
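
Some background on question 2 that may help: as far as I understand the Avro specification, the raw binary encoding carries no field names or types, only the values in schema order, which is why the reader has to be given the writer's schema (for example from the .avsc file). The sketch below hand-decodes a two-field record without any dependencies to illustrate this. The record layout (a string `name` followed by an int `age`), the sample bytes, and the class `AvroBinarySketch` are assumptions made up for illustration; in a real job Avro's own readers would do this work.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hand-rolled decoder for two Avro primitives; for illustration only. */
final class AvroBinarySketch {

    /** Reads one Avro int/long: a base-128 varint (low bits first), zigzag-encoded. */
    static long readLong(ByteBuffer in) {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            b = in.get() & 0xFF;
            raw |= (long) (b & 0x7F) << shift; // low 7 bits carry the payload
            shift += 7;
        } while ((b & 0x80) != 0);             // high bit set means more bytes follow
        return (raw >>> 1) ^ -(raw & 1);       // undo the zigzag mapping
    }

    /** Reads one Avro string: a long byte count followed by that many UTF-8 bytes. */
    static String readString(ByteBuffer in) {
        byte[] utf8 = new byte[(int) readLong(in)];
        in.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical record {name: string, age: int} with name="Al", age=27.
        // Nothing in these four bytes says "name" or "age"; only the schema does.
        byte[] message = {0x04, 'A', 'l', 0x36};
        ByteBuffer in = ByteBuffer.wrap(message);
        String name = readString(in);
        long age = readLong(in);
        System.out.println(name + " " + age); // prints "Al 27"
    }
}
```

Reading the same bytes with a different field order or different types would silently produce garbage or fail, so whatever DeserializationSchema is used needs access to the schema the producer wrote with.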

Thanks & Regards,
Zeeshan Alam

From: Stephan Ewen [mailto:sewen@apache.org]
Sent: Tuesday, August 02, 2016 7:52 PM
To: user@flink.apache.org
Subject: Re: What is the recommended way to read AVRO data from Kafka using flink.


I think this is a known limitation in Flink 1.0, and it is fixed in Flink 1.1.

Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691

Here is the mail thread:

You could try using the latest release candidate to get the fix: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-0-RC1-td12723.html

The release is also in progress, so the fix should be out in a stable release soon.


On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan <Zeeshan.Alam@fmr.com> wrote:

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from Kafka. I have
the AVRO schema file that was used to write the data to Kafka. The documentation at https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
mentions that using the GenericData.Record type is possible with Flink, but not
recommended: since the record contains the full schema, it is very data intensive and thus probably
slow to use. So what is the recommended way to read AVRO data from Kafka using Flink?

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");
    properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");
    properties.setProperty("group.id", "Zeeshantest");
    AvroDeserializationSchema<GenericData.Record> avroSchema = new AvroDeserializationSchema<>(GenericData.Record.class);
    FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
    DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
    env.execute("Flink AVRO KAFKA Test");
}

This is the AvroDeserializationSchema that I am using.

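import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 4330538776656642778L;

    private final Class<T> avroType;
    // Transient: rebuilt lazily on first use instead of being serialized with the schema object.
    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    public T deserialize(byte[] message) {
        // Without this call the transient reader would still be null here.
        ensureInitialized();
        try {
            // Passing the previous decoder lets Avro reuse it.
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }

    private void ensureInitialized() {
        if (reader == null) {
            // Generated Avro classes use the specific reader; everything else falls back to reflection.
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }
}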
On running this I am getting java.lang.Exception: Not a Specific class: class org.apache.avro.generic.GenericData$Record.
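
A side note on the structure of the schema class above: the `reader` and `decoder` fields are `transient` and built in `ensureInitialized()`, presumably because the schema object is shipped to the workers via Java serialization, which skips transient fields. A minimal, dependency-free sketch of that pattern is below. `LazyHelperSketch`, `roundTrip`, and the `StringBuilder` standing in for the Avro reader are hypothetical names for illustration, not Flink or Avro APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Stand-in for a deserialization schema; the StringBuilder plays the role of the reader. */
final class LazyHelperSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String config;            // serialized and shipped with the object
    private transient StringBuilder helper; // skipped by Java serialization

    LazyHelperSketch(String config) {
        this.config = config;
    }

    boolean isInitialized() {
        return helper != null;
    }

    String process(String input) {
        ensureInitialized(); // must run before every use of the transient field
        helper.setLength(0);
        return helper.append(config).append(':').append(input).toString();
    }

    private void ensureInitialized() {
        if (helper == null) {
            helper = new StringBuilder();
        }
    }

    /** Serializes and deserializes the object in memory to mimic shipping it elsewhere. */
    static LazyHelperSketch roundTrip(LazyHelperSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyHelperSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LazyHelperSketch original = new LazyHelperSketch("avro");
        original.process("warm-up");                // helper now exists on the original
        LazyHelperSketch copy = roundTrip(original);
        System.out.println(copy.isInitialized());   // prints "false"
        System.out.println(copy.process("record")); // prints "avro:record"
    }
}
```

The copy arrives with a null helper even though the original had one, so any method that touches the transient field has to go through the lazy initializer first.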

Thanks & Regards
Zeeshan Alam

