hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r780621 - in /hadoop/core/trunk: CHANGES.txt src/examples/org/apache/hadoop/examples/Sort.java src/test/mapred/org/apache/hadoop/mapred/SortValidator.java src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
Date Mon, 01 Jun 2009 12:01:29 GMT
Author: sharad
Date: Mon Jun  1 12:01:28 2009
New Revision: 780621

URL: http://svn.apache.org/viewvc?rev=780621&view=rev
Log:
HADOOP-5696. Change org.apache.hadoop.examples.Sort to use new mapreduce api. Contributed
by Amareshwari Sriramadasu.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java
    hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=780621&r1=780620&r2=780621&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jun  1 12:01:28 2009
@@ -411,6 +411,9 @@
     HADOOP-5698. Change org.apache.hadoop.examples.MultiFileWordCount to 
     use new mapreduce api. (Amareshwari Sriramadasu via sharad)
 
+    HADOOP-5696. Change org.apache.hadoop.examples.Sort to use new 
+    mapreduce api. (Amareshwari Sriramadasu via sharad)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java?rev=780621&r1=780620&r2=780621&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java Mon Jun  1 12:01:28
2009
@@ -29,11 +29,15 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.mapred.lib.InputSampler;
-import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -42,7 +46,7 @@
  * other than use the framework to fragment and sort the input values.
  *
  * To run: bin/hadoop jar build/hadoop-examples.jar sort
- *            [-m <i>maps</i>] [-r <i>reduces</i>]
+ *            [-r <i>reduces</i>]
  *            [-inFormat <i>input format class</i>] 
  *            [-outFormat <i>output format class</i>] 
  *            [-outKey <i>output key class</i>] 
