crunch-user mailing list archives

From B Kersbergen <kersberg...@gmail.com>
Subject Re: read an org.apache.avro.mapreduce.KeyValuePair datafile
Date Fri, 04 Jul 2014 14:30:57 GMT
Hi Gabriel,

Yes, all our Avro data is wrapped in the Avro KeyValuePair, since
map-reduce jobs operate on keys and values.

This is now my code:
String inputPath = args[0];
String outputPath = args[1];

Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
        ExtractViewsJob.class.getSimpleName(), getConf());

Schema specificRecordSchema = LiveTrackingLine.SCHEMA$;
AvroType<GenericData.Record> customAvroType = new AvroType<GenericData.Record>(
        GenericData.Record.class, specificRecordSchema,
        NoOpDeepCopier.<GenericData.Record>create(),
        Avros.specifics(LiveTrackingLine.class));
PCollection<GenericData.Record> pcollection =
        pipeline.read(From.avroFile(inputPath, customAvroType));

pipeline.write(pcollection, new TextFileTarget(outputPath));
return pipeline.done().succeeded() ? 0 : 1;

When running this, I get the following job error:

org.apache.avro.AvroTypeException: Found
org.apache.avro.mapreduce.KeyValuePair, expecting
com.bol.hadoop.enrich.record.LiveTrackingLine
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:127)

(I'm using crunch-core 0.10.0-hadoop2)

Do you have any idea what's going wrong here?
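For reference, the stack trace says the reader schema I'm passing is the bare
LiveTrackingLine value schema, while the file's top-level schema is the
KeyValuePair wrapper. A wrapper schema matching the header dump quoted below
could be rebuilt like this sketch (not from the thread; it assumes Avro
1.7.x's Schema.Field constructor, with field names and doc strings taken
from the header dump):

```java
import java.util.Arrays;
import org.apache.avro.Schema;

// Reconstruct the org.apache.avro.mapreduce.KeyValuePair wrapper schema
// around the specific value schema. The "key"/"value" field names and
// doc strings mirror the header dump of the data file.
Schema keySchema = Schema.create(Schema.Type.STRING);
Schema valueSchema = LiveTrackingLine.SCHEMA$; // from the generated class
Schema pairSchema = Schema.createRecord("KeyValuePair",
        "A key/value pair", "org.apache.avro.mapreduce", false);
pairSchema.setFields(Arrays.asList(
        new Schema.Field("key", keySchema, "The key", null),
        new Schema.Field("value", valueSchema, "The value", null)));
```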

2014-07-04 15:31 GMT+02:00 Gabriel Reid <gabriel.reid@gmail.com>:
> Hi Barrie,
>
> Inlined below
>
> On Fri, Jul 4, 2014 at 2:06 PM, B Kersbergen <kersbergenb@gmail.com> wrote:
>> Hi,
>>
>> I’m stuck trying to read an avro KeyValuePair datafile with crunch.
>>
>> This is a header dump of my avro data file:
>>
>> {"type":"record","name":"KeyValuePair","namespace":"org.apache.avro.mapreduce","doc":"A
>> key/value pair","fields":[{"name":"key","type":"string","doc":"The
>> key"},{"name":"value","type":{"type":"record","name":"LiveTrackingLine","namespace":"com.bol.hadoop.enrich.record",
>> etc etc
>>
>> I’m only interested in the LiveTrackingLine object but I probably need
>> to read the whole KeyValuePair object and extract the LiveTrackingLine
>> in the crunch pipeline.
>>
>> This is the code I have so far.
>>
>> String inputPath = args[0];
>> String outputPath = args[1];
>> Pipeline pipeline = new MRPipeline(ExtractViewsJob.class,
>> ExtractViewsJob.class.getSimpleName(), getConf());
>> PCollection<AvroWrapper<Pair<String, LiveTrackingLine>>> lines =
>> pipeline.read(new AvroFileSource<AvroWrapper<Pair<String,
>> LiveTrackingLine>>>(inputPath));
>>
>> I’m a bit lost in the last part where I configure the 'pipeline'
>> object with the right avro schema(s) and input dir.
>
> The easiest approach here, if you just want to read the data, is the
> following:
>
>   PCollection<GenericData.Record> lines =
> pipeline.read(From.avroFile(inputPath));
>
> However, as you can see, this will give you a PCollection of
> GenericData.Record objects.
>
>> Because my schema is very complex I
>> want to parse this as a ‘specific’ and not as a ‘generic’ or
>> ‘reflective’ avro representation, this is also a learning experience
>> in using avro with crunch.
>
> This is not currently officially supported in Crunch (although there
> is a way to work around it). Because your top-level record type is a
> generic record, you need to skip around some of Crunch's basic
> convenience methods to get your specific records read as actual
> specific records.
>
> The workaround is to do something like this to read in your
> PCollection:
>
>   Schema specificRecordSchema = ...;
>   AvroType<GenericData.Record> customAvroType = new
> AvroType<GenericData.Record>(
>     GenericData.Record.class, specificRecordSchema,
>     NoOpDeepCopier.<GenericData.Record>create(),
>     Avros.specifics(MySpecificRecord.class));
>
>   PCollection<GenericData.Record> pcollection =
> pipeline.read(From.avroFile(inputPath, customAvroType));
>
> This will give you a PCollection of the GenericRecords that contain
> instances of your specific class. However, be warned that this is
> making use of what could be considered non-public APIs that could
> change in the future.
>
> Is this (reading Avro files that have been serialized using
> AvroKeyValue) a case that you will be using regularly? It might be
> interesting to add this as built-in functionality in Crunch so you
> don't need to mess around with creating your own AvroType.
>
> - Gabriel
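
The workaround quoted above yields a PCollection of GenericData.Record
wrappers; pulling the specific value records out of them could then look like
this sketch (a hedged example, not from the thread; it assumes the generated
LiveTrackingLine class and Crunch's parallelDo/MapFn API):

```java
import org.apache.avro.generic.GenericData;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.types.avro.Avros;

// Extract the "value" field of each KeyValuePair record. Since the
// AvroType was built with Avros.specifics(LiveTrackingLine.class), the
// field holds instances of the specific class, so the cast is safe.
PCollection<LiveTrackingLine> values = pcollection.parallelDo(
        new MapFn<GenericData.Record, LiveTrackingLine>() {
            @Override
            public LiveTrackingLine map(GenericData.Record keyValuePair) {
                return (LiveTrackingLine) keyValuePair.get("value");
            }
        }, Avros.specifics(LiveTrackingLine.class));
```

In real code the MapFn is better written as a named static inner class so it
stays serializable independently of the enclosing job class.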
