avro-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Artem Ervits <artemerv...@gmail.com>
Subject Re: Avromultiple output
Date Tue, 24 Feb 2015 13:12:53 GMT
try this

Job job = Job.getInstance(conf);
Job.setName(name);

Artem Ervits
On Feb 21, 2015 10:57 PM, "David Ginzburg" <davidginzburg@gmail.com> wrote:

> Hi,
> I am trying to run an MR job on emr with AvromultipleOutput
>
>
>
>
> I get the following exception when running with AMI with hadoop 2.2 2.5
> Found interface org.apache.hadoop.mapreduce.JobContext, but class was
> expected
>
> I read it is related to incompatible hadoop versions, So I modified
>
> When running with AMI with hadoop 103 I get the following exception:
>
> java.lang.NullPointerException
>     at
> org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
>     at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
>     at
> org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
>     at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:415)
>     at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
>     at org.apache.hadoop.mapred.Child.main(Child.java:253)
>
>
> The driver code is
>
> Job job = new Job(getConf(), "myad");
>
>
>
>         job.setOutputValueClass(NullWritable.class);
>
>
>         job.setJarByClass(myAdTextLineMapper.class);
>         Path inputOflineFiles = new Path(args[0]);
>         Path inputOfUbberFiles = new Path(args[1]);
>
>          FileInputFormat.setInputPaths(job, inputOflineFiles);
>
>         job.setMapperClass(myAdTextLineMapper.class);
>         job.setMapOutputKeyClass(Text.class);
>         job.setMapOutputValueClass(UberRecord.class);
>
>         job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
>         AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
>         AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);
>
>
>         job.setReducerClass(myAdReducer.class);
>         job.setOutputKeyClass(Text.class);
>         job.setOutputValueClass(UberRecord.class);
>         job.setNumReduceTasks(2);
>         String baseoutputFolder = args[2];
>         job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER,
>                 baseoutputFolder);
>         ;
>
> LazyOutputFormat.setOutputFormatClass(job,AvroSequenceFileOutputFormat.class);
>
>         FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
>         return job.waitForCompletion(true) ? 0 : 1;
>
>
> the mapper and reducers
> @Override
>     public void setup(Context ctx) {
>
>         ubp = new UberRecordProcessor();
>     }
>
>     @Override
>     protected void map(LongWritable key, Text value, Context context)
>             throws IOException, InterruptedException {
>         try {
>             handleLineinMap(value);
>             if(ub!=null){
>                 context.write(new Text(ub.getAuctionId().toString()), ub);
>                 context.getCounter("myAd",
> "myAdTextLineMapper").increment(1);
>             }else{
>                 context.getCounter("myAd",
> "myAdTextLineMapperNull").increment(1);
>             }
>         } catch (Exception e) {
>             context.getCounter("myAd",
> "myAdTextLineMapperError").increment(1);
>             logger.warn("could not parse line "+value.toString(),e);
>
>
>         }
>     }
>
> public class myAdReducer extends
>         Reducer<Text, UberRecord, AvroKey<CharSequence>,
> AvroValue<UberRecord>> {
>
>     private static Logger logger = Logger.getLogger(myAdReducer.class);
>     public static final String BASE_OUTPUT_FOLDER = "base.output.folder";
>     AvroMultipleOutputs amos; MultipleOutputs<Text, UberRecord> outputs;
>     UberRecordProcessor ubp = new UberRecordProcessor();
>     // "year=%s/month=%s/day=%s/hour=%s"
>     private String baseOutputPath;
>     private long reduceAttemptUniqueIdentifier =
> System.currentTimeMillis();
>
>     // 2015-02-01T18:00:25.673Z
>     static DateTimeFormatter dateformat = DateTimeFormat
>             .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>
>     @Override
>     protected void setup(Context context) throws IOException,
>             InterruptedException {
>
>         amos = new AvroMultipleOutputs(context);
>         baseOutputPath =
> context.getConfiguration().get(BASE_OUTPUT_FOLDER);
>
>     }
>
>     @Override
>     protected void reduce(Text key, Iterable<UberRecord> values, Context
> context)
>             throws IOException, InterruptedException {
>
>         try {
>             UberRecord ub = new UberRecord();
>             for (UberRecord ubi : values) {
>                 // enrich
>                 if (ubi.getExchange() == null) {
>                     continue;
>                 }
>                 BaseBidRequestEnricher enc = BaseBidRequestEnricher
>                         .getEnricher(ubi.getExchange().toString());
>                 enc.enrich(ubi);
>                 ub = mergeUB(ub, ubi);
>             }
>             logger.info("Writing UberRecord [" + ub.toString() + "]");
>             String partition = getPartition(ub);
>
>             // context.write();
>             // AvroKey<CharSequence>, AvroValue<UberRecord>>
>             amos.write(new AvroKey<CharSequence>(key.toString()),
>                     new AvroValue<UberRecord>(ub), baseOutputPath + "/"
>                             + partition + "/p" +
> reduceAttemptUniqueIdentifier);
>         } catch (Exception e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>     }
>
>     public UberRecord mergeUB(UberRecord dest, UberRecord src) {
>         List<Field> fields = UberRecord.getClassSchema().getFields();
>         List<Field> engFields = EngageData.getClassSchema().getFields();
>         for (Field field : fields) {
>             if (field.name().equals("engageData")
>                     && dest.getEngageData() != null) {
>                 EngageData mergedEng = dest.getEngageData();
>                 for (Field engField : engFields) {
>                     if (dest.getEngageData().get(engField.name()) == null)
> {
>                         mergedEng.put(engField.name(),
>                                 src.getEngageData().get(engField.name()));
>                     }
>
>                 }
>                 dest.setEngageData(mergedEng);
>             } else {
>                 if (dest.get(field.name()) == null) {
>                     dest.put(field.name(), src.get(field.name()));
>                 }
>             }
>         }
>         return dest;
>     }
>
>
>
>

Mime
View raw message