hadoop-mapreduce-user mailing list archives

From "Clements, Michael" <Michael.Cleme...@disney.com>
Subject RE: Question about setting the number of mappers.
Date Tue, 19 Jan 2010 16:14:01 GMT
Do you want to change the total number of mappers, or the number that run at any given time? For example, Hadoop may split your job into 1083 map tasks but allow only 10 of them to run at once.

The setting in mapred-site.xml caps how many mappers can run simultaneously per machine. It
does not affect how many total mappers will make up the job.

The total number of tasks in a job and the number that can run simultaneously are two separate settings, and both are important for tuning performance. The InputFormat controls the first; the per-machine slot setting above controls the second.
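To make that concrete, here is a rough sketch (against the 0.20-era new API) of how the total falls straight out of the InputFormat alone. CountSplits is a hypothetical illustration, not code from this thread:

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class CountSplits {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf);
    FileInputFormat.addInputPath(job, new Path(args[0]));

    // A job gets one map task per InputSplit, so the total map count is
    // whatever the InputFormat's getSplits() returns; the tasktracker
    // slot settings in mapred-site.xml never change this number.
    List<InputSplit> splits = new TextInputFormat().getSplits(job);
    System.out.println(splits.size() + " splits => " + splits.size() + " map tasks");
  }
}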

From: mapreduce-user-return-292-Michael.Clements=disney.com@hadoop.apache.org On Behalf Of Jeff Zhang
Sent: Monday, January 18, 2010 4:54 PM
To: mapreduce-user@hadoop.apache.org
Subject: Re: Question about setting the number of mappers.

Hi Teryl,

The number of mappers is determined by the InputFormat you use. In your case, one option is to merge the files into larger files beforehand; another is to use CombineFileInputFormat as your InputFormat.
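For illustration only: in 0.20, CombineFileInputFormat lives in the old-API package org.apache.hadoop.mapred.lib and is abstract, so you subclass it. The class name and split size below are made up for this sketch, and the record reader is left out:

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;

public class CombinedBinaryInputFormat
    extends CombineFileInputFormat<LongWritable, BytesWritable> {

  public CombinedBinaryInputFormat() {
    // Pack whole small files together until a split reaches ~256 MB,
    // so the job gets far fewer map tasks than input files.
    setMaxSplitSize(256L * 1024 * 1024);
  }

  @Override
  public RecordReader<LongWritable, BytesWritable> getRecordReader(
      InputSplit split, JobConf conf, Reporter reporter) throws IOException {
    // A real implementation would open each file in the CombineFileSplit
    // in turn; omitted from this sketch.
    throw new UnsupportedOperationException("record reader not shown");
  }
}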

On Mon, Jan 18, 2010 at 1:05 PM, Teryl Taylor <teryl.taylor@gmail.com> wrote:

Hi everyone,

I'm playing around with the Hadoop MapReduce library and I'm getting some odd behaviour. The system is set up on one machine using the pseudo-distributed configuration, and I use KFS as my DFS. I have written a MapReduce program to process a bunch of binary files. The files are compressed in chunks, so I do not split them. There are 1083 files that I am loading into the job.
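As a point of reference, here is a hypothetical sketch of what a non-splittable new-API format along these lines could look like; it is illustrative, not the poster's actual CustomInputFormat:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class CustomInputFormat
    extends FileInputFormat<LongWritable, BytesWritable> {

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // Chunk-compressed files cannot be cut mid-file, so force one split
    // (and therefore one map task) per input file: 1083 files, 1083 maps.
    return false;
  }

  @Override
  public RecordReader<LongWritable, BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // Reading logic omitted from this sketch.
    throw new UnsupportedOperationException("record reader not shown");
  }
}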

Every time I run the job:

~/hadoop/bin/hadoop jar /home/hadoop/lib/test.jar org.apache.hadoop.mapreduce.apps.Test /input/2007/*/*/ /output

It always creates 1083 map tasks to do the processing, which is extremely slow, so I wanted to try lowering the number to 10 and see how the performance compares. I set the following in mapred-site.xml:

<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
  </property>
  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>10</value>
  </property>
  <property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>10</value>
  </property>
  <property>
    <name>mapred.map.tasks</name>
    <value>10</value>
  </property>
</configuration>

I have restarted the JobTracker and TaskTracker, and I still always get 1083 mappers. Other than this, the job works as expected. I'm using the new API, and the main function in my class looks like:

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
//    conf.setInt("mapred.map.tasks", 10);
    Job job = new Job(conf, "word count");
//    conf.setNumMapTasks(10);  // note: setNumMapTasks() is a JobConf method, not Configuration
    job.setJarByClass(Test.class);
    job.setMapperClass(TestMapper.class);
    job.setCombinerClass(IntSumReducer.class); 
    job.setReducerClass(IntSumReducer.class); 
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(CustomInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

I have been digging around in the Hadoop source code, and it looks like the JobClient actually sets the number of mappers to the number of splits (hard-coded). Here is a snippet from the JobClient class:
****************************************
/**
   * Internal method for submitting jobs to the system.
   * @param job the configuration to submit
   * @return a proxy object for the running job
   * @throws FileNotFoundException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws IOException
   */
  public
  RunningJob submitJobInternal(JobConf job
                               ) throws FileNotFoundException,
                                        ClassNotFoundException,
                                        InterruptedException,
                                        IOException {
    /*
     * configure the command line options correctly on the submitting dfs
     */

    JobID jobId = jobSubmitClient.getNewJobId();
    Path submitJobDir = new Path(getSystemDir(), jobId.toString());
    Path submitJarFile = new Path(submitJobDir, "job.jar");
    Path submitSplitFile = new Path(submitJobDir, "job.split");
    configureCommandLineOptions(job, submitJobDir, submitJarFile);
    Path submitJobFile = new Path(submitJobDir, "job.xml");
    int reduces = job.getNumReduceTasks();
    JobContext context = new JobContext(job, jobId);

    // Check the output specification
    if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
      org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
        ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
      output.checkOutputSpecs(context);
    } else {
      job.getOutputFormat().checkOutputSpecs(fs, job);
    }

    // Create the splits for the job
    LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
    int maps;
    if (job.getUseNewMapper()) {
      maps = writeNewSplits(context, submitSplitFile);
    } else {
      maps = writeOldSplits(job, submitSplitFile);
    }
    job.set("mapred.job.split.file", submitSplitFile.toString());
    job.setNumMapTasks(maps);
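    // Note: whatever value was set for mapred.map.tasks gets overwritten
    // here with the split count, which is why the mapred-site.xml setting
    // above has no effect on the total.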

    // Write job file to JobTracker's fs        
    FSDataOutputStream out =
      FileSystem.create(fs, submitJobFile,
                        new FsPermission(JOB_FILE_PERMISSION));

    try {
      job.writeXml(out);
    } finally {
      out.close();
   ..... 
}

****************************************

Is there anything I can do to get the number of mappers to be more flexible?


Cheers,

Teryl




-- 
Best Regards

Jeff Zhang
