avro-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
Subject Re: Avromultiple output
Date Tue, 24 Feb 2015 17:36:40 GMT
Check your Hadoop version. In older versions (Hadoop 1.x) JobContext was a
class, and in newer ones (Hadoop 2.x) it is an interface.

On Tue, Feb 24, 2015 at 10:28 PM, David Ginzburg <davidginzburg@gmail.com>
wrote:

> Thank you for the answer.
>
> Tried but the exception
> * Error: Found interface org.apache.hadoop.mapreduce.JobContext, but class
> was expected*
> persists
>
> On Tue, Feb 24, 2015 at 3:12 PM, Artem Ervits <artemervits@gmail.com>
> wrote:
>
>> try this
>>
>> Job job = Job.getInstance(conf);
>> Job.setName(name);
>>
>> Artem Ervits
>> On Feb 21, 2015 10:57 PM, "David Ginzburg" <davidginzburg@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I am trying to run an MR job on EMR with AvroMultipleOutputs.
>>>
>>>
>>>
>>>
>>> I get the following exception when running with AMI with hadoop 2.2 2.5
>>> Found interface org.apache.hadoop.mapreduce.JobContext, but class was
>>> expected
>>>
>>> I read that it is related to incompatible Hadoop versions, so I modified
>>>
>>> When running with an AMI with Hadoop 1.0.3 I get the following exception:
>>>
>>> java.lang.NullPointerException
>>>     at
>>> org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
>>>     at
>>> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
>>>     at
>>> org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
>>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
>>>     at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>     at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
>>>     at org.apache.hadoop.mapred.Child.main(Child.java:253)
>>>
>>>
>>> The driver code is
>>>
>>> Job job = new Job(getConf(), "myad");
>>>
>>>
>>>
>>>         job.setOutputValueClass(NullWritable.class);
>>>
>>>
>>>         job.setJarByClass(myAdTextLineMapper.class);
>>>         Path inputOflineFiles = new Path(args[0]);
>>>         Path inputOfUbberFiles = new Path(args[1]);
>>>
>>>          FileInputFormat.setInputPaths(job, inputOflineFiles);
>>>
>>>         job.setMapperClass(myAdTextLineMapper.class);
>>>         job.setMapOutputKeyClass(Text.class);
>>>         job.setMapOutputValueClass(UberRecord.class);
>>>
>>>         job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
>>>         AvroJob.setOutputKeySchema(job,
>>> Schema.create(Schema.Type.STRING));
>>>         AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);
>>>
>>>
>>>         job.setReducerClass(myAdReducer.class);
>>>         job.setOutputKeyClass(Text.class);
>>>         job.setOutputValueClass(UberRecord.class);
>>>         job.setNumReduceTasks(2);
>>>         String baseoutputFolder = args[2];
>>>         job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER,
>>>                 baseoutputFolder);
>>>         ;
>>>
>>> LazyOutputFormat.setOutputFormatClass(job,AvroSequenceFileOutputFormat.class);
>>>
>>>         FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
>>>         return job.waitForCompletion(true) ? 0 : 1;
>>>
>>>
>>> the mapper and reducers
>>> @Override
>>>     public void setup(Context ctx) {
>>>
>>>         ubp = new UberRecordProcessor();
>>>     }
>>>
>>>     @Override
>>>     protected void map(LongWritable key, Text value, Context context)
>>>             throws IOException, InterruptedException {
>>>         try {
>>>             handleLineinMap(value);
>>>             if(ub!=null){
>>>                 context.write(new Text(ub.getAuctionId().toString()),
>>> ub);
>>>                 context.getCounter("myAd",
>>> "myAdTextLineMapper").increment(1);
>>>             }else{
>>>                 context.getCounter("myAd",
>>> "myAdTextLineMapperNull").increment(1);
>>>             }
>>>         } catch (Exception e) {
>>>             context.getCounter("myAd",
>>> "myAdTextLineMapperError").increment(1);
>>>             logger.warn("could not parse line "+value.toString(),e);
>>>
>>>
>>>         }
>>>     }
>>>
>>> public class myAdReducer extends
>>>         Reducer<Text, UberRecord, AvroKey<CharSequence>,
>>> AvroValue<UberRecord>> {
>>>
>>>     private static Logger logger = Logger.getLogger(myAdReducer.class);
>>>     public static final String BASE_OUTPUT_FOLDER = "base.output.folder";
>>>     AvroMultipleOutputs amos; MultipleOutputs<Text, UberRecord> outputs;
>>>     UberRecordProcessor ubp = new UberRecordProcessor();
>>>     // "year=%s/month=%s/day=%s/hour=%s"
>>>     private String baseOutputPath;
>>>     private long reduceAttemptUniqueIdentifier =
>>> System.currentTimeMillis();
>>>
>>>     // 2015-02-01T18:00:25.673Z
>>>     static DateTimeFormatter dateformat = DateTimeFormat
>>>             .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>>>
>>>     @Override
>>>     protected void setup(Context context) throws IOException,
>>>             InterruptedException {
>>>
>>>         amos = new AvroMultipleOutputs(context);
>>>         baseOutputPath =
>>> context.getConfiguration().get(BASE_OUTPUT_FOLDER);
>>>
>>>     }
>>>
>>>     @Override
>>>     protected void reduce(Text key, Iterable<UberRecord> values, Context
>>> context)
>>>             throws IOException, InterruptedException {
>>>
>>>         try {
>>>             UberRecord ub = new UberRecord();
>>>             for (UberRecord ubi : values) {
>>>                 // enrich
>>>                 if (ubi.getExchange() == null) {
>>>                     continue;
>>>                 }
>>>                 BaseBidRequestEnricher enc = BaseBidRequestEnricher
>>>                         .getEnricher(ubi.getExchange().toString());
>>>                 enc.enrich(ubi);
>>>                 ub = mergeUB(ub, ubi);
>>>             }
>>>             logger.info("Writing UberRecord [" + ub.toString() + "]");
>>>             String partition = getPartition(ub);
>>>
>>>             // context.write();
>>>             // AvroKey<CharSequence>, AvroValue<UberRecord>>
>>>             amos.write(new AvroKey<CharSequence>(key.toString()),
>>>                     new AvroValue<UberRecord>(ub), baseOutputPath + "/"
>>>                             + partition + "/p" +
>>> reduceAttemptUniqueIdentifier);
>>>         } catch (Exception e) {
>>>             // TODO Auto-generated catch block
>>>             e.printStackTrace();
>>>         }
>>>     }
>>>
>>>     public UberRecord mergeUB(UberRecord dest, UberRecord src) {
>>>         List<Field> fields = UberRecord.getClassSchema().getFields();
>>>         List<Field> engFields = EngageData.getClassSchema().getFields();
>>>         for (Field field : fields) {
>>>             if (field.name().equals("engageData")
>>>                     && dest.getEngageData() != null) {
>>>                 EngageData mergedEng = dest.getEngageData();
>>>                 for (Field engField : engFields) {
>>>                     if (dest.getEngageData().get(engField.name()) ==
>>> null) {
>>>                         mergedEng.put(engField.name(),
>>>
>>> src.getEngageData().get(engField.name()));
>>>                     }
>>>
>>>                 }
>>>                 dest.setEngageData(mergedEng);
>>>             } else {
>>>                 if (dest.get(field.name()) == null) {
>>>                     dest.put(field.name(), src.get(field.name()));
>>>                 }
>>>             }
>>>         }
>>>         return dest;
>>>     }
>>>
>>>
>>>
>>>
>


-- 
Deepak

Mime
View raw message