hadoop-mapreduce-user mailing list archives

From Jeff Zhang <zjf...@gmail.com>
Subject Re: Question about setting the number of mappers.
Date Tue, 19 Jan 2010 00:53:35 GMT
Hi Teryl

The number of mappers is determined by the InputFormat you use: the job gets
one map task per input split, and since your files are not splittable you get
one split per file. In your case, one option is to merge the small files into
larger files beforehand; another is to use CombineFileInputFormat as your
InputFormat, which packs several files into each split.
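
A rough sketch of the CombineFileInputFormat approach is below. Take it as an
illustration only: the class and reader names (CombinedBinaryInputFormat,
PerFileRecordReader) are made up, the 256 MB max split size is just an example,
and the new-API CombineFileInputFormat in org.apache.hadoop.mapreduce.lib.input
may not exist in your release, in which case the old-API version in
org.apache.hadoop.mapred.lib is the one to adapt. The idea is to subclass
CombineFileInputFormat, keep individual files unsplittable, and let
CombineFileRecordReader hand each file in a combined split to a per-file reader:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

// Illustrative name: packs many small binary files into each split, so the job
// launches one mapper per combined split instead of one mapper per file.
public class CombinedBinaryInputFormat
    extends CombineFileInputFormat<Text, BytesWritable> {

  public CombinedBinaryInputFormat() {
    // Example value: each combined split holds up to ~256 MB of input, so the
    // number of mappers tracks total input size rather than file count.
    setMaxSplitSize(256L * 1024 * 1024);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // The files are compressed in chunks, so never split an individual file.
    return false;
  }

  @Override
  public RecordReader<Text, BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // CombineFileRecordReader creates one PerFileRecordReader per file in the
    // combined split, via the (CombineFileSplit, TaskAttemptContext, Integer)
    // constructor it requires.
    return new CombineFileRecordReader<Text, BytesWritable>(
        (CombineFileSplit) split, context, PerFileRecordReader.class);
  }

  // Hypothetical per-file reader that emits a whole file as a single
  // (path, bytes) record; adapt it to however CustomInputFormat decodes files.
  public static class PerFileRecordReader
      extends RecordReader<Text, BytesWritable> {

    private final Path path;
    private final long length;
    private final Configuration conf;
    private final Text key = new Text();
    private final BytesWritable value = new BytesWritable();
    private boolean processed = false;

    public PerFileRecordReader(CombineFileSplit split,
                               TaskAttemptContext context, Integer index) {
      this.path = split.getPath(index);
      this.length = split.getLength(index);
      this.conf = context.getConfiguration();
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) { }

    @Override
    public boolean nextKeyValue() throws IOException {
      if (processed) {
        return false;
      }
      // Read the entire file into memory as one value.
      byte[] contents = new byte[(int) length];
      FSDataInputStream in = path.getFileSystem(conf).open(path);
      try {
        IOUtils.readFully(in, contents, 0, contents.length);
      } finally {
        in.close();
      }
      key.set(path.toString());
      value.set(contents, 0, contents.length);
      processed = true;
      return true;
    }

    @Override public Text getCurrentKey() { return key; }
    @Override public BytesWritable getCurrentValue() { return value; }
    @Override public float getProgress() { return processed ? 1.0f : 0.0f; }
    @Override public void close() { }
  }
}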




On Mon, Jan 18, 2010 at 1:05 PM, Teryl Taylor <teryl.taylor@gmail.com> wrote:

> Hi everyone,
>
> I'm playing around with the Hadoop MapReduce library and I'm getting some
> odd behaviour.  The system is set up on one machine using the
> pseudo-distributed configuration.  I use KFS as my DFS.  I have written a
> MapReduce program to process a bunch of binary files.  The files are
> compressed in chunks, so I do not split the files.  There are 1083 files
> that I am loading into the map reducer.
>
> Every time I run the map reducer:
>
> ~/hadoop/bin/hadoop jar  /home/hadoop/lib/test.jar
> org.apache.hadoop.mapreduce.apps.Test /input/2007/*/*/  /output
>
> It always creates 1083 mapper tasks to do the processing, which is
> extremely slow, so I wanted to try lowering the number to 10 and see how
> the performance is.  I set the following in mapred-site.xml:
>
> <configuration>
>  <property>
>    <name>mapred.job.tracker</name>
>    <value>localhost:9001</value>
>  </property>
>  <property>
>   <name>mapred.tasktracker.map.tasks.maximum</name>
>   <value>10</value>
>  </property>
>  <property>
>   <name>mapred.tasktracker.reduce.tasks.maximum</name>
>   <value>10</value>
>  </property>
>  <property>
>    <name>mapred.map.tasks</name>
>    <value>10</value>
>  </property>
> </configuration>
>
> I have recycled the jobtracker and tasktracker, and I still always get
> 1083 mappers.  The map reducer is working as expected other than this.  I'm
> using the new API, and my main function in my class looks like:
>
>  public static void main(String[] args) throws Exception {
>     Configuration conf = new Configuration();
>     String[] otherArgs = new GenericOptionsParser(conf,
> args).getRemainingArgs();
>     if (otherArgs.length != 2) {
>       System.err.println("Usage: wordcount <in> <out>");
>       System.exit(2);
>     }
> //    conf.setInt("mapred.map.tasks", 10);
>     Job job = new Job(conf, "word count");
>     conf.setNumMapTasks(10);
>     job.setJarByClass(Test.class);
>     job.setMapperClass(TestMapper.class);
>     job.setCombinerClass(IntSumReducer.class);
>     job.setReducerClass(IntSumReducer.class);
>     job.setOutputKeyClass(Text.class);
>     job.setOutputValueClass(IntWritable.class);
>     job.setInputFormatClass(CustomInputFormat.class);
>     job.setOutputFormatClass(TextOutputFormat.class);
>     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
>     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
>     System.exit(job.waitForCompletion(true) ? 0 : 1);
>   }
>
> I have been digging around in the Hadoop source code, and it looks like the
> JobClient actually sets the number of mappers to the number of splits
> (hard-coded).  Here is a snippet from the JobClient class:
>
> ****************************************************************
> /**
>    * Internal method for submitting jobs to the system.
>    * @param job the configuration to submit
>    * @return a proxy object for the running job
>    * @throws FileNotFoundException
>    * @throws ClassNotFoundException
>    * @throws InterruptedException
>    * @throws IOException
>    */
>   public
>   RunningJob submitJobInternal(JobConf job
>                                ) throws FileNotFoundException,
>                                         ClassNotFoundException,
>                                         InterruptedException,
>                                         IOException {
>     /*
>      * configure the command line options correctly on the submitting dfs
>      */
>
>     JobID jobId = jobSubmitClient.getNewJobId();
>     Path submitJobDir = new Path(getSystemDir(), jobId.toString());
>     Path submitJarFile = new Path(submitJobDir, "job.jar");
>     Path submitSplitFile = new Path(submitJobDir, "job.split");
>     configureCommandLineOptions(job, submitJobDir, submitJarFile);
>     Path submitJobFile = new Path(submitJobDir, "job.xml");
>     int reduces = job.getNumReduceTasks();
>     JobContext context = new JobContext(job, jobId);
>
>     // Check the output specification
>     if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
>       org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
>         ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
>       output.checkOutputSpecs(context);
>     } else {
>       job.getOutputFormat().checkOutputSpecs(fs, job);
>     }
>
>     // Create the splits for the job
>     LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
>     int maps;
>     if (job.getUseNewMapper()) {
>       maps = writeNewSplits(context, submitSplitFile);
>     } else {
>       maps = writeOldSplits(job, submitSplitFile);
>     }
>     job.set("mapred.job.split.file", submitSplitFile.toString());
>     job.setNumMapTasks(maps);
>
>     // Write job file to JobTracker's fs
>     FSDataOutputStream out =
>       FileSystem.create(fs, submitJobFile,
>                         new FsPermission(JOB_FILE_PERMISSION));
>
>     try {
>       job.writeXml(out);
>     } finally {
>       out.close();
>    .....
>
> }
>
>
> ****************************************************************
>
> Is there anything I can do to get the number of mappers to be more
> flexible?
>
>
> Cheers,
>
> Teryl
>
>
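
As the submitJobInternal() snippet quoted above shows, the client overwrites
mapred.map.tasks with the number of splits it generates
(job.setNumMapTasks(maps)), so that property and setNumMapTasks() are only
hints, and mapred.tasktracker.map.tasks.maximum only caps how many map tasks
run concurrently on each tasktracker; it does not change the total. With a
combining input format along the lines of the sketch earlier in this mail, the
only driver change would be swapping the input format (the class name below is
the same hypothetical one from that sketch):

// Hypothetical driver change: replace the one-split-per-file CustomInputFormat
// with the combining format sketched above.
job.setInputFormatClass(CombinedBinaryInputFormat.class);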


-- 
Best Regards

Jeff Zhang
