crunch-user mailing list archives

From Gabriel Reid <gabriel.r...@gmail.com>
Subject Re: read an org.apache.avro.mapreduce.KeyValuePair datafile
Date Fri, 04 Jul 2014 15:14:50 GMT
Hi Barrie,

It's due to a dumb error on my part, sorry about that. The schema that
you pass in to the AvroType constructor needs to be the schema for the
AvroKeyValue class.

So instead of:

    Schema specificRecordSchema = LiveTrackingLine.SCHEMA$;

it should be:

    Schema schema = AvroKeyValue.getSchema(
        Schema.create(Schema.Type.STRING),
        LiveTrackingLine.SCHEMA$);

Supplying that schema to the AvroType constructor should get you
around the error you're running into now.
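For reference, here's a sketch of the full corrected read, in the same fragment style as your snippet (it assumes a String key, as your file's header dump shows, and keeps the rest of your AvroType construction unchanged):

```java
// Build the schema of the wrapping key/value pair record, not the
// schema of the wrapped value: a String key plus a LiveTrackingLine value.
Schema schema = AvroKeyValue.getSchema(
    Schema.create(Schema.Type.STRING),
    LiveTrackingLine.SCHEMA$);

// Same custom AvroType as before, but built on the pair schema.
AvroType<GenericData.Record> customAvroType =
    new AvroType<GenericData.Record>(
        GenericData.Record.class, schema,
        NoOpDeepCopier.<GenericData.Record>create(),
        Avros.specifics(LiveTrackingLine.class));

PCollection<GenericData.Record> pcollection =
    pipeline.read(From.avroFile(inputPath, customAvroType));
```

The key change is only the first statement; the reader then expects the top-level KeyValuePair record that's actually in the file, and the LiveTrackingLine values inside it are still deserialized as specific records.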

- Gabriel


On Fri, Jul 4, 2014 at 4:30 PM, B Kersbergen <kersbergenb@gmail.com> wrote:
> Hi Gabriel,
>
> Yes all our avro data is wrapped in the Avro KeyValuePair because
> map-reduce jobs like to reason on keys and values.
>
> This is now my code:
>
>     String inputPath = args[0];
>     String outputPath = args[1];
>
>     Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
>             ExtractViewsJob.class.getSimpleName(), getConf());
>
>     Schema specificRecordSchema = LiveTrackingLine.SCHEMA$;
>     AvroType<GenericData.Record> customAvroType =
>             new AvroType<GenericData.Record>(
>                     GenericData.Record.class, specificRecordSchema,
>                     NoOpDeepCopier.<GenericData.Record>create(),
>                     Avros.specifics(LiveTrackingLine.class));
>     PCollection<GenericData.Record> pcollection =
>             pipeline.read(From.avroFile(inputPath, customAvroType));
>
>     pipeline.write(pcollection, new TextFileTarget(outputPath));
>     return pipeline.done().succeeded() ? 0 : 1;
>
> when running this I get the following job error:
>
> org.apache.avro.AvroTypeException: Found
> org.apache.avro.mapreduce.KeyValuePair, expecting
> com.bol.hadoop.enrich.record.LiveTrackingLine
> at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
> at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
> at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127)
>
> (I'm using crunch-core 0.10.0-hadoop2)
>
> Do you have any idea what's going wrong here?
>
> 2014-07-04 15:31 GMT+02:00 Gabriel Reid <gabriel.reid@gmail.com>:
>> Hi Barrie,
>>
>> Inlined below
>>
>> On Fri, Jul 4, 2014 at 2:06 PM, B Kersbergen <kersbergenb@gmail.com> wrote:
>>> Hi,
>>>
>>> I’m stuck trying to read an avro KeyValuePair datafile with crunch.
>>>
>>> This is a header dump of my avro data file:
>>>
>>> {"type":"record","name":"KeyValuePair","namespace":"org.apache.avro.mapreduce","doc":"A
>>> key/value pair","fields":[{"name":"key","type":"string","doc":"The
>>> key"},{"name":"value","type":{"type":"record","name":"LiveTrackingLine","namespace":"com.bol.hadoop.enrich.record",
>>> etc etc
>>>
>>> I’m only interested in the LiveTrackingLine object but I probably need
>>> to read the whole KeyValuePair object and extract the LiveTrackingLine
>>> in the crunch pipeline.
>>>
>>> This is the code I have so far.
>>>
>>> String inputPath = args[0];
>>> String outputPath = args[1];
>>> Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
>>> ExtractViewsJob.class.getSimpleName(), getConf());
>>> PCollection<AvroWrapper<Pair<String, LiveTrackingLine>>> lines =
>>>     pipeline.read(new AvroFileSource<AvroWrapper<Pair<String,
>>>     LiveTrackingLine>>>(inputPath));
>>>
>>> I’m a bit lost in the last part where I configure the 'pipeline'
>>> object with the right avro schema(s) and input dir.
>>
>> The easiest basic case here if you just want to read the data is to do
>> it as follows:
>>
>>   PCollection<GenericData.Record> lines =
>> pipeline.read(From.avroFile(inputPath));
>>
>> However, as you can see, this will give you a PCollection of
>> GenericData.Record objects.
>>
>>> Because my schema is very complex I
>>> want to parse this as a ‘specific’ and not as a ‘generic’ or
>>> ‘reflective’ avro representation, this is also a learning experience
>>> in using avro with crunch.
>>
>> This isn't currently officially supported in Crunch (although there
>> is a way to work around it). Because your top-level record type is a
>> generic record, you need to skip around some of Crunch's basic
>> convenience methods to get your specific records read as actual
>> specific records.
>>
>> The workaround is to do something like this to read in
>> your PCollection:
>>
>>   Schema specificRecordSchema = ...;
>>   AvroType<GenericData.Record> customAvroType = new
>> AvroType<GenericData.Record>(
>>     GenericData.Record.class, specificRecordSchema,
>>     NoOpDeepCopier.<GenericData.Record>create(),
>>     Avros.specifics(MySpecificRecord.class));
>>
>>   PCollection<GenericData.Record> pcollection =
>> pipeline.read(From.avroFile(inputPath, customAvroType));
>>
>> This will give you a PCollection of the GenericRecords that contain
>> instances of your specific class. However, be warned that this is
>> making use of what could be considered non-public APIs that could
>> change in the future.
>>
>> Is this (reading Avro files that have been serialized using
>> AvroKeyValue) a case that you will be using regularly? It might be
>> interesting to add this as built-in functionality in Crunch so you
>> don't need to mess around with creating your own AvroType.
>>
>> - Gabriel
