hadoop-mapreduce-user mailing list archives

From Teryl Taylor <teryl.tay...@gmail.com>
Subject Question about setting the number of mappers.
Date Mon, 18 Jan 2010 21:05:29 GMT
Hi everyone,

I'm playing around with the Hadoop MapReduce library and I'm seeing some
odd behaviour.  The system is set up on one machine in the pseudo-distributed
configuration, and I use KFS as my DFS.  I have written a MapReduce program
to process a bunch of binary files.  The files are compressed in chunks, so I
do not split them.  There are 1083 files that I am feeding to the job.
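
For context, the reason each file stays whole is that my input format marks
them as non-splittable.  It is essentially a FileInputFormat subclass along
these lines (a simplified sketch with placeholder key/value types, not my
exact class; the record reader that decompresses the chunks is omitted):

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class CustomInputFormat
    extends FileInputFormat<LongWritable, BytesWritable> {

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // The files are compressed in chunks, so they have to be read whole.
    return false;
  }

  @Override
  public RecordReader<LongWritable, BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // The real class returns a reader that walks the compressed chunks;
    // omitted here to keep the sketch short.
    throw new UnsupportedOperationException("omitted in this sketch");
  }
}

Since isSplitable() returns false, every input file becomes exactly one split.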

Every time I run the job:

~/hadoop/bin/hadoop jar /home/hadoop/lib/test.jar \
    org.apache.hadoop.mapreduce.apps.Test /input/2007/*/*/ /output

It always creates 1083 map tasks to do the processing, which is extremely
slow, so I wanted to lower the number to 10 and see how the performance
compares.  I set the following in mapred-site.xml:

<configuration>
 <property>
   <name>mapred.job.tracker</name>
   <value>localhost:9001</value>
 </property>
 <property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>10</value>
 </property>
 <property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>10</value>
 </property>
 <property>
   <name>mapred.map.tasks</name>
   <value>10</value>
 </property>
</configuration>

I have restarted the jobtracker and tasktracker, and I still always get 1083
mappers.  Other than this, the job works as expected.  I'm using the new API,
and the main function of my class looks like this:

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
//    conf.setInt("mapred.map.tasks", 10);
    Job job = new Job(conf, "word count");
//    conf.setNumMapTasks(10);  // does not compile: setNumMapTasks() is on JobConf, not Configuration
    job.setJarByClass(Test.class);
    job.setMapperClass(TestMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(CustomInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
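
One thing I noticed while experimenting (and it may not matter for this
problem): as far as I can tell, new Job(conf, ...) takes a copy of the
Configuration, so anything set on conf after the Job is constructed never
reaches the job.  If the hint is to be applied at all, I believe the relevant
lines would have to look more like this (sketch only; as the JobClient code
below suggests, the split count wins regardless):

    Configuration conf = new Configuration();
    conf.setInt("mapred.map.tasks", 10);   // set the hint before the copy is taken

    Job job = new Job(conf, "word count");

    // ...or go through the Job's own copy afterwards:
    job.getConfiguration().setInt("mapred.map.tasks", 10);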

I have been digging around in the Hadoop source code, and it looks like the
JobClient actually sets the number of map tasks to the number of splits
(hard-coded).  Here is a snippet from the JobClient class:
**********************************************************************
/**
   * Internal method for submitting jobs to the system.
   * @param job the configuration to submit
   * @return a proxy object for the running job
   * @throws FileNotFoundException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws IOException
   */
  public
  RunningJob submitJobInternal(JobConf job
                               ) throws FileNotFoundException,
                                        ClassNotFoundException,
                                        InterruptedException,
                                        IOException {
    /*
     * configure the command line options correctly on the submitting dfs
     */

    JobID jobId = jobSubmitClient.getNewJobId();
    Path submitJobDir = new Path(getSystemDir(), jobId.toString());
    Path submitJarFile = new Path(submitJobDir, "job.jar");
    Path submitSplitFile = new Path(submitJobDir, "job.split");
    configureCommandLineOptions(job, submitJobDir, submitJarFile);
    Path submitJobFile = new Path(submitJobDir, "job.xml");
    int reduces = job.getNumReduceTasks();
    JobContext context = new JobContext(job, jobId);

    // Check the output specification
    if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
      org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
        ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
      output.checkOutputSpecs(context);
    } else {
      job.getOutputFormat().checkOutputSpecs(fs, job);
    }

    // Create the splits for the job
    LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
    int maps;
    if (job.getUseNewMapper()) {
      maps = writeNewSplits(context, submitSplitFile);
    } else {
      maps = writeOldSplits(job, submitSplitFile);
    }
    job.set("mapred.job.split.file", submitSplitFile.toString());
    job.setNumMapTasks(maps);

    // Write job file to JobTracker's fs
    FSDataOutputStream out =
      FileSystem.create(fs, submitJobFile,
                        new FsPermission(JOB_FILE_PERMISSION));

    try {
      job.writeXml(out);
    } finally {
      out.close();
   .....


**********************************************************************
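
So if I am reading this right, job.setNumMapTasks(maps) is called with
whatever writeNewSplits() returns, i.e. the number of splits produced by the
InputFormat, and any value of mapred.map.tasks is simply overwritten.  My
rough paraphrase of what the new-API FileInputFormat.getSplits(JobContext job)
does for non-splittable files (simplified from my reading of the 0.20 source,
so take it with a grain of salt):

    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (FileStatus file : listStatus(job)) {
      Path path = file.getPath();
      long length = file.getLen();
      if (isSplitable(job, path)) {
        // ... chop the file into roughly block-sized FileSplits ...
      } else {
        // One whole-file split per non-splittable file.
        splits.add(new FileSplit(path, 0, length,
            new String[0]));   // block locations elided in this sketch
      }
    }
    return splits;   // JobClient then does job.setNumMapTasks(splits.size())

If that is right, the only way to get fewer mappers is to make getSplits()
return fewer splits, e.g. by packing many whole files into each split the way
CombineFileInputFormat does (I am not sure whether a new-API version of it
exists in my release).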

Is there anything I can do to get the number of mappers to be more flexible?


Cheers,

Teryl
