crunch-user mailing list archives

From Gabriel Reid <gabriel.r...@gmail.com>
Subject Re: read an org.apache.avro.mapreduce.KeyValuePair datafile
Date Fri, 04 Jul 2014 13:31:19 GMT
Hi Barrie,

Inlined below

On Fri, Jul 4, 2014 at 2:06 PM, B Kersbergen <kersbergenb@gmail.com> wrote:
> Hi,
>
> I’m stuck trying to read an avro KeyValuePair datafile with crunch.
>
> This is a header dump of my avro data file:
>
> {"type":"record","name":"KeyValuePair","namespace":"org.apache.avro.mapreduce","doc":"A
> key/value pair","fields":[{"name":"key","type":"string","doc":"The
> key"},{"name":"value","type":{"type":"record","name":"LiveTrackingLine","namespace":"com.bol.hadoop.enrich.record",
> etc etc
>
> I’m only interested in the LiveTrackingLine object but I probably need
> to read the whole KeyValuePair object and extract the LiveTrackingLine
> in the crunch pipeline.
>
> This is the code I have so far.
>
> String inputPath = args[0];
> String outputPath = args[1];
> Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
>     ExtractViewsJob.class.getSimpleName(), getConf());
> PCollection<AvroWrapper<Pair<String, LiveTrackingLine>>> lines =
>     pipeline.read(new AvroFileSource<AvroWrapper<Pair<String, LiveTrackingLine>>>(inputPath));
>
> I’m a bit lost in the last part where I configure the 'pipeline'
> object with the right avro schema(s) and input dir.

If you just want to read the data, the simplest approach is:

  PCollection<GenericData.Record> lines =
      pipeline.read(From.avroFile(inputPath));

However, as you can see, this gives you a PCollection of
GenericData.Record objects, and the nested LiveTrackingLine values are
read as generic records as well.

> Because my schema is very complex I
> want to parse this as a ‘specific’ and not as a ‘generic’ or
> ‘reflective’ avro representation, this is also a learning experience
> in using avro with crunch.

This isn't officially supported in Crunch at the moment, although
there is a way to work around it. Because your top-level record type
(KeyValuePair) is a generic record, you need to bypass some of Crunch's
basic convenience methods to get your nested records read as actual
specific records.
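For context on why KeyValuePair ends up generic: as far as I know,
Avro's specific reader (SpecificData) looks up a generated class by the
record's full schema name and falls back to a generic record when no
such class is on the classpath. The sketch below is a simplified,
stdlib-only model of that lookup, not Avro's actual code; recordClassFor
and GENERIC are hypothetical names, and a JDK class stands in for a
generated one.

```java
public class SpecificLookupSketch {
  // Hypothetical marker meaning "no generated class found, read a generic record".
  static final String GENERIC = "GenericData.Record";

  // Simplified model (not Avro's code): resolve a record's full schema name
  // to a class on the classpath, falling back to a generic record if absent.
  static String recordClassFor(String fullSchemaName) {
    try {
      return Class.forName(fullSchemaName).getName();
    } catch (ClassNotFoundException e) {
      return GENERIC;
    }
  }

  public static void main(String[] args) {
    // Top-level record name written by AvroKeyValue; as far as I know,
    // no class by this name exists, so this resolves to the generic fallback.
    System.out.println(recordClassFor("org.apache.avro.mapreduce.KeyValuePair"));
    // A JDK class stands in for a generated specific class on the classpath.
    System.out.println(recordClassFor("java.lang.StringBuilder"));
  }
}
```

The nested LiveTrackingLine record, by contrast, has a generated class,
which is why it can come back as a specific record once the reader is
set up in specific mode.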

The workaround is to read your PCollection with something like
this:

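  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericData;
  import org.apache.crunch.PCollection;
  import org.apache.crunch.io.From;
  import org.apache.crunch.types.NoOpDeepCopier;
  import org.apache.crunch.types.avro.AvroType;
  import org.apache.crunch.types.avro.Avros;

  // Schema of the top-level KeyValuePair records (as in your header dump)
  Schema specificRecordSchema = ...;
  AvroType<GenericData.Record> customAvroType = new AvroType<GenericData.Record>(
      GenericData.Record.class, specificRecordSchema,
      NoOpDeepCopier.<GenericData.Record>create(),
      Avros.specifics(MySpecificRecord.class));  // your generated class

  PCollection<GenericData.Record> pcollection =
      pipeline.read(From.avroFile(inputPath, customAvroType));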

This gives you a PCollection of GenericData.Record objects (the
KeyValuePair records) that contain instances of your specific class.
Be warned, though, that this relies on what could be considered
non-public APIs, which may change in future releases.
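Since you're only after the LiveTrackingLine values, the next step
would be to drop the key and pull out the value field. In Crunch that
would be a MapFn<GenericData.Record, LiveTrackingLine> passed to
parallelDo together with Avros.specifics(LiveTrackingLine.class),
casting record.get("value") to your class. The plain-Java sketch below
models only that extraction step so it runs without Crunch or Avro: a
Map stands in for GenericData.Record, and TrackingLine, keyValuePair and
parallelDoSketch are hypothetical stand-ins, not Crunch APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ExtractValues {
  // Hypothetical stand-in for the generated LiveTrackingLine class.
  static final class TrackingLine {
    final String page;
    TrackingLine(String page) { this.page = page; }
  }

  // Builds one KeyValuePair-shaped record; a Map stands in for GenericData.Record.
  static Map<String, Object> keyValuePair(String key, Object value) {
    Map<String, Object> record = new HashMap<>();
    record.put("key", key);
    record.put("value", value);
    return record;
  }

  // Hypothetical sequential stand-in for parallelDo: applies the
  // per-record function to every element, with no cluster involved.
  static <I, O> List<O> parallelDoSketch(List<I> input, Function<I, O> fn) {
    List<O> out = new ArrayList<>();
    for (I element : input) {
      out.add(fn.apply(element));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map<String, Object>> records = List.of(
        keyValuePair("k1", new TrackingLine("/home")),
        keyValuePair("k2", new TrackingLine("/cart")));

    // Drop the key and cast the "value" field to the specific type,
    // as the body of the MapFn would do.
    List<TrackingLine> lines =
        parallelDoSketch(records, r -> (TrackingLine) r.get("value"));

    for (TrackingLine line : lines) {
      System.out.println(line.page);
    }
  }
}
```

In the real job, the output PType passed to parallelDo is what tells
Crunch to keep treating the extracted values as specific records.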

Is reading Avro files that were written with AvroKeyValue something
you'll be doing regularly? If so, it might be worth adding it as
built-in functionality in Crunch, so you don't need to mess around with
creating your own AvroType.

- Gabriel
