hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r1077059 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/MapTask.java test/org/apache/hadoop/mapred/TestJobCounters.java test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java
Date Fri, 04 Mar 2011 03:36:32 GMT
Author: omalley
Date: Fri Mar  4 03:36:32 2011
New Revision: 1077059

URL: http://svn.apache.org/viewvc?rev=1077059&view=rev
Log:
commit fd047ec451465d0fd8703c5902011de1f0aae8d8
Author: Chris Douglas <cdouglas@apache.org>
Date:   Wed Nov 25 14:36:56 2009 -0800

    MAPREDUCE-1147 from https://issues.apache.org/jira/secure/attachment/12424714/mapred-1147-v1.4-y20.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1147. Add map output counters to new API. (Amar Kamat via
    +    cdouglas)
    +
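
A minimal usage sketch for context (not part of the patch): once a new-API job
completes, the counter this change wires up can be read the same way the test
below reads it. The snippet assumes a completed org.apache.hadoop.mapreduce.Job
named job and the test's static import of Task.Counter.MAP_OUTPUT_RECORDS:

    // Hypothetical sketch: read the map output record count via the new API.
    org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
    long mapOutputRecords =
        counters.findCounter(MAP_OUTPUT_RECORDS).getValue();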

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCounters.java
Removed:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSpilledRecordsCounter.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1077059&r1=1077058&r2=1077059&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
Fri Mar  4 03:36:32 2011
@@ -472,6 +472,43 @@ class MapTask extends Task {
     }
   }
 
+  private class NewDirectOutputCollector<K,V>
+  extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
+    private final org.apache.hadoop.mapreduce.RecordWriter out;
+
+    private final TaskReporter reporter;
+
+    private final Counters.Counter mapOutputRecordCounter;
+    
+    @SuppressWarnings("unchecked")
+    NewDirectOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
+        JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) 
+    throws IOException, ClassNotFoundException, InterruptedException {
+      this.reporter = reporter;
+      out = outputFormat.getRecordWriter(taskContext);
+      mapOutputRecordCounter = 
+        reporter.getCounter(MAP_OUTPUT_RECORDS);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void write(K key, V value) 
+    throws IOException, InterruptedException {
+      reporter.progress();
+      out.write(key, value);
+      mapOutputRecordCounter.increment(1);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) 
+    throws IOException, InterruptedException {
+      reporter.progress();
+      if (out != null) {
+        out.close(context);
+      }
+    }
+  }
+  
   private class NewOutputCollector<K,V>
     extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
     private final MapOutputCollector<K,V> collector;
@@ -570,7 +607,8 @@ class MapTask extends Task {
 
       // get an output object
       if (job.getNumReduceTasks() == 0) {
-        output = outputFormat.getRecordWriter(taskContext);
+        output =
+          new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
       } else {
         output = new NewOutputCollector(taskContext, job, umbilical, reporter);
       }
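
With this change a map-only job's records flow through NewDirectOutputCollector
rather than a bare RecordWriter, so MAP_OUTPUT_RECORDS is now updated under the
new API as well. A rough trace of the resulting path (names taken from the diff
above; the Mapper.Context plumbing in between is summarized):

    // Hypothetical trace of one record in a map-only (numReduceTasks == 0) job:
    //   Mapper.map() calls context.write(key, value)
    //     -> NewDirectOutputCollector.write(key, value)
    //          reporter.progress();                  // report task liveness
    //          out.write(key, value);                // delegate to the real RecordWriter
    //          mapOutputRecordCounter.increment(1);  // bump MAP_OUTPUT_RECORDS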

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCounters.java?rev=1077059&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCounters.java
(added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobCounters.java
Fri Mar  4 03:36:32 2011
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.Writer;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import junit.framework.TestCase;
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import static org.apache.hadoop.mapred.Task.Counter.SPILLED_RECORDS;
+import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
+import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * This is a wordcount application that tests job counters.
+ * It generates simple text input files, then runs the wordcount map/reduce
+ * application on (1) 3 input files (with 3 maps and 1 reduce) and verifies
+ * the counters, and (2) 4 input files (with 4 maps and 1 reduce) and
+ * verifies the counters. The wordcount application reads the text input
+ * files, breaks each line into words, and counts them. The output is a
+ * locally sorted list of words with a count of how often each occurred.
+ *
+ */
+public class TestJobCounters extends TestCase {
+
+  String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+                          File.separator + "tmp")).toString().replace(' ', '+');
+ 
+  private void validateMapredCounters(Counters counter, long spillRecCnt, 
+                                long mapInputRecords, long mapOutputRecords) {
+    // Check that the spilled, map input and map output record counts
+    // match the expected values
+    assertEquals(spillRecCnt,
+      counter.findCounter(SPILLED_RECORDS).getCounter());
+    assertEquals(mapInputRecords,
+      counter.findCounter(MAP_INPUT_RECORDS).getCounter());
+    assertEquals(mapOutputRecords, 
+      counter.findCounter(MAP_OUTPUT_RECORDS).getCounter());
+  }
+
+  private void validateCounters(org.apache.hadoop.mapreduce.Counters counter, 
+                                long spillRecCnt, 
+                                long mapInputRecords, long mapOutputRecords) {
+    // Check that the spilled, map input and map output record counts
+    // match the expected values
+    assertEquals(spillRecCnt,
+      counter.findCounter(SPILLED_RECORDS).getValue());
+    assertEquals(mapInputRecords,
+      counter.findCounter(MAP_INPUT_RECORDS).getValue());
+    assertEquals(mapOutputRecords, 
+      counter.findCounter(MAP_OUTPUT_RECORDS).getValue());
+  }
+  
+  private void createWordsFile(File inpFile) throws Exception {
+    Writer out = new BufferedWriter(new FileWriter(inpFile));
+    try {
+      // 500 lines * 4 words = 2K unique words, repeated 5 times => 10K words
+      final int REPLICAS = 5, NUMLINES = 500, NUMWORDSPERLINE = 4;
+
+      for (int i = 0; i < REPLICAS; i++) {
+        for (int j = 1; j <= NUMLINES*NUMWORDSPERLINE; j+=NUMWORDSPERLINE) {
+          out.write("word" + j + " word" + (j+1) + " word" + (j+2) 
+                    + " word" + (j+3) + '\n');
+        }
+      }
+    } finally {
+      out.close();
+    }
+  }
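+
+  // Note on the generated data: each file holds REPLICAS * NUMLINES
+  // = 5 * 500 = 2,500 lines and 2,500 * 4 = 10,000 words (2,000 distinct
+  // words, each repeated 5 times); the counter expectations in the tests
+  // below are derived from these totals.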
+
+
+  /**
+   * Runs the word count map/reduce job with the old (mapred) API and
+   * verifies its counters.
+   * @throws IOException When there are communication problems with the
+   *                     job tracker.
+   */
+  public void testOldJobWithMapAndReducers() throws Exception {
+    JobConf conf = new JobConf(TestJobCounters.class);
+    conf.setJobName("wordcount-map-reducers");
+
+    // the keys are words (strings)
+    conf.setOutputKeyClass(Text.class);
+    // the values are counts (ints)
+    conf.setOutputValueClass(IntWritable.class);
+
+    conf.setMapperClass(WordCount.MapClass.class);
+    conf.setCombinerClass(WordCount.Reduce.class);
+    conf.setReducerClass(WordCount.Reduce.class);
+
+    conf.setNumMapTasks(3);
+    conf.setNumReduceTasks(1);
+    conf.setInt("io.sort.mb", 1);
+    conf.setInt("io.sort.factor", 2);
+    conf.set("io.sort.record.percent", "0.05");
+    conf.set("io.sort.spill.percent", "0.80");
+
+    FileSystem fs = FileSystem.get(conf);
+    Path testDir = new Path(TEST_ROOT_DIR, "countertest");
+    conf.set("test.build.data", testDir.toString());
+    try {
+      if (fs.exists(testDir)) {
+        fs.delete(testDir, true);
+      }
+      if (!fs.mkdirs(testDir)) {
+        throw new IOException("Mkdirs failed to create " + testDir.toString());
+      }
+
+      String inDir = testDir +  File.separator + "genins" + File.separator;
+      String outDir = testDir + File.separator;
+      Path wordsIns = new Path(inDir);
+      if (!fs.mkdirs(wordsIns)) {
+        throw new IOException("Mkdirs failed to create " + wordsIns.toString());
+      }
+
+      //create 3 input files each with 5*2k words
+      File inpFile = new File(inDir + "input5_2k_1");
+      createWordsFile(inpFile);
+      inpFile = new File(inDir + "input5_2k_2");
+      createWordsFile(inpFile);
+      inpFile = new File(inDir + "input5_2k_3");
+      createWordsFile(inpFile);
+
+      FileInputFormat.setInputPaths(conf, inDir);
+      Path outputPath1 = new Path(outDir, "output5_2k_3");
+      FileOutputFormat.setOutputPath(conf, outputPath1);
+
+      RunningJob myJob = JobClient.runJob(conf);
+      Counters c1 = myJob.getCounters();
+      // 3 maps, each doing 4 first-level spills --- 12 in total.
+      // Spilled-records count:
+      // Each map: 1st level: 2k+2k+2k+2k = 8k; 2nd level: 4k+4k = 8k;
+      //           3rd level: 2k (the 4k from the 1st level and the 4k from
+      //           the 2nd level go through combineAndSpill)
+      //           So 8k+8k+2k = 18k per map
+      // For 3 maps, the total is 3*18k = 54k
+      // Reduce: each of the 3 map outputs (2k each) is spilled in
+      //         shuffleToDisk(), so 3*2k = 6k at the 1st level;
+      //         2nd level: 4k (2k+2k); the 3rd level is fed directly to the
+      //         reduce (4k+2k, combineAndSpill => 2k, so 0 records are
+      //         spilled to disk at the 3rd level)
+      //         So the total is 6k+4k = 10k
+      // The job-level total will be 54k+10k = 64k
+
+      // 3 maps with 2.5k lines each --- 7.5k map input records in total
+      // 3 maps with 10k words each --- 30k map output records in total
+      validateMapredCounters(c1, 64000, 7500, 30000);
+
+      //create 4th input file each with 5*2k words and test with 4 maps
+      inpFile = new File(inDir + "input5_2k_4");
+      createWordsFile(inpFile);
+      conf.setNumMapTasks(4);
+      Path outputPath2 = new Path(outDir, "output5_2k_4");
+      FileOutputFormat.setOutputPath(conf, outputPath2);
+
+      myJob = JobClient.runJob(conf);
+      c1 = myJob.getCounters();
+      // 4 maps, each doing 4 first-level spills --- 16 in total.
+      // Spilled-records count:
+      // Each map: 1st level: 2k+2k+2k+2k = 8k; 2nd level: 4k+4k = 8k;
+      //           3rd level: 2k (the 4k from the 1st level and the 4k from
+      //           the 2nd level go through combineAndSpill)
+      //           So 8k+8k+2k = 18k per map
+      // For 4 maps, the total is 4*18k = 72k
+      // Reduce: each of the 4 map outputs (2k each) is spilled in
+      //         shuffleToDisk(), so 4*2k = 8k at the 1st level;
+      //         2nd level: 4k+4k = 8k; the 3rd level is fed directly to the
+      //         reduce (4k+4k, combineAndSpill => 2k, so 0 records are
+      //         spilled to disk at the 3rd level)
+      //         So the total is 8k+8k = 16k
+      // The job-level total will be 72k+16k = 88k
+
+      // 4 maps with 2.5k lines each --- 10k map input records in total
+      // 4 maps with 10k words each --- 40k map output records in total
+      validateMapredCounters(c1, 88000, 10000, 40000);
+      
+      // check for a map only job
+      conf.setNumReduceTasks(0);
+      Path outputPath3 = new Path(outDir, "output5_2k_5");
+      FileOutputFormat.setOutputPath(conf, outputPath3);
+
+      myJob = JobClient.runJob(conf);
+      c1 = myJob.getCounters();
+      // 4 maps with 2.5k lines each --- 10k map input records in total
+      // 4 maps with 10k words each --- 40k map output records in total
+      validateMapredCounters(c1, 0, 10000, 40000);
+    } finally {
+      //clean up the input and output files
+      if (fs.exists(testDir)) {
+        fs.delete(testDir, true);
+      }
+    }
+  }
+  
+  public static class NewMapTokenizer
+  extends Mapper<Object, Text, Text, IntWritable> {
+    private static final IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    public void map(Object key, Text value, Context context)
+    throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+  
+  public static class NewIdentityReducer
+  extends Reducer<Text, IntWritable, Text, IntWritable> {
+    private IntWritable result = new IntWritable();
+
+    public void reduce(Text key, Iterable<IntWritable> values,
+                       Context context) throws IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(key, result);
+    }
+  }
+  
+  /**
+   * Runs the word count map/reduce job with the new (mapreduce) API and
+   * verifies its counters.
+   * @throws IOException When there are communication problems with the
+   *                     job tracker.
+   */
+  public void testNewJobWithMapAndReducers() throws Exception {
+    JobConf conf = new JobConf(TestJobCounters.class);
+    conf.setInt("io.sort.mb", 1);
+    conf.setInt("io.sort.factor", 2);
+    conf.set("io.sort.record.percent", "0.05");
+    conf.set("io.sort.spill.percent", "0.80");
+
+    FileSystem fs = FileSystem.get(conf);
+    Path testDir = new Path(TEST_ROOT_DIR, "countertest2");
+    conf.set("test.build.data", testDir.toString());
+    try {
+      if (fs.exists(testDir)) {
+        fs.delete(testDir, true);
+      }
+      if (!fs.mkdirs(testDir)) {
+        throw new IOException("Mkdirs failed to create " + testDir.toString());
+      }
+
+      String inDir = testDir +  File.separator + "genins" + File.separator;
+      Path wordsIns = new Path(inDir);
+      if (!fs.mkdirs(wordsIns)) {
+        throw new IOException("Mkdirs failed to create " + wordsIns.toString());
+      }
+      String outDir = testDir + File.separator;
+
+      //create 3 input files each with 5*2k words
+      File inpFile = new File(inDir + "input5_2k_1");
+      createWordsFile(inpFile);
+      inpFile = new File(inDir + "input5_2k_2");
+      createWordsFile(inpFile);
+      inpFile = new File(inDir + "input5_2k_3");
+      createWordsFile(inpFile);
+
+      FileInputFormat.setInputPaths(conf, inDir);
+      Path outputPath1 = new Path(outDir, "output5_2k_3");
+      FileOutputFormat.setOutputPath(conf, outputPath1);
+      
+      Job job = new Job(conf);
+      job.setJobName("wordcount-map-reducers");
+
+      // the keys are words (strings)
+      job.setOutputKeyClass(Text.class);
+      // the values are counts (ints)
+      job.setOutputValueClass(IntWritable.class);
+
+      job.setMapperClass(NewMapTokenizer.class);
+      job.setCombinerClass(NewIdentityReducer.class);
+      job.setReducerClass(NewIdentityReducer.class);
+
+      job.setNumReduceTasks(1);
+
+      job.waitForCompletion(false);
+      
+      org.apache.hadoop.mapreduce.Counters c1 = job.getCounters();
+      // 3 maps, each doing 4 first-level spills --- 12 in total.
+      // Spilled-records count:
+      // Each map: 1st level: 2k+2k+2k+2k = 8k; 2nd level: 4k+4k = 8k;
+      //           3rd level: 2k (the 4k from the 1st level and the 4k from
+      //           the 2nd level go through combineAndSpill)
+      //           So 8k+8k+2k = 18k per map
+      // For 3 maps, the total is 3*18k = 54k
+      // Reduce: each of the 3 map outputs (2k each) is spilled in
+      //         shuffleToDisk(), so 3*2k = 6k at the 1st level;
+      //         2nd level: 4k (2k+2k); the 3rd level is fed directly to the
+      //         reduce (4k+2k, combineAndSpill => 2k, so 0 records are
+      //         spilled to disk at the 3rd level)
+      //         So the total is 6k+4k = 10k
+      // The job-level total will be 54k+10k = 64k
+
+      // 3 maps with 2.5k lines each --- 7.5k map input records in total
+      // 3 maps with 10k words each --- 30k map output records in total
+      validateCounters(c1, 64000, 7500, 30000);
+
+      //create 4th input file each with 5*2k words and test with 4 maps
+      inpFile = new File(inDir + "input5_2k_4");
+      createWordsFile(inpFile);
+      JobConf newJobConf = new JobConf(job.getConfiguration());
+      
+      Path outputPath2 = new Path(outDir, "output5_2k_4");
+      
+      FileOutputFormat.setOutputPath(newJobConf, outputPath2);
+
+      Job newJob = new Job(newJobConf);
+      newJob.waitForCompletion(false);
+      c1 = newJob.getCounters();
+      // 4 maps, each doing 4 first-level spills --- 16 in total.
+      // Spilled-records count:
+      // Each map: 1st level: 2k+2k+2k+2k = 8k; 2nd level: 4k+4k = 8k;
+      //           3rd level: 2k (the 4k from the 1st level and the 4k from
+      //           the 2nd level go through combineAndSpill)
+      //           So 8k+8k+2k = 18k per map
+      // For 4 maps, the total is 4*18k = 72k
+      // Reduce: each of the 4 map outputs (2k each) is spilled in
+      //         shuffleToDisk(), so 4*2k = 8k at the 1st level;
+      //         2nd level: 4k+4k = 8k; the 3rd level is fed directly to the
+      //         reduce (4k+4k, combineAndSpill => 2k, so 0 records are
+      //         spilled to disk at the 3rd level)
+      //         So the total is 8k+8k = 16k
+      // The job-level total will be 72k+16k = 88k
+
+      // 4 maps with 2.5k lines each --- 10k map input records in total
+      // 4 maps with 10k words each --- 40k map output records in total
+      validateCounters(c1, 88000, 10000, 40000);
+      
+      JobConf newJobConf2 = new JobConf(newJob.getConfiguration());
+      
+      Path outputPath3 = new Path(outDir, "output5_2k_5");
+      
+      FileOutputFormat.setOutputPath(newJobConf2, outputPath3);
+
+      Job newJob2 = new Job(newJobConf2);
+      newJob2.setNumReduceTasks(0);
+      newJob2.waitForCompletion(false);
+      c1 = newJob2.getCounters();
+      // 4 maps with 2.5k lines each --- 10k map input records in total
+      // 4 maps with 10k words each --- 40k map output records in total
+      validateCounters(c1, 0, 10000, 40000);
+    } finally {
+      //clean up the input and output files
+      if (fs.exists(testDir)) {
+        fs.delete(testDir, true);
+      }
+    }
+  }
+}
\ No newline at end of file


