flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: What is the recommended way to read AVRO data from Kafka using flink.
Date Tue, 02 Aug 2016 14:22:02 GMT
Hi!

I think this is a known limitation of Flink 1.0, and it is fixed in Flink 1.1.

Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691

Here is the mail thread:
http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAOFSxKtJXfxRKm2=bPLU+xVPWqrWd3c8yNUk3iWK9aQvGRC3Ag@mail.gmail.com%3E

You could try the latest release candidate to get the fix:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-0-RC1-td12723.html

The 1.1 release is also under way, so the fix should be out in a stable release soon.

Greetings,
Stephan


On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan <Zeeshan.Alam@fmr.com> wrote:

> Hi,
>
> I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data
> from Kafka. I have the AVRO schema file that was used to write the data
> to Kafka. The documentation at
> https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
> says that using the GenericData.Record type is possible with Flink, but
> not recommended: since the record contains the full schema, it is very
> data intensive and thus probably slow to use. So what is the recommended
> way to read AVRO data from Kafka using Flink?
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");
>     properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");
>     properties.setProperty("group.id", "Zeeshantest");
>
>     AvroDeserializationSchema<GenericData.Record> avroSchema =
>             new AvroDeserializationSchema<>(GenericData.Record.class);
>     FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
>             new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
>
>     DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
>     messageStream.rebalance().print();
>
>     env.execute("Flink AVRO KAFKA Test");
> }
>
> This is the AvroDeserializationSchema that I am using:
>
> public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
>
>     private static final long serialVersionUID = 4330538776656642778L;
>
>     private final Class<T> avroType;
>     private transient DatumReader<T> reader;
>     private transient BinaryDecoder decoder;
>
>     public AvroDeserializationSchema(Class<T> avroType) {
>         this.avroType = avroType;
>     }
>
>     @Override
>     public T deserialize(byte[] message) {
>         ensureInitialized();
>         try {
>             decoder = DecoderFactory.get().binaryDecoder(message, decoder);
>             return reader.read(null, decoder);
>         } catch (Exception e) {
>             throw new RuntimeException(e);
>         }
>     }
>
>     @Override
>     public boolean isEndOfStream(T nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<T> getProducedType() {
>         return TypeExtractor.getForClass(avroType);
>     }
>
>     private void ensureInitialized() {
>         if (reader == null) {
>             if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
>                 reader = new SpecificDatumReader<T>(avroType);
>             } else {
>                 reader = new ReflectDatumReader<T>(avroType);
>             }
>         }
>     }
> }
>
> When I run this, I get the following exception:
>
> java.lang.Exception: Not a Specific class: class org.apache.avro.generic.GenericData$Record
>
> Thanks & Regards
>
> Zeeshan Alam
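
The question above notes that a GenericData.Record carries its full schema. To put rough numbers on that, here is a stdlib-only Java sketch that encodes one record in Avro's binary format by hand and compares its size with the JSON text of its schema. The schema, the `AvroSizeSketch` class, and the field values are hypothetical. The encoding rules it follows come from the Avro specification: longs are zigzag varints, strings are a length followed by UTF-8 bytes, and record fields are concatenated in schema order with no names or type tags. The sketch does not reproduce how Flink or Kryo actually serialize a GenericData.Record. It only shows that the schema text can be many times larger than the data it describes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: compare an Avro-binary-encoded record with the JSON
// text of its schema. Hand-written encoder, not the Avro library.
public class AvroSizeSketch {

    // Hypothetical writer schema: a string field "name", then a long field "count".
    static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"},"
        + "{\"name\":\"count\",\"type\":\"long\"}]}";

    // Avro writes int/long as a zigzag value, then as a little-endian base-128 varint.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long zigzag = (value << 1) ^ (value >> 63);
        while ((zigzag & ~0x7FL) != 0) {
            out.write((int) ((zigzag & 0x7F) | 0x80)); // low 7 bits, continuation bit set
            zigzag >>>= 7;
        }
        out.write((int) zigzag); // last byte, continuation bit clear
    }

    // Avro strings are a long byte count followed by the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Record fields are written back to back in schema order; no names, no tags.
    static byte[] encodeEvent(String name, long count) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, name);
        writeLong(out, count);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = encodeEvent("click", 42L);
        int schemaBytes = SCHEMA_JSON.getBytes(StandardCharsets.UTF_8).length;
        System.out.println("payload bytes: " + payload.length);
        System.out.println("schema bytes:  " + schemaBytes);
    }
}
```

For the `click`/42 record, the payload is 7 bytes (one length byte, five characters, one varint), while the schema string is over 100 bytes.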

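The `reader.read(null, decoder)` call in the quoted AvroDeserializationSchema can turn bytes into a record only because the reader knows the writer's schema. The binary payload itself carries no field names or type tags. Below is a minimal stdlib-only sketch of that decoding step. It assumes a hypothetical writer schema with a string field `name` followed by a long field `count`, and it maps each message straight into a small typed object instead of a GenericData.Record. `AvroDecodeSketch` and `Event` are hypothetical names. In a real job, the Avro library's decoder would do this work, for example a GenericDatumReader built from the schema parsed out of the .avsc file. This is also not the fix the reply points to.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: decode one Avro-binary record written with the writer
// schema {name: string, count: long} into a small typed object.
public class AvroDecodeSketch {

    // Hypothetical compact type used downstream instead of GenericData.Record.
    static final class Event {
        final String name;
        final long count;
        Event(String name, long count) { this.name = name; this.count = count; }
    }

    private final byte[] buf;
    private int pos;

    AvroDecodeSketch(byte[] buf) { this.buf = buf; }

    // Reads a little-endian base-128 varint, then undoes Avro's zigzag mapping.
    long readLong() {
        long raw = 0;
        int shift = 0;
        while (true) {
            int b = buf[pos++] & 0xFF;
            raw |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) break; // continuation bit clear: last byte
            shift += 7;
        }
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Reads a length-prefixed UTF-8 string.
    String readString() {
        int len = (int) readLong();
        String s = new String(buf, pos, len, StandardCharsets.UTF_8);
        pos += len;
        return s;
    }

    // Fields must be read in the writer schema's order, since the bytes carry no names.
    static Event decodeEvent(byte[] message) {
        AvroDecodeSketch d = new AvroDecodeSketch(message);
        String name = d.readString();
        long count = d.readLong();
        return new Event(name, count);
    }

    public static void main(String[] args) {
        byte[] message = {0x0A, 'c', 'l', 'i', 'c', 'k', 0x54};
        Event e = decodeEvent(message);
        System.out.println(e.name + " " + e.count);
    }
}
```

Decoding into a plain typed object early means downstream operators handle two fields rather than a record that references a Schema. Whether that is faster in a given Flink job is an assumption to measure; the thread does not establish it.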