avro-user mailing list archives

From Maulik Gandhi <mmg...@gmail.com>
Subject Re: Parsing avro binary data from Spark Streaming
Date Fri, 25 Sep 2015 17:40:59 GMT
Hi Daniel,


The code snippet below should help:

    // Imports required by the enclosing class:
    import java.io.IOException;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificRecord;
    import org.apache.avro.specific.SpecificRecordBuilderBase;

    public SpecificRecord fromBytes(final byte[] bytes, final Class<SpecificRecord> clazz) {
        // Decode the byte array as Avro binary data.
        final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, 0, bytes.length, null);
        final DatumReader<SpecificRecord> datumReader = new SpecificDatumReader<SpecificRecord>(clazz);
        try {
            // Find the generated static newBuilder(T other) method and copy
            // the decoded record through the builder.
            final Method newBuilder = clazz.getMethod("newBuilder", clazz);
            return ((SpecificRecordBuilderBase<?>) newBuilder.invoke(null, datumReader.read(null, decoder))).build();
        } catch (final IllegalArgumentException | IllegalAccessException | InvocationTargetException
                | IOException | SecurityException | NoSuchMethodException e) {
            throw new IllegalStateException("Unable to deserialize avro " + clazz, e);
        }
    }
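
For the GenericRecord case mentioned in the question, something along these lines may work. This is only a minimal sketch: the class name GenericAvroDecoder is made up for illustration, and it assumes each Kafka message value is a single record in raw Avro binary encoding (no container-file header or other framing in front of it) and that the schema it was written with is available to the reader.

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

// Hypothetical helper, not part of Avro: decodes one raw Avro binary record.
public final class GenericAvroDecoder {

    private GenericAvroDecoder() {
    }

    // Assumption: 'bytes' holds exactly one record written with 'writerSchema',
    // with no file header or prefix bytes before the Avro payload.
    public static GenericRecord fromBytes(final byte[] bytes, final Schema writerSchema)
            throws IOException {
        final DatumReader<GenericRecord> reader =
                new GenericDatumReader<GenericRecord>(writerSchema);
        final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }
}
```

In the streaming job this would be called on the value of each Kafka message. Note that string fields typically come back as org.apache.avro.util.Utf8, so call toString() on them before comparing with a String.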


Thanks.

- Maulik


On Fri, Sep 25, 2015 at 11:18 AM, Daniel Haviv <
daniel.haviv@veracity-group.com> wrote:

> Hi,
> I'm receiving avro data from Kafka in my Spark Streaming app.
> When reading the data directly from disk, I would have parsed it like
> this:
> val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable,
> AvroInputFormat[GenericRecord]]("/incoming_1k").coalesce(10)
> val txtRDD = avroRDD.map(l => {l._1.datum.toString} )
>
> I would like to do the same with avro data coming in from kafka, so I'm
> doing the following:
> val avroStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
> DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicSet)
>
> This leaves me with a byte array and I can't find any example on how to
> convert a byte array to either a GenericRecord or to my avro class.
>
> Any help will be appreciated.
>
> Daniel
>
