avro-user mailing list archives

From Russell Jurney <russell.jur...@gmail.com>
Subject Re: AVRO threatening to ruin Christmas
Date Fri, 21 Dec 2012 18:56:23 GMT
Since the holidays are involved, I suggest you try Pig and the AvroStorage
UDF to load the data, and another UDF to store the data. What format are
you writing in?

Instructions for using AvroStorage with Pig 0.10 are here:
http://hortonworks.com/blog/pig-as-connector-part-one-pig-mongodb-and-node-js/
 On Dec 21, 2012 9:56 AM, "Terry Healy" <thealy@bnl.gov> wrote:

> Going crazy here trying to reconcile this. I found links to some aspects,
> partially implemented in the 'The Definitive Guide' AVRO weather M/R
> example, and outlined in the package org.apache.avro.mapred javadoc under
> "For jobs whose input is an Avro data file and which use an AvroMapper,
> but whose reducer is a non-Avro Reducer and whose output is a non-Avro
> format:". Clearly I have misunderstood something while attempting to
> follow those instructions.
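> As I read that javadoc checklist, the wiring would look roughly like the
> sketch below. This is only my reconstruction of the documented pattern,
> not tested code: MyMapper (an AvroMapper subclass) and MyTextReducer (a
> plain Hadoop Reducer) are placeholder names, and the two Schema
> parameters stand in for whatever the real job uses.

```java
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

// Sketch of the "Avro input, non-Avro output" wiring described in the
// org.apache.avro.mapred package javadoc. MyMapper and MyTextReducer
// are hypothetical placeholder classes.
public class JobWiring {
    static void configure(JobConf conf, Schema inputSchema, Schema valueSchema) {
        // 1. Input is an Avro data file: register its schema.
        AvroJob.setInputSchema(conf, inputSchema);

        // 2. The mapper subclasses AvroMapper and emits Pair<K, V>;
        //    the framework carries the pair as AvroKey/AvroValue.
        AvroJob.setMapperClass(conf, MyMapper.class);
        AvroJob.setMapOutputSchema(conf, Pair.getPairSchema(
                Schema.create(Schema.Type.LONG), valueSchema));

        // 3. The reducer is a plain Reducer registered through JobConf
        //    (NOT through AvroJob.setReducerClass); it receives
        //    AvroKey/AvroValue pairs as input.
        conf.setReducerClass(MyTextReducer.class);

        // 4. Output is an ordinary non-Avro format.
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputValueClass(Text.class);
    }
}
```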
>
> The test code does not include a mapper so the job setup is not like
> what I'm trying to achieve: AVRO format into Mapper, Text out of
> Reducer. (I've eliminated attempting to use the Partitioner, Comparator,
> and GroupingComparator used in the working M/R code that reads .tsv
> rather than AVRO.)
>
> The current stumbling block is "AvroFlowWritable cannot be cast to
> org.apache.avro.generic.IndexedRecord", where AvroFlowWritable is my own
> class. I think my existing reducer would work fine, but when I try to use
> it with the AvroMapper it throws the above exception.
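> (That ClassCastException typically means Avro's map-output serializer was
> handed an object that is not an Avro record: the mapred serialization
> path expects datums to implement org.apache.avro.generic.IndexedRecord,
> which generic and specific records do but a hand-rolled Writable-style
> class does not. One possible fix — a minimal sketch only, using a
> hypothetical FlowRecord with a made-up two-field schema rather than the
> real AvroFlowWritable — is to implement IndexedRecord directly:)

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

// Hypothetical sketch: a record class Avro can serialize directly,
// because IndexedRecord gives Avro positional access to its fields.
public class FlowRecord implements IndexedRecord {
    private static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"FlowRecord\",\"fields\":["
        + "{\"name\":\"srcIp\",\"type\":\"long\"},"
        + "{\"name\":\"dstIp\",\"type\":\"long\"}]}");

    private long srcIp;
    private long dstIp;

    @Override public Schema getSchema() { return SCHEMA; }

    // Return field i in schema order.
    @Override public Object get(int i) {
        switch (i) {
            case 0: return srcIp;
            case 1: return dstIp;
            default: throw new IndexOutOfBoundsException("field " + i);
        }
    }

    // Set field i in schema order.
    @Override public void put(int i, Object v) {
        switch (i) {
            case 0: srcIp = (Long) v; break;
            case 1: dstIp = (Long) v; break;
            default: throw new IndexOutOfBoundsException("field " + i);
        }
    }
}
```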
>
>
> From setup:
>
>         conf.setOutputFormat(TextOutputFormat.class);
>         conf.setOutputKeyClass(LongPair.class);
>         conf.setOutputValueClass(AvroFlowWritable.class);
>
>         NETFLOW_V5_SCHEMA = new Schema.Parser().parse(NetflowSchema);
>
>         AvroJob.setInputSchema(conf, NETFLOW_V5_SCHEMA);
>         AvroJob.setMapperClass(conf, AvroFlowMapper.class);
>         AvroJob.setReducerClass(conf, AvroFlowReducer.class);
>
>         Schema afwSchema = ReflectData.get().getSchema(AvroFlowWritable.class);
>         Schema pairSchema = Pair.getPairSchema(
>                 Schema.create(Schema.Type.LONG), afwSchema);
>         AvroJob.setMapOutputSchema(conf, pairSchema);
>
>
>     /*
>      * ------------------
>      * *** Mapper ***
>      * ------------------
>      */
>
> public static class AvroFlowMapper<K>
>         extends AvroMapper<K, Pair<Long, AvroFlowWritable>> {
>
>         Long[] keepIps;
>
>         // Configure removed
>
>         @Override
>         public void map(K datum, AvroCollector<Pair<Long,
>                 AvroFlowWritable>> collector, Reporter reporter)
>                 throws IOException {
>
>
>             GenericRecord record = (GenericRecord) datum;
>             AvroFlowWritable afw = new AvroFlowWritable(record);
>
>             if (isKeeper(afw)) {
>
>                 Long testKey;
>                 if (inKeeperIpList(afw.getSrcIp())) {
>                     testKey = Long.valueOf(afw.getDstIp());
>                 } else {
>                     testKey = Long.valueOf(afw.getSrcIp());
>                 }
>                 collector.collect(
>                         new Pair<Long, AvroFlowWritable>(testKey, afw));
>             }
>         }
>     }
>
>  /*
>      * ------------------
>      * *** Reducer ***
>      * ------------------
>      */
>
>     public static class AvroFlowReducer
>             extends AvroReducer<Long, AvroFlowWritable, Text> {
>
>         @Override
>         public void reduce(Long key, Iterable<AvroFlowWritable> values,
>                 AvroCollector<Text> collector, Reporter reporter)
>                 throws IOException {
>
>             for (AvroFlowWritable afw : values) {
>                 collector.collect(new Text(afw.toString()));
>             }
>         }
>    }
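> (For the non-Avro-output pattern in that javadoc, the reducer would not
> extend AvroReducer at all; it would be a plain old-API Hadoop Reducer
> receiving the mapper's pairs as AvroKey/AvroValue. A rough sketch — the
> key/value types here are assumptions matching the map output above, and
> FlowTextReducer is a placeholder name:)

```java
import java.io.IOException;
import java.util.Iterator;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Sketch: a non-Avro reducer for an AvroMapper job. It unwraps the
// AvroKey/AvroValue pairs produced by the map side and writes plain
// text, so no Avro output schema is involved.
public class FlowTextReducer extends MapReduceBase
        implements Reducer<AvroKey<Long>, AvroValue<AvroFlowWritable>,
                           NullWritable, Text> {

    @Override
    public void reduce(AvroKey<Long> key,
            Iterator<AvroValue<AvroFlowWritable>> values,
            OutputCollector<NullWritable, Text> output, Reporter reporter)
            throws IOException {
        while (values.hasNext()) {
            // datum() unwraps the Avro value back to the record object.
            AvroFlowWritable afw = values.next().datum();
            output.collect(NullWritable.get(), new Text(afw.toString()));
        }
    }
}
```

> (Registered with conf.setReducerClass(FlowTextReducer.class) rather than
> AvroJob.setReducerClass.)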
>
>
>
> On 12/20/2012 12:32 PM, Terry Healy wrote:
> > I'm just getting started using AVRO within Map/Reduce and trying to
> > convert some existing non-AVRO code to use AVRO input. So far the data
> > that previously was stored in tab delimited files has been converted to
> > .avro successfully as checked with avro-tools.
> >
> > Where I'm getting hung up extending beyond my book-based examples is in
> > attempting to read from AVRO (using generic records) where the mapper
> > output is NOT in AVRO format. I can't seem to reconcile extending
> > AvroMapper and NOT using AvroCollector.
> >
> > Here are snippets of code that show my non-AVRO M/R code and my
> > [failing] attempts to make this change. If anyone can help me along it
> > would be very much appreciated.
> >
> > -Terry
> >
> > <code>
> > Pre-Avro version: (Works fine with .tsv input format)
> >
> >     public static class HdFlowMapper extends MapReduceBase
> >             implements Mapper<Text, HdFlowWritable, LongPair, HdFlowWritable> {
> >
> >
> >         @Override
> >         public void map(Text key, HdFlowWritable value,
> >                 OutputCollector<LongPair, HdFlowWritable> output,
> >                 Reporter reporter) throws IOException {
> >
> >               ...//
> >                 outKey = new LongPair(value.getSrcIp(), value.getFirst());
> >
> >                 HdFlowWritable outValue = value; // pass it all through
> >                 output.collect(outKey, outValue);
> >       }
> >
> >
>