@@ -51,10 +55,10 @@
  *            <i>in-dir</i> <i>out-dir</i> 
  */
 public class Sort<K,V> extends Configured implements Tool {
-  private RunningJob jobResult = null;
+  private Job job = null;
 
   static int printUsage() {
-    System.out.println("sort [-m <maps>] [-r <reduces>] " +
+    System.out.println("sort [-r <reduces>] " +
                        "[-inFormat <input format class>] " +
                        "[-outFormat <output format class>] " + 
                        "[-outKey <output key class>] " +
@@ -62,7 +66,7 @@
                        "[-totalOrder <pcnt> <num samples> <max splits>]
" +
                        "<input> <output>");
     ToolRunner.printGenericCommandUsage(System.out);
-    return -1;
+    return 2;
   }
 
   /**
@@ -73,16 +77,11 @@
    */
   public int run(String[] args) throws Exception {
 
-    JobConf jobConf = new JobConf(getConf(), Sort.class);
-    jobConf.setJobName("sorter");
-
-    jobConf.setMapperClass(IdentityMapper.class);        
-    jobConf.setReducerClass(IdentityReducer.class);
-
-    JobClient client = new JobClient(jobConf);
+    Configuration conf = getConf();
+    JobClient client = new JobClient(conf);
     ClusterStatus cluster = client.getClusterStatus();
     int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
-    String sort_reduces = jobConf.get("test.sort.reduces_per_host");
+    String sort_reduces = conf.get("test.sort.reduces_per_host");
     if (sort_reduces != null) {
        num_reduces = cluster.getTaskTrackers() * 
                        Integer.parseInt(sort_reduces);
@@ -97,9 +96,7 @@
     InputSampler.Sampler<K,V> sampler = null;
     for(int i=0; i < args.length; ++i) {
       try {
-        if ("-m".equals(args[i])) {
-          jobConf.setNumMapTasks(Integer.parseInt(args[++i]));
-        } else if ("-r".equals(args[i])) {
+        if ("-r".equals(args[i])) {
           num_reduces = Integer.parseInt(args[++i]);
         } else if ("-inFormat".equals(args[i])) {
           inputFormatClass = 
@@ -132,15 +129,21 @@
         return printUsage(); // exits
       }
     }
-
     // Set user-supplied (possibly default) job configs
-    jobConf.setNumReduceTasks(num_reduces);
+    job = new Job(conf);
+    job.setJobName("sorter");
+    job.setJarByClass(Sort.class);
 
-    jobConf.setInputFormat(inputFormatClass);
-    jobConf.setOutputFormat(outputFormatClass);
+    job.setMapperClass(Mapper.class);        
+    job.setReducerClass(Reducer.class);
 
-    jobConf.setOutputKeyClass(outputKeyClass);
-    jobConf.setOutputValueClass(outputValueClass);
+    job.setNumReduceTasks(num_reduces);
+
+    job.setInputFormatClass(inputFormatClass);
+    job.setOutputFormatClass(outputFormatClass);
+
+    job.setOutputKeyClass(outputKeyClass);
+    job.setOutputValueClass(outputValueClass);
 
     // Make sure there are exactly 2 parameters left.
     if (otherArgs.size() != 2) {
@@ -148,37 +151,37 @@
           otherArgs.size() + " instead of 2.");
       return printUsage();
     }
-    FileInputFormat.setInputPaths(jobConf, otherArgs.get(0));
-    FileOutputFormat.setOutputPath(jobConf, new Path(otherArgs.get(1)));
-
+    FileInputFormat.setInputPaths(job, otherArgs.get(0));
+    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
+    
     if (sampler != null) {
       System.out.println("Sampling input to effect total-order sort...");
-      jobConf.setPartitionerClass(TotalOrderPartitioner.class);
-      Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];
-      inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));
+      job.setPartitionerClass(TotalOrderPartitioner.class);
+      Path inputDir = FileInputFormat.getInputPaths(job)[0];
+      inputDir = inputDir.makeQualified(inputDir.getFileSystem(conf));
       Path partitionFile = new Path(inputDir, "_sortPartitioning");
-      TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
-      InputSampler.<K,V>writePartitionFile(jobConf, sampler);
+      TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
+      InputSampler.<K,V>writePartitionFile(job, sampler);
       URI partitionUri = new URI(partitionFile.toString() +
                                  "#" + "_sortPartitioning");
-      DistributedCache.addCacheFile(partitionUri, jobConf);
-      DistributedCache.createSymlink(jobConf);
+      DistributedCache.addCacheFile(partitionUri, conf);
+      DistributedCache.createSymlink(conf);
     }
 
     System.out.println("Running on " +
         cluster.getTaskTrackers() +
         " nodes to sort from " + 
-        FileInputFormat.getInputPaths(jobConf)[0] + " into " +
-        FileOutputFormat.getOutputPath(jobConf) +
+        FileInputFormat.getInputPaths(job)[0] + " into " +
+        FileOutputFormat.getOutputPath(job) +
         " with " + num_reduces + " reduces.");
     Date startTime = new Date();
     System.out.println("Job started: " + startTime);
-    jobResult = JobClient.runJob(jobConf);
+    int ret = job.waitForCompletion(true) ? 0 : 1;
     Date end_time = new Date();
     System.out.println("Job ended: " + end_time);
     System.out.println("The job took " + 
         (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
-    return 0;
+    return ret;
   }
 
 
@@ -192,7 +195,7 @@
    * Get the last job that was run using this instance.
    * @return the results of the last job that was run
    */
-  public RunningJob getResult() {
-    return jobResult;
+  public Job getResult() {
+    return job;
   }
 }

Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java?rev=780621&r1=780620&r2=780621&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java Mon Jun
 1 12:01:28 2009
@@ -210,9 +210,9 @@
           try {
             URI inputURI = new URI(job.get("map.input.file"));
             String inputFile = inputURI.getPath();
-            partition = Integer.valueOf(
-                                        inputFile.substring(inputFile.lastIndexOf("part")+5)
-                                        ).intValue();
+            // part file is of the form part-r-xxxxx
+            partition = Integer.valueOf(inputFile.substring(
+              inputFile.lastIndexOf("part") + 7)).intValue();
             noSortReducers = job.getInt("sortvalidate.sort.reduce.tasks", -1);
           } catch (Exception e) {
             System.err.println("Caught: " + e);

Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=780621&r1=780620&r2=780621&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Mon
Jun  1 12:01:28 2009
@@ -96,7 +96,7 @@
     // Run Sort
     Sort sort = new Sort();
     assertEquals(ToolRunner.run(job, sort, sortArgs), 0);
-    Counters counters = sort.getResult().getCounters();
+    org.apache.hadoop.mapreduce.Counters counters = sort.getResult().getCounters();
     long mapInput = counters.findCounter(
       org.apache.hadoop.mapreduce.lib.input.FileInputFormat.COUNTER_GROUP,
       org.apache.hadoop.mapreduce.lib.input.FileInputFormat.BYTES_READ).
@@ -106,7 +106,7 @@
     // the hdfs read should be between 100% and 110% of the map input bytes
     assertTrue("map input = " + mapInput + ", hdfs read = " + hdfsRead,
                (hdfsRead < (mapInput * 1.1)) &&
-               (hdfsRead > mapInput));  
+               (hdfsRead >= mapInput));  
   }
   
   private static void runSortValidator(JobConf job, 



Mime
View raw message