hadoop-mapreduce-user mailing list archives

From Harsh J <ha...@cloudera.com>
Subject Re: reduce stop after n records
Date Fri, 09 Mar 2012 19:05:09 GMT
Henry,

Something like:

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    // Do stuff here you'd do in setup(…) otherwise.
    // Now begin iterating.
    while (context.nextKey()) {
      // Run your reducing function here. Like the following maybe.
      reduce(context.getCurrentKey(), context.getValues(), context);
      // Since you are now in a regular loop, you can break out of it
      // whenever your own logic says you have written enough records.
    }
    // Do stuff here you'd otherwise do in cleanup(context);
  }
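
For your specific top-N case, the same override can count what it has written and
quit early. The below is only an untested sketch: the TopNReducer name and the N
constant are placeholders, and the key/value types assume your second job feeds the
reducer (count, page) pairs via InverseMapper, already sorted descending by your
comparator.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Writes only the first N records and then stops consuming input.
public class TopNReducer extends Reducer<LongWritable, Text, Text, LongWritable> {

  private static final int N = 50; // placeholder cutoff

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    int written = 0;
    while (written < N && context.nextKey()) {
      LongWritable count = context.getCurrentKey();
      for (Text page : context.getValues()) {
        // Swap the pair back to (page, count) on the way out.
        context.write(page, count);
        written++;
        if (written >= N) {
          break;
        }
      }
    }
    cleanup(context);
  }
}

You would hook it in with job2.setReducerClass(...), same as your current
InverseTopReducer. Note that this only gives a global top N if job2 runs a single
reduce task (the default of one); with more reducers, each task keeps the first N
records of its own partition only.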

On Fri, Mar 9, 2012 at 11:32 PM, Henry Helgen <hhelgen@gmail.com> wrote:
> Thanks, the presentation is helpful. I am using the new API's context
> objects, but am not familiar with how to use the run method to accomplish
> this. Do you have some code ideas or examples?
>
> job code follows. Is this right?
> -------------------------------------------------------------------------------------------
>   public static void main(String[] args) throws Exception {
>     /**
>      * The main accepts the input and output directories as command line
>      * parameters. Then it defines the job classes. The main submits two jobs.
>      * The first job filters out languages and sums the pagecounts. The
>      * second job sorts by pagecount descending then selects the top 50.
>      * @param key, value, context provided by Job.setReducerClass()
>      * @return none
>      * @throws IOException, InterruptedException
>      */
>
>     //configure the first job
>     Configuration conf1 = new Configuration();
>     String[] otherArgs1 = new GenericOptionsParser(conf1, args).getRemainingArgs();
>     if (otherArgs1.length != 2) {
>       System.err.println("Usage: programname <in> <out>");
>       System.exit(2);
>     }//end if args
>     Job job1 = new Job(conf1, "Wiki Counts part 1");
>     job1.setJarByClass(WikiCounts.class);
>     job1.setMapperClass(LineTokenMapper.class); //custom mapper job 1
>     job1.setCombinerClass(LongSumReducer.class);
>     job1.setReducerClass(LongSumReducer.class);
>
>     //Set reducers White p193
>     job1.setNumReduceTasks(32);
>     job1.setOutputKeyClass(Text.class);
>     job1.setOutputValueClass(LongWritable.class);
>
>     //Sequence File and Compression White p233.
>     job1.setOutputFormatClass(SequenceFileOutputFormat.class);
>     SequenceFileOutputFormat.setCompressOutput(job1, true);
>     SequenceFileOutputFormat.setOutputCompressorClass(job1, GzipCodec.class);
>     SequenceFileOutputFormat.setOutputCompressionType(job1, CompressionType.BLOCK);
>
>     FileInputFormat.addInputPath(job1, new Path(otherArgs1[0]));
>     FileOutputFormat.setOutputPath(job1, new Path("out1in2batch"));
>
>
>     //configure the second job
>     Configuration conf2 = new Configuration();
>     String[] otherArgs2 = new GenericOptionsParser(conf2, args).getRemainingArgs();
>     if (otherArgs2.length != 2) {
>       System.err.println("Usage: programname <in> <out>");
>       System.exit(2);
>     }//end if args
>     Job job2 = new Job(conf2, "Wiki Counts part 2");
>     job2.setJarByClass(WikiCounts.class);
>     job2.setInputFormatClass(SequenceFileInputFormat.class);
>     job2.setMapperClass(InverseMapper.class);
>     job2.setReducerClass(InverseTopReducer.class); //custom reducer job 2
>     job2.setSortComparatorClass(LongWritable.DecreasingComparator.class);
>     FileInputFormat.addInputPath(job2, new Path("out1in2batch"));
>     FileOutputFormat.setOutputPath(job2, new Path(otherArgs2[1]));
>
>     //run the jobs in order
>     if (!job1.waitForCompletion(true)) {
>       System.exit(1);
>     }//end if job1
>     System.exit(job2.waitForCompletion(true) ? 0 : 1);
>   }//end main
>
> }//end class
> ********************************************************************
>
> ----------------------------------------------------------------------
>
> Hello Henry,
>
> Per the older conversation, what Owen was pointing to were the new API
> Mapper/Reducer classes, and its run(…) method override specifically:
> http://hadoop.apache.org/common/docs/r0.20.2/api/org/apache/hadoop/mapreduce/Reducer.html#run(org.apache.hadoop.mapreduce.Reducer.Context)
>
> You'll need to port your job to the new (still a bit unstable) API to
> leverage this. Here are some slides to aid you in that task:
> http://www.slideshare.net/sh1mmer/upgrading-to-the-new-map-reduce-api (The
> first part, from Owen).
>
>
>
> On Thu, Mar 8, 2012 at 5:02 PM, Henry Helgen <hhelgen@gmail.com> wrote:
>>
>> I am using hadoop 0.20.2 mapreduce API. The program is running fine, just
>> slower than it could.
>>
>> I sum values and then use
>> job.setSortComparatorClass(LongWritable.DecreasingComparator.class) to sort
>> descending by sum. I need to stop the reducer after outputting the first N
>> records. This would save the reducer from running over thousands of records
>> when it only needs the first few records. Is there a solution with the new
>> mapreduce 0.20.2 API?
>>
>> -------------------------------------------------------------------
>> I notice messages from 2008 about this topic:
>>
>> http://grokbase.com/t/hadoop/common-user/089420wvkx/stop-mr-jobs-after-n-records-have-been-produced
>>
>> https://issues.apache.org/jira/browse/HADOOP-3973
>>
>> The last statement follows, but the link is broken.
>> "You could do this pretty easily by implementing a custom MapRunnable.
>> There is no equivalent for reduces. The interface proposed in
>> HADOOP-1230 would support that kind of application. See:
>>
>> http://svn.apache.org/repos/asf/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/
>> Look at the new Mapper and Reducer interfaces."
>>
>



-- 
Harsh J
