avro-user mailing list archives

From ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
Subject Re: Avromultiple output
Date Tue, 24 Feb 2015 17:38:23 GMT
Share your Avro dependencies (versions) in case you're using Maven, and your
Hadoop dependencies (versions).
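For Hadoop 2, the usual culprit is an avro-mapred artifact built against
Hadoop 1. A Hadoop-2-compatible dependency would look roughly like this (a
sketch; the version number is an assumption, match it to your setup):

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-mapred</artifactId>
      <version>1.7.7</version>
      <classifier>hadoop2</classifier>
    </dependency>

Without the hadoop2 classifier, avro-mapred 1.7.x links against the Hadoop 1
JobContext class and fails on Hadoop 2 with exactly the "Found interface ...
but class was expected" error.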



On Tue, Feb 24, 2015 at 11:06 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepujain@gmail.com> wrote:

> Check your Hadoop version. In Hadoop 1.x,
> org.apache.hadoop.mapreduce.JobContext was a class; in Hadoop 2.x it became
> an interface, so code compiled against one fails on the other.
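> A quick way to see which flavor is actually on the runtime classpath (a
> throwaway diagnostic, not part of the job code):
>
>     // prints true on Hadoop 2.x, false on Hadoop 1.x/0.20
>     System.out.println(
>             org.apache.hadoop.mapreduce.JobContext.class.isInterface());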
>
> On Tue, Feb 24, 2015 at 10:28 PM, David Ginzburg <davidginzburg@gmail.com>
> wrote:
>
>> Thank you for the answer.
>>
>> I tried that, but the exception
>> Error: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
>> persists.
>>
>> On Tue, Feb 24, 2015 at 3:12 PM, Artem Ervits <artemervits@gmail.com>
>> wrote:
>>
>>> try this:
>>>
>>> Job job = Job.getInstance(conf);
>>> job.setJobName(name);
>>>
>>> Artem Ervits
>>> On Feb 21, 2015 10:57 PM, "David Ginzburg" <davidginzburg@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I am trying to run an MR job on EMR with AvroMultipleOutputs.
>>>>
>>>>
>>>>
>>>>
>>>> I get the following exception when running with an AMI with Hadoop 2.2 or 2.5:
>>>> Found interface org.apache.hadoop.mapreduce.JobContext, but class was
>>>> expected
>>>>
>>>> I read it is related to incompatible Hadoop versions, so I modified the
>>>> AMI version.
>>>>
>>>> When running with an AMI with Hadoop 1.0.3 I get the following exception:
>>>>
>>>> java.lang.NullPointerException
>>>>     at
>>>> org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
>>>>     at
>>>> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
>>>>     at
>>>> org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
>>>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
>>>>     at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>     at
>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
>>>>     at org.apache.hadoop.mapred.Child.main(Child.java:253)
>>>>
>>>>
>>>> The driver code is
>>>>
>>>> Job job = new Job(getConf(), "myad");
>>>>
>>>>
>>>>
>>>>         job.setOutputValueClass(NullWritable.class);
>>>>
>>>>
>>>>         job.setJarByClass(myAdTextLineMapper.class);
>>>>         Path inputOflineFiles = new Path(args[0]);
>>>>         Path inputOfUbberFiles = new Path(args[1]);
>>>>
>>>>          FileInputFormat.setInputPaths(job, inputOflineFiles);
>>>>
>>>>         job.setMapperClass(myAdTextLineMapper.class);
>>>>         job.setMapOutputKeyClass(Text.class);
>>>>         job.setMapOutputValueClass(UberRecord.class);
>>>>
>>>>         job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
>>>>         AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
>>>>         AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);
>>>>
>>>>
>>>>         job.setReducerClass(myAdReducer.class);
>>>>         job.setOutputKeyClass(Text.class);
>>>>         job.setOutputValueClass(UberRecord.class);
>>>>         job.setNumReduceTasks(2);
>>>>         String baseoutputFolder = args[2];
>>>>         job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER,
>>>>                 baseoutputFolder);
>>>>
>>>>         LazyOutputFormat.setOutputFormatClass(job, AvroSequenceFileOutputFormat.class);
>>>>
>>>>         FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
>>>>         return job.waitForCompletion(true) ? 0 : 1;
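>>>>
>>>> A possible cause of the NullPointerException in
>>>> SerializationFactory.getSerializer: the map output value class,
>>>> UberRecord, is an Avro record, not a Writable, so Hadoop finds no
>>>> registered serializer for the shuffle. With avro-mapred the intermediate
>>>> output is normally declared through AvroJob as well, roughly like this
>>>> (a sketch; it assumes the mapper is changed to emit
>>>> AvroKey<CharSequence>/AvroValue<UberRecord> instead of Text/UberRecord):
>>>>
>>>>         // register Avro serialization for the intermediate map output
>>>>         AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
>>>>         AvroJob.setMapOutputValueSchema(job, UberRecord.SCHEMA$);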
>>>>
>>>>
>>>> The mapper and reducer code:
>>>> @Override
>>>>     public void setup(Context ctx) {
>>>>
>>>>         ubp = new UberRecordProcessor();
>>>>     }
>>>>
>>>>     @Override
>>>>     protected void map(LongWritable key, Text value, Context context)
>>>>             throws IOException, InterruptedException {
>>>>         try {
>>>>             handleLineinMap(value);
>>>>             if (ub != null) {
>>>>                 context.write(new Text(ub.getAuctionId().toString()), ub);
>>>>                 context.getCounter("myAd", "myAdTextLineMapper").increment(1);
>>>>             } else {
>>>>                 context.getCounter("myAd", "myAdTextLineMapperNull").increment(1);
>>>>             }
>>>>         } catch (Exception e) {
>>>>             context.getCounter("myAd", "myAdTextLineMapperError").increment(1);
>>>>             logger.warn("could not parse line " + value.toString(), e);
>>>>         }
>>>>     }
>>>>
>>>> public class myAdReducer extends
>>>>         Reducer<Text, UberRecord, AvroKey<CharSequence>, AvroValue<UberRecord>> {
>>>>
>>>>     private static Logger logger = Logger.getLogger(myAdReducer.class);
>>>>     public static final String BASE_OUTPUT_FOLDER = "base.output.folder";
>>>>     AvroMultipleOutputs amos;
>>>>     MultipleOutputs<Text, UberRecord> outputs;
>>>>     UberRecordProcessor ubp = new UberRecordProcessor();
>>>>     // "year=%s/month=%s/day=%s/hour=%s"
>>>>     private String baseOutputPath;
>>>>     private long reduceAttemptUniqueIdentifier = System.currentTimeMillis();
>>>>
>>>>     // 2015-02-01T18:00:25.673Z
>>>>     static DateTimeFormatter dateformat = DateTimeFormat
>>>>             .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>>>>
>>>>     @Override
>>>>     protected void setup(Context context) throws IOException,
>>>>             InterruptedException {
>>>>
>>>>         amos = new AvroMultipleOutputs(context);
>>>>         baseOutputPath = context.getConfiguration().get(BASE_OUTPUT_FOLDER);
>>>>
>>>>     }
>>>>
>>>>     @Override
>>>>     protected void reduce(Text key, Iterable<UberRecord> values,
>>>>             Context context) throws IOException, InterruptedException {
>>>>
>>>>         try {
>>>>             UberRecord ub = new UberRecord();
>>>>             for (UberRecord ubi : values) {
>>>>                 // enrich
>>>>                 if (ubi.getExchange() == null) {
>>>>                     continue;
>>>>                 }
>>>>                 BaseBidRequestEnricher enc = BaseBidRequestEnricher
>>>>                         .getEnricher(ubi.getExchange().toString());
>>>>                 enc.enrich(ubi);
>>>>                 ub = mergeUB(ub, ubi);
>>>>             }
>>>>             logger.info("Writing UberRecord [" + ub.toString() + "]");
>>>>             String partition = getPartition(ub);
>>>>
>>>>             // context.write();
>>>>             // AvroKey<CharSequence>, AvroValue<UberRecord>>
>>>>             amos.write(new AvroKey<CharSequence>(key.toString()),
>>>>                     new AvroValue<UberRecord>(ub),
>>>>                     baseOutputPath + "/" + partition + "/p"
>>>>                             + reduceAttemptUniqueIdentifier);
>>>>         } catch (Exception e) {
>>>>             logger.error("failed to reduce key " + key, e);
>>>>         }
>>>>     }
>>>>
>>>>     public UberRecord mergeUB(UberRecord dest, UberRecord src) {
>>>>         List<Field> fields = UberRecord.getClassSchema().getFields();
>>>>         List<Field> engFields = EngageData.getClassSchema().getFields();
>>>>         for (Field field : fields) {
>>>>             if (field.name().equals("engageData")
>>>>                     && dest.getEngageData() != null) {
>>>>                 EngageData mergedEng = dest.getEngageData();
>>>>                 for (Field engField : engFields) {
>>>>                     if (dest.getEngageData().get(engField.name()) == null) {
>>>>                         mergedEng.put(engField.name(),
>>>>                                 src.getEngageData().get(engField.name()));
>>>>                     }
>>>>
>>>>                 }
>>>>                 dest.setEngageData(mergedEng);
>>>>             } else {
>>>>                 if (dest.get(field.name()) == null) {
>>>>                     dest.put(field.name(), src.get(field.name()));
>>>>                 }
>>>>             }
>>>>         }
>>>>         return dest;
>>>>     }
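>>>>
>>>> One gap worth noting: AvroMultipleOutputs buffers its record writers and
>>>> must be closed in cleanup(), otherwise the output files may come out
>>>> empty. A sketch of the missing override (using the amos field above):
>>>>
>>>>     @Override
>>>>     protected void cleanup(Context context) throws IOException,
>>>>             InterruptedException {
>>>>         // flush and close all writers opened by amos.write(...)
>>>>         amos.close();
>>>>     }
>>>> }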
>>>>
>>>>
>>>>
>>>>
>>
>
>
> --
> Deepak
>
>


-- 
Deepak
