crunch-user mailing list archives

From B Kersbergen <kersberg...@gmail.com>
Subject Re: read an org.apache.avro.mapreduce.KeyValuePair datafile
Date Fri, 04 Jul 2014 15:32:28 GMT
Configuring the KeyValue schema makes sense (now that I see it :-) ).
The job runs and it's reading my data.
Thanks for your detailed support.
Barrie
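
For the record, the combined fix from this thread looks roughly like the
sketch below. ExtractViewsJob and LiveTrackingLine are our own classes, the
"value" field name comes from the KeyValuePair schema shown further down the
thread, and (as Gabriel warned) the AvroType constructor used here is a
non-public API that may change:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.hadoop.io.AvroKeyValue;
    import org.apache.crunch.MapFn;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.From;
    import org.apache.crunch.types.NoOpDeepCopier;
    import org.apache.crunch.types.avro.AvroType;
    import org.apache.crunch.types.avro.Avros;

    Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
            ExtractViewsJob.class.getSimpleName(), getConf());

    // The schema handed to AvroType must describe the whole KeyValuePair
    // wrapper, not just the nested value record.
    Schema schema = AvroKeyValue.getSchema(
            Schema.create(Schema.Type.STRING), LiveTrackingLine.SCHEMA$);

    AvroType<GenericData.Record> customAvroType =
            new AvroType<GenericData.Record>(
                    GenericData.Record.class, schema,
                    NoOpDeepCopier.<GenericData.Record>create(),
                    Avros.specifics(LiveTrackingLine.class));

    PCollection<GenericData.Record> records =
            pipeline.read(From.avroFile(inputPath, customAvroType));

    // Pull the LiveTrackingLine out of each key/value wrapper.
    PCollection<LiveTrackingLine> lines = records.parallelDo(
            new MapFn<GenericData.Record, LiveTrackingLine>() {
                @Override
                public LiveTrackingLine map(GenericData.Record kv) {
                    return (LiveTrackingLine) kv.get("value");
                }
            }, Avros.specifics(LiveTrackingLine.class));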




2014-07-04 17:14 GMT+02:00 Gabriel Reid <gabriel.reid@gmail.com>:
> Hi Barrie,
>
> It's due to a dumb error on my part, sorry about that. The schema that
> you pass in to the AvroType constructor needs to be the schema for the
> AvroKeyValue class.
>
> So instead of:
>
>     Schema specificRecordSchema = LiveTrackingLine.SCHEMA$;
>
> it should be:
>
>     Schema schema =
> AvroKeyValue.getSchema(Schema.create(Schema.Type.STRING),
> LiveTrackingLine.SCHEMA$);
>
> Supplying that schema to the AvroType constructor should get you
> around the error you're running into now.
>
> - Gabriel
>
>
> On Fri, Jul 4, 2014 at 4:30 PM, B Kersbergen <kersbergenb@gmail.com> wrote:
>> Hi Gabriel,
>>
>> Yes, all our Avro data is wrapped in the Avro KeyValuePair because
>> map-reduce jobs like to reason about keys and values.
>>
>> This is now my code:
>>         String inputPath = args[0];
>>         String outputPath = args[1];
>>
>>         Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
>> ExtractViewsJob.class.getSimpleName(), getConf());
>>
>>         Schema specificRecordSchema = LiveTrackingLine.SCHEMA$;
>>         AvroType<GenericData.Record> customAvroType = new
>>                 AvroType<GenericData.Record>(
>>                 GenericData.Record.class, specificRecordSchema,
>>                 NoOpDeepCopier.<GenericData.Record>create(),
>>                 Avros.specifics(LiveTrackingLine.class));
>>         PCollection<GenericData.Record> pcollection =
>>                 pipeline.read(From.avroFile(inputPath, customAvroType));
>>
>>         pipeline.write(pcollection, new TextFileTarget(outputPath));
>>         return pipeline.done().succeeded() ? 0 : 1;
>>
>> when running this I get the following job error:
>>
>> org.apache.avro.AvroTypeException: Found
>> org.apache.avro.mapreduce.KeyValuePair, expecting
>> com.bol.hadoop.enrich.record.LiveTrackingLine
>> at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
>> at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>> at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127)
>>
>> (I'm using crunch-core 0.10.0-hadoop2)
>>
>> Do you have any idea what's going wrong here?
>>
>> 2014-07-04 15:31 GMT+02:00 Gabriel Reid <gabriel.reid@gmail.com>:
>>> Hi Barrie,
>>>
>>> Inlined below
>>>
>>> On Fri, Jul 4, 2014 at 2:06 PM, B Kersbergen <kersbergenb@gmail.com> wrote:
>>>> Hi,
>>>>
>>>> I’m stuck trying to read an avro KeyValuePair datafile with crunch.
>>>>
>>>> This is a header dump of my avro data file:
>>>>
>>>> {"type":"record","name":"KeyValuePair","namespace":"org.apache.avro.mapreduce","doc":"A
>>>> key/value pair","fields":[{"name":"key","type":"string","doc":"The
>>>> key"},{"name":"value","type":{"type":"record","name":"LiveTrackingLine","namespace":"com.bol.hadoop.enrich.record",
>>>> etc etc
>>>>
>>>> I’m only interested in the LiveTrackingLine object but I probably need
>>>> to read the whole KeyValuePair object and extract the LiveTrackingLine
>>>> in the crunch pipeline.
>>>>
>>>> This is the code I have so far.
>>>>
>>>> String inputPath = args[0];
>>>> String outputPath = args[1];
>>>> Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
>>>> ExtractViewsJob.class.getSimpleName(), getConf());
>>>> PCollection<AvroWrapper<Pair<String, LiveTrackingLine>>> lines =
>>>> pipeline.read(new AvroFileSource<AvroWrapper<Pair<String,
>>>> LiveTrackingLine>>>(inputPath));
>>>>
>>>> I’m a bit lost in the last part where I configure the 'pipeline'
>>>> object with the right avro schema(s) and input dir.
>>>
>>> The easiest approach, if you just want to read the data, is to do
>>> it as follows:
>>>
>>>   PCollection<GenericData.Record> lines =
>>> pipeline.read(From.avroFile(inputPath));
>>>
>>> However, as you can see, this will give you a PCollection of
>>> GenericData.Record objects.
>>>
>>>> Because my schema is very complex, I
>>>> want to parse this as a ‘specific’ and not as a ‘generic’ or
>>>> ‘reflective’ avro representation; this is also a learning experience
>>>> in using avro with crunch.
>>>
>>> This isn't currently officially supported in Crunch (although
>>> there is a way to work around it). Because your top-level record type
>>> is a generic record, you need to bypass some of the basic
>>> convenience methods of Crunch to get your specific records read as
>>> actual specific records.
>>>
>>> The workaround is to do something like this to read in
>>> your PCollection:
>>>
>>>   Schema specificRecordSchema = ...;
>>>   AvroType<GenericData.Record> customAvroType = new
>>> AvroType<GenericData.Record>(
>>>     GenericData.Record.class, specificRecordSchema,
>>>     NoOpDeepCopier.<GenericData.Record>create(),
>>>     Avros.specifics(MySpecificRecord.class));
>>>
>>>   PCollection<GenericData.Record> pcollection =
>>> pipeline.read(From.avroFile(inputPath, customAvroType));
>>>
>>> This will give you a PCollection of the GenericRecords that contain
>>> instances of your specific class. However, be warned that this is
>>> making use of what could be considered non-public APIs that could
>>> change in the future.
>>>
>>> Is this (reading Avro files that have been serialized using
>>> AvroKeyValue) a case that you will be using regularly? It might be
>>> interesting to add this as built-in functionality in Crunch so you
>>> don't need to mess around with creating your own AvroType.
>>>
>>> - Gabriel
