crunch-user mailing list archives

From B Kersbergen <kersberg...@gmail.com>
Subject Re: read an org.apache.avro.mapreduce.KeyValuePair datafile
Date Fri, 04 Jul 2014 15:42:32 GMT
Super

2014-07-04 17:30 GMT+02:00 Gabriel Reid <gabriel.reid@gmail.com>:
> I've added a Jira ticket
> (https://issues.apache.org/jira/browse/CRUNCH-433) to get better
> compatibility for this kind of thing directly in Crunch.
>
> On Fri, Jul 4, 2014 at 5:14 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
>> Hi Barrie,
>>
>> It's due to a dumb error on my part, sorry about that. The schema that
>> you pass in to the AvroType constructor needs to be the schema for the
>> AvroKeyValue class.
>>
>> So instead of:
>>
>>     Schema specificRecordSchema = LiveTrackingLine.SCHEMA$;
>>
>> it should be:
>>
>>     Schema schema = AvroKeyValue.getSchema(
>>         Schema.create(Schema.Type.STRING), LiveTrackingLine.SCHEMA$);
>>
>> Supplying that schema to the AvroType constructor should get you
>> around the error you're running into now.
>>
>> - Gabriel
>>
>>
>> On Fri, Jul 4, 2014 at 4:30 PM, B Kersbergen <kersbergenb@gmail.com> wrote:
>>> Hi Gabriel,
>>>
>>> Yes all our avro data is wrapped in the Avro KeyValuePair because
>>> map-reduce jobs like to reason on keys and values.
>>>
>>> This is now my code:
>>> String inputPath = args[0];
>>> String outputPath = args[1];
>>>
>>> Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
>>>         ExtractViewsJob.class.getSimpleName(), getConf());
>>>
>>> Schema specificRecordSchema = LiveTrackingLine.SCHEMA$;
>>> AvroType<GenericData.Record> customAvroType =
>>>         new AvroType<GenericData.Record>(
>>>                 GenericData.Record.class, specificRecordSchema,
>>>                 NoOpDeepCopier.<GenericData.Record>create(),
>>>                 Avros.specifics(LiveTrackingLine.class));
>>> PCollection<GenericData.Record> pcollection =
>>>         pipeline.read(From.avroFile(inputPath, customAvroType));
>>>
>>> pipeline.write(pcollection, new TextFileTarget(outputPath));
>>> return pipeline.done().succeeded() ? 0 : 1;
>>>
>>> when running this I get the following job error:
>>>
>>> org.apache.avro.AvroTypeException: Found
>>> org.apache.avro.mapreduce.KeyValuePair, expecting
>>> com.bol.hadoop.enrich.record.LiveTrackingLine
>>> at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
>>> at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>> at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127)
>>>
>>> (I'm using crunch-core 0.10.0-hadoop2)
>>>
>>> Do you have any idea what's going wrong here?
>>>
>>> 2014-07-04 15:31 GMT+02:00 Gabriel Reid <gabriel.reid@gmail.com>:
>>>> Hi Barrie,
>>>>
>>>> Inlined below
>>>>
>>>> On Fri, Jul 4, 2014 at 2:06 PM, B Kersbergen <kersbergenb@gmail.com> wrote:
>>>>> Hi,
>>>>>
>>>>> I’m stuck trying to read an avro KeyValuePair datafile with crunch.
>>>>>
>>>>> This is a header dump of my avro data file:
>>>>>
>>>>> {"type":"record","name":"KeyValuePair","namespace":"org.apache.avro.mapreduce","doc":"A
>>>>> key/value pair","fields":[{"name":"key","type":"string","doc":"The
>>>>> key"},{"name":"value","type":{"type":"record","name":"LiveTrackingLine","namespace":"com.bol.hadoop.enrich.record",
>>>>> etc etc
>>>>>
>>>>> I’m only interested in the LiveTrackingLine object but I probably need
>>>>> to read the whole KeyValuePair object and extract the LiveTrackingLine
>>>>> in the crunch pipeline.
>>>>>
>>>>> This is the code I have so far.
>>>>>
>>>>> String inputPath = args[0];
>>>>> String outputPath = args[1];
>>>>> Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
>>>>> ExtractViewsJob.class.getSimpleName(), getConf());
>>>>> PCollection<AvroWrapper<Pair<String, LiveTrackingLine>>> lines =
>>>>>         pipeline.read(new AvroFileSource<AvroWrapper<Pair<String,
>>>>>                 LiveTrackingLine>>>(inputPath));
>>>>>
>>>>> I’m a bit lost in the last part where I configure the 'pipeline'
>>>>> object with the right avro schema(s) and input dir.
>>>>
>>>> The easiest approach, if you just want to read the data, is the
>>>> following:
>>>>
>>>>   PCollection<GenericData.Record> lines =
>>>> pipeline.read(From.avroFile(inputPath));
>>>>
>>>> However, as you can see, this will give you a PCollection of
>>>> GenericData.Record objects.
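As a sketch of what working with those generic records looks like (the field names "key" and "value" come from the KeyValuePair schema shown earlier in the thread):

```java
// Sketch: each element matches the KeyValuePair wrapper schema, and fields
// are looked up by name, coming back as Object (a Utf8 for the string key).
PCollection<String> keys = lines.parallelDo(
        new MapFn<GenericData.Record, String>() {
            @Override
            public String map(GenericData.Record kv) {
                return kv.get("key").toString();
            }
        },
        Avros.strings());
```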
>>>>
>>>>> Because my schema is very complex I
>>>>> want to parse this as a ‘specific’ and not as a ‘generic’ or
>>>>> ‘reflective’ avro representation, this is also a learning experience
>>>>> in using avro with crunch.
>>>>
>>>> This isn't currently officially supported in Crunch (although there is
>>>> a way to work around it). Because your top-level record type is a
>>>> generic record, you need to bypass some of Crunch's basic convenience
>>>> methods to get your specific records read as actual specific records.
>>>>
>>>> The workaround is to do something like this to read in your
>>>> PCollection:
>>>>
>>>>   Schema specificRecordSchema = ...;
>>>>   AvroType<GenericData.Record> customAvroType =
>>>>       new AvroType<GenericData.Record>(
>>>>           GenericData.Record.class, specificRecordSchema,
>>>>           NoOpDeepCopier.<GenericData.Record>create(),
>>>>           Avros.specifics(MySpecificRecord.class));
>>>>
>>>>   PCollection<GenericData.Record> pcollection =
>>>>       pipeline.read(From.avroFile(inputPath, customAvroType));
>>>>
>>>> This will give you a PCollection of the GenericRecords that contain
>>>> instances of your specific class. However, be warned that this is
>>>> making use of what could be considered non-public APIs that could
>>>> change in the future.
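Building on that, one way to extract the specific records from the wrapping generic records might be (a sketch; the "value" field name comes from the KeyValuePair schema):

```java
// Sketch: pull the specific LiveTrackingLine out of each KeyValuePair
// record. With the custom AvroType, the nested value is deserialized as
// the specific class, so a cast is enough.
PCollection<LiveTrackingLine> trackingLines = pcollection.parallelDo(
        new MapFn<GenericData.Record, LiveTrackingLine>() {
            @Override
            public LiveTrackingLine map(GenericData.Record kv) {
                return (LiveTrackingLine) kv.get("value");
            }
        },
        Avros.specifics(LiveTrackingLine.class));
```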
>>>>
>>>> Is this (reading Avro files that have been serialized using
>>>> AvroKeyValue) a case that you will be using regularly? It might be
>>>> interesting to add this as built-in functionality in Crunch so you
>>>> don't need to mess around with creating your own AvroType.
>>>>
>>>> - Gabriel
