From: Maulik Gandhi
Date: Fri, 25 Sep 2015 12:40:59 -0500
Subject: Re: Parsing avro binary data from Spark Streaming
To: user@avro.apache.org

Hi Daniel,

The code snippet below should help:

    import java.io.IOException;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;

    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificRecord;
    import org.apache.avro.specific.SpecificRecordBuilderBase;

    public SpecificRecord fromBytes(final byte[] bytes, final Class<SpecificRecord> clazz) {
        final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, 0, bytes.length, null);
        final DatumReader<SpecificRecord> datumReader = new SpecificDatumReader<SpecificRecord>(clazz);
        try {
            // Look up the generated class's static newBuilder(record) method
            // and build a fresh record from the decoded datum.
            final Method newBuilder = clazz.getMethod("newBuilder", clazz);
            return ((SpecificRecordBuilderBase<?>) newBuilder.invoke(null,
                    datumReader.read(null, decoder))).build();
        } catch (final IllegalArgumentException | IllegalAccessException
                | InvocationTargetException | IOException
                | SecurityException | NoSuchMethodException e) {
            throw new IllegalStateException("Unable to deserialize avro " + clazz, e);
        }
    }

Thanks.
- Maulik

On Fri, Sep 25, 2015 at 11:18 AM, Daniel Haviv <daniel.haviv@veracity-group.com> wrote:

> Hi,
> I'm receiving Avro data from Kafka in my Spark Streaming app.
> When reading the data directly from disk I would have just parsed it
> like this:
>
>     val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable,
>       AvroInputFormat[GenericRecord]]("/incoming_1k").coalesce(10)
>     val txtRDD = avroRDD.map(l => { l._1.datum.toString })
>
> I would like to do the same with Avro data coming in from Kafka, so I'm
> doing the following:
>
>     val avroStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
>       DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicSet)
>
> This leaves me with a byte array, and I can't find any example of how to
> convert a byte array to either a GenericRecord or to my Avro class.
>
> Any help will be appreciated.
>
> Daniel
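[Editor's note] The reflective call at the heart of the snippet above (looking up the generated class's static newBuilder method and invoking it with a null receiver) can be illustrated in isolation with plain JDK reflection. This is a minimal sketch: the Payload and PayloadBuilder classes below are hypothetical stand-ins for an Avro-generated SpecificRecord class and its builder, not part of Avro itself.

```java
import java.lang.reflect.Method;

public class ReflectionDemo {

    // Stand-in for an Avro-generated record class: like generated
    // SpecificRecord classes, it exposes a static newBuilder(record) factory.
    public static class Payload {
        final String value;

        Payload(final String value) {
            this.value = value;
        }

        public static PayloadBuilder newBuilder(final Payload other) {
            return new PayloadBuilder(other.value);
        }
    }

    // Stand-in for the generated builder (the SpecificRecordBuilderBase role).
    public static class PayloadBuilder {
        private final String value;

        PayloadBuilder(final String value) {
            this.value = value;
        }

        public Payload build() {
            return new Payload(value);
        }
    }

    // Mirrors the reflective step in fromBytes(): resolve the static
    // newBuilder(clazz) method, invoke it with a null receiver (static call),
    // and build a fresh instance from the result.
    static Payload copyViaReflection(final Payload datum) throws Exception {
        final Class<Payload> clazz = Payload.class;
        final Method newBuilder = clazz.getMethod("newBuilder", clazz);
        return ((PayloadBuilder) newBuilder.invoke(null, datum)).build();
    }

    public static void main(final String[] args) throws Exception {
        final Payload copy = copyViaReflection(new Payload("hello"));
        System.out.println(copy.value); // prints "hello"
    }
}
```

In the real fromBytes() the argument passed to newBuilder is the datum produced by datumReader.read(null, decoder); the builder round-trip then gives back a mutable-safe copy of the decoded record.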