avro-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Harsh J <ha...@cloudera.com>
Subject Re: How to provide the multiple avro inputs to Two avroMappers and same mapper outputs to single avroReducer.
Date Sun, 24 Mar 2013 18:06:49 GMT
One would ideally use the MultipleInputs Hadoop MR class to achieve
this, but it lacks "avro support", same as the rest of the Hadoop MR
classes, and there's nothing in the Avro MR libs that complements it yet.

From past experience, I recall basing my own libraries/applications on
the same prefixes MultipleInputs uses in Configuration, for providing
schemas, etc. to each input path's tagged mapper, but this was hackery
done to achieve some end. We should ideally add an AvroMultipleInputs
class to Avro's MR libs to cover this gap. Do you mind opening a JIRA to
discuss the implementation further?

On Thu, Mar 21, 2013 at 4:28 PM, rajharireddy <rajharireddy@yahoo.com> wrote:
> How can I provide multiple Avro inputs to two AvroMappers and send the
> mappers' outputs to a single AvroReducer?
>
> I have written code for a single input to one AvroMapper, with that mapper's
> output going to a single AvroReducer:
>
>
>
>
> import java.io.File;
> import java.io.IOException;
> import org.apache.avro.Schema;
> import org.apache.avro.file.DataFileReader;
> import org.apache.avro.file.FileReader;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.generic.GenericData.Record;
> import org.apache.avro.mapred.AvroCollector;
> import org.apache.avro.mapred.AvroJob;
> import org.apache.avro.mapred.AvroMapper;
> import org.apache.avro.mapred.AvroReducer;
> import org.apache.avro.mapred.FsInput;
> import org.apache.avro.mapred.Pair;
> import org.apache.avro.util.Utf8;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.FSDataInputStream;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.IOUtils;
> import org.apache.hadoop.mapred.FileInputFormat;
> import org.apache.hadoop.mapred.FileOutputFormat;
> import org.apache.hadoop.mapred.JobClient;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.Reporter;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
>
> public class Test extends Configured implements Tool {
>
>         static class TargetMapper extends AvroMapper<GenericRecord, Pair<Utf8,
> GenericRecord>> {
>             public void map(GenericRecord line, AvroCollector<Pair<Utf8,
> GenericRecord>> collector, Reporter reporter) throws IOException {
>                 try{
>                         collector.collect(new Pair<Utf8, GenericRecord>("data",
line));
>                 }
>                 catch (Exception e) {
>                                 System.out.println("Error message "+ e.getMessage());
>                         }
>             }
>         }
>
>         static class TargetReducer extends AvroReducer<Utf8, GenericRecord,
> GenericRecord> {
>
>                 public void reduce(Utf8 key, Iterable<GenericRecord> values,
> AvroCollector<GenericRecord> collector, Reporter reporter) throws
> IOException
>             {
>                         try{
>                         Configuration conf = getConf();
>                         Path inputDir = new Path("/user/cts336339/avro");
>                             Schema output_schema = readOutputSchema(inputDir, conf);
>                             //System.out.println("output_schema " + output_schema);
>
>                         GenericRecord output = new GenericData.Record(output_schema);
//
> Record for output
>                         System.out.println("inside for loop" + output);
>                         GenericRecord location = new
> GenericData.Record(output_schema.getField("Location").schema());        //Record
> for Location Field
>                         GenericRecord data = new
> GenericData.Record(output_schema.getField("Data").schema());            //Record for
> Data Field
>
>                         for (GenericRecord value : values){
>                                 //System.out.println("inside for loop" + value);
>                                 location.put("Location_key",value.get("locid"));
>                                 location.put("Location_value","0");
>
>                                 data.put("Data_key","0");
>                                 data.put("Data_value",value.get("rec"));
>
>                                 output.put("Location",location);
>                                 output.put("Data",data);
>                         }
>                         collector.collect(output);
>                         }catch(Exception e){
>                                 System.out.println("Error message "+ e.getMessage());
>                         }
>             }
>           }
>
>           @Override
>           public int run(String[] args) throws Exception {
>
>             if (args.length != 3) {
>               System.err.printf(
>                 "Usage: %s [generic options] <input> <output> <schema-file>\n",
>                 getClass().getSimpleName());
>               ToolRunner.printGenericCommandUsage(System.err);
>               return -1;
>             }
>             String input = args[0];
>             String output = args[1];
>             String schemaFile = args[2];
>             //System.out.println("schema1" + schemaFile);
>
>             JobConf conf = new JobConf(getConf(), getClass());
>             conf.setJobName("Avro");
>
>             FileInputFormat.addInputPath(conf, new Path(input));
>             FileOutputFormat.setOutputPath(conf, new Path(output));
>
>             Path inputDir = new Path("/user/cts336339/avro");
>             Schema schema = readInputSchema(inputDir, conf);
>
>             Schema schema1 = new Schema.Parser().parse(new File(schemaFile));
>             //System.out.println("schema1" + schema1);
>
>             AvroJob.setInputSchema(conf, schema);
>             AvroJob.setMapOutputSchema(conf,
> Pair.getPairSchema(schema.create(Schema.Type.STRING), schema));
>             AvroJob.setOutputSchema(conf, schema1);
>
>             AvroJob.setMapperClass(conf, TargetMapper.class);
>             AvroJob.setReducerClass(conf, TargetReducer.class);
>
>             JobClient.runJob(conf);
>             return 0;
>           }
>
>           private Schema readInputSchema(Path inputDir, Configuration conf) throws
> IOException {
>                     FsInput fsInput = null;
>                     FileReader reader = null;
>                     try {
>                               fsInput = new FsInput(new Path(inputDir, "Users2.avro"),
conf);
>                               reader = DataFileReader.openReader(fsInput, new
> GenericDatumReader());
>                               return reader.getSchema();
>                     } finally {
>                               IOUtils.closeStream(fsInput);
>                               IOUtils.closeStream(reader);
>                     }
>           }
>           private static Schema readOutputSchema(Path inputDir, Configuration conf)
> throws IOException {
>                     FsInput fsInput = null;
>                     FileReader reader = null;
>                     try {
>                               fsInput = new FsInput(new Path(inputDir, "AvroWithNull.avro"),
> conf);
>                               reader = DataFileReader.openReader(fsInput, new
> GenericDatumReader());
>                       return reader.getSchema();
>                     } finally {
>                               IOUtils.closeStream(fsInput);
>                               IOUtils.closeStream(reader);
>                     }
>           }
>           public static void main(String[] args) throws Exception {
>             int exitCode = ToolRunner.run(new Test(), args);
>             System.exit(exitCode);
>           }
> }
>
>
>
>
> --
> View this message in context: http://apache-avro.679487.n3.nabble.com/How-to-provide-the-multiple-avro-inputs-to-Two-avroMappers-and-same-mapper-outputs-to-single-avroRed-tp4026644.html
> Sent from the Avro - Developers mailing list archive at Nabble.com.



--
Harsh J

Mime
View raw message