avro-user mailing list archives

From Matt Pouttu-Clarke <Matt.Pouttu-Cla...@icrossing.com>
Subject Can we get a Hadoop record reader with GenericDatumReader support?
Date Thu, 22 Sep 2011 18:23:25 GMT
Hi All,

On our project we rely on the schema evolution features of Avro, and we also
need to select data without knowing the schema(s) a priori.  In other words,
we have a large number of Avro files with an evolving schema, and we may
project with any historical schema, or with no schema at all.  For the case
where we cannot specify a schema, we could not find a RecordReader that
supports GenericDatumReader.  Since we use Cascading.Avro to hide the Avro
plumbing, we patched Cascading.Avro to provide a generic record reader.
This is obviously a temporary fix, as we would prefer that this support
existed within Avro itself.
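
To make the two read modes concrete, here is a toy model in plain Java. It
does not use the Avro API: records are plain maps, and a "reader schema" is
just a map from field name to default value. The class and method names are
made up for illustration, and real Avro schema resolution does more than this
(type promotion and aliases, for example).

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ProjectionSketch {

    // "No schema" mode: hand the record back exactly as it was written.
    public static Map<String, Object> readAsWritten(Map<String, Object> written) {
        return new LinkedHashMap<>(written);
    }

    // "Projection" mode: keep only the fields the reader asks for.
    // Fields the writer did not have are filled from the reader's defaults;
    // fields the reader does not ask for are dropped.
    public static Map<String, Object> project(Map<String, Object> written,
                                              Map<String, Object> readerDefaults) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerDefaults.entrySet()) {
            String name = field.getKey();
            out.put(name, written.containsKey(name) ? written.get(name) : field.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        // A record written with an older, hypothetical schema (id, name).
        Map<String, Object> written = new LinkedHashMap<>();
        written.put("id", 1);
        written.put("name", "a");

        // A newer, hypothetical reader schema (id, email) with defaults.
        Map<String, Object> readerDefaults = new LinkedHashMap<>();
        readerDefaults.put("id", 0);
        readerDefaults.put("email", "unknown");

        System.out.println(readAsWritten(written));
        System.out.println(project(written, readerDefaults));
    }
}
```

The record reader we are asking for corresponds to the first mode: whatever
was written comes back, with no schema supplied by the caller.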

Can anyone suggest a more elegant solution?  Source code below.

Cheers,
Matt

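    import java.io.IOException;

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.mapred.AvroWrapper;
    import org.apache.avro.mapred.FsInput;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;

    /** Reads Avro data files as generic records when no schema is known in advance. */
    public class CascadingAvroRecordReader<T>
        implements RecordReader<AvroWrapper<T>, NullWritable>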
    {

        private FsInput in;
        private DataFileReader<T> reader;
        private long start;
        private long end;

        public CascadingAvroRecordReader(JobConf job, FileSplit split)
        throws IOException
        {
            this.in = new FsInput(split.getPath(), job);
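            // No schema is passed to GenericDatumReader, so records are read
            // with the writer's schema stored in the file header.
            this.reader = new DataFileReader<T>(in, new GenericDatumReader<T>());
            reader.sync(split.getStart());  // sync to start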
            this.start = in.tell();
            this.end = split.getStart() + split.getLength();
        }

        public AvroWrapper<T> createKey() {
            return new AvroWrapper<T>(null);
        }
        public NullWritable createValue() { return NullWritable.get(); }
        public boolean next(AvroWrapper<T> wrapper, NullWritable ignore)
        throws IOException {
            if (!reader.hasNext() || reader.pastSync(end))
                return false;
            wrapper.datum(reader.next(wrapper.datum()));
            return true;
        }
        public float getProgress() throws IOException {
            if (end == start) {
                return 0.0f;
            } else {
                return Math.min(1.0f, (in.tell() - start) / (float)(end - start));
            }
        }
        public long getPos() throws IOException {
            return in.tell();
        }
        public void close() throws IOException { reader.close(); }

    }
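
For anyone wondering why the reader calls sync(split.getStart()) and then
checks pastSync(end): the pair is what keeps neighbouring splits from reading
the same block twice. The sketch below is a simplified model in plain Java,
not Avro's implementation. It assumes each block is identified by the byte
offset of the sync marker in front of it, and that a split [start, end) owns
the blocks whose marker offset falls in that range. The class name, method
name, and offsets are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitOwnershipSketch {

    // Returns the sync-marker offsets of the blocks that a reader for the
    // split [start, end) would consume. syncOffsets must be ascending.
    public static List<Long> blocksForSplit(long[] syncOffsets, long start, long end) {
        List<Long> owned = new ArrayList<>();
        for (long offset : syncOffsets) {
            if (offset < start) {
                continue;   // plays the role of sync(start): skip to the first marker at or after start
            }
            if (offset >= end) {
                break;      // plays the role of pastSync(end): the next split owns this block
            }
            owned.add(offset);
        }
        return owned;
    }

    public static void main(String[] args) {
        long[] syncOffsets = {100, 900, 2100, 3500};  // hypothetical marker positions
        long fileLength = 4000;
        long splitSize = 1000;
        for (long start = 0; start < fileLength; start += splitSize) {
            long end = Math.min(start + splitSize, fileLength);
            System.out.println("[" + start + ", " + end + ") -> "
                    + blocksForSplit(syncOffsets, start, end));
        }
    }
}
```

In this model every block lands in exactly one split, and a split that
contains no marker simply reads nothing, which matches how next() above
returns false as soon as pastSync(end) is true.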



