flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Alam, Zeeshan" <Zeeshan.A...@fmr.com>
Subject What is the recommended way to read AVRO data from Kafka using flink.
Date Tue, 02 Aug 2016 14:04:45 GMT

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from flink. I am having
the AVRO schema file with me which was used to write data in Kafka. Here https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
you have mentioned that using the GenericData.Record type is possible with Flink, but not
recommended. Since the record contains the full schema, its very data intensive and thus probably
slow to use. So what is the recommended way to read AVRO data from Kafka using flink.

public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              Properties properties = new Properties();
              properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");
              properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");
              properties.setProperty("group.id", "Zeeshantest");
              AvroDeserializationSchema<GenericData.Record> avroSchema = new AvroDeserializationSchema<>(GenericData.Record.class);
              FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic",
avroSchema, properties);
              DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
              env.execute("Flink AVRO KAFKA Test");

This is the AvroDeserializationSchema that I am using.

public class AvroDeserializationSchema<T> implements DeserializationSchema<T>

       private static final long serialVersionUID = 4330538776656642778L;

       private final Class<T> avroType;
       private transient DatumReader<T> reader;
       private transient BinaryDecoder decoder;

       public AvroDeserializationSchema(Class<T> avroType) {
              this.avroType = avroType;

       public T deserialize(byte[] message) {
              try {
                     decoder = DecoderFactory.get().binaryDecoder(message, decoder);
                     return reader.read(null, decoder);
              } catch (Exception e) {
                     throw new RuntimeException(e);

       public boolean isEndOfStream(T nextElement) {
              return false;

       public TypeInformation<T> getProducedType() {
              return TypeExtractor.getForClass(avroType);

       private void ensureInitialized() {
              if (reader == null) {
                     if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType))
                           reader = new SpecificDatumReader<T>(avroType);
                     } else {
                           reader = new ReflectDatumReader<T>(avroType);

On running this I am getting java.lang.Exception: Not a Specific class: class org.apache.avro.generic.GenericData$Record.

Thanks & Regards
Zeeshan Alam
[cid:image002.jpg@01CFC2B0.B0315750] +91 80 6626 5982  [cid:image003.jpg@01CFC2B0.B0315750]
 +91 7259501608
Fidelity Internal Information<http://fnw.fmr.com/issg/Popi_def-ex.html#internal>

Techworks Monitoring link<https://techworks.fmr.com/products/monitoring-overview>

View raw message