hadoop-mapreduce-commits mailing list archives

From d...@apache.org
Subject svn commit: r798846 - in /hadoop/mapreduce/trunk: ./ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/testjar/
Date Wed, 29 Jul 2009 10:33:41 GMT
Author: ddas
Date: Wed Jul 29 10:33:39 2009
New Revision: 798846

URL: http://svn.apache.org/viewvc?rev=798846&view=rev
Log:
MAPREDUCE-793. Creates a new test that consolidates a few tests to include in the commit-test
list. Contributed by Jothi Padmanabhan.
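
For reference, the new TestSeveral class wires the consolidated tests
through a JUnit 3 static suite() method (see the added file below), so the
whole suite can be run standalone with the stock text runner. A minimal
sketch, assuming only the class added in this commit and plain JUnit 3:

    // Runs TestSeveral once; the TestSetup wrapper in suite() brings up a
    // shared MiniDFSCluster/MiniMRCluster before the first test and tears
    // it down after the last one.
    junit.textui.TestRunner.run(org.apache.hadoop.mapred.TestSeveral.suite());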

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
    hadoop/mapreduce/trunk/src/test/mapred/testjar/ExternalIdentityReducer.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=798846&r1=798845&r2=798846&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul 29 10:33:39 2009
@@ -153,6 +153,9 @@
     MAPREDUCE-628. Improves the execution time of TestJobInProgress.
     (Jothi Padmanabhan via ddas)
 
+    MAPREDUCE-793. Creates a new test that consolidates a few tests to
+    include in the commit-test list. (Jothi Padmanabhan via ddas)
+
   BUG FIXES
     MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
     (Aaron Kimball via matei)

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java?rev=798846&r1=798845&r2=798846&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobClient.java Wed Jul 29 10:33:39 2009
@@ -71,7 +71,7 @@
     return JobClient.runJob(conf).getID().toString();
   }
   
-  private int runTool(Configuration conf, Tool tool, String[] args, OutputStream out) throws Exception {
+  static int runTool(Configuration conf, Tool tool, String[] args, OutputStream out) throws Exception {
     PrintStream oldOut = System.out;
     PrintStream newOut = new PrintStream(out, true);
     try {
@@ -95,14 +95,14 @@
 
   public void testJobList() throws Exception {
     String jobId = runJob();
-    verifyJobPriority(jobId, "HIGH");
+    verifyJobPriority(jobId, "HIGH", createJobConf());
   }
 
-  private void verifyJobPriority(String jobId, String priority)
+  static void verifyJobPriority(String jobId, String priority, JobConf conf)
                             throws Exception {
     PipedInputStream pis = new PipedInputStream();
     PipedOutputStream pos = new PipedOutputStream(pis);
-    int exitCode = runTool(createJobConf(), new JobClient(),
+    int exitCode = runTool(conf, new JobClient(),
         new String[] { "-list", "all" },
         pos);
     assertEquals("Exit code", 0, exitCode);
@@ -125,6 +125,6 @@
         new String[] { "-set-priority", jobId, "VERY_LOW" },
         new ByteArrayOutputStream());
     assertEquals("Exit code", 0, exitCode);
-    verifyJobPriority(jobId, "VERY_LOW");
+    verifyJobPriority(jobId, "VERY_LOW", createJobConf());
   }
 }
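
The runTool change above widens visibility from private to a static
package-level helper so TestSeveral (added below) can reuse it. The
underlying idea is to swap System.out for a stream the test owns while the
Tool runs, then restore it. A minimal sketch of that pattern, assuming conf
is a JobConf from the mini cluster (not a verbatim copy of the test code):

    // Capture a Hadoop Tool's stdout: redirect System.out, run, restore.
    ByteArrayOutputStream captured = new ByteArrayOutputStream();
    PrintStream oldOut = System.out;
    System.setOut(new PrintStream(captured, true));
    try {
      int exit = ToolRunner.run(conf, new JobClient(),
          new String[] { "-list", "all" });
      assertEquals("Exit code", 0, exit);
    } finally {
      System.setOut(oldOut);   // always restore the real stdout
    }
    String listing = captured.toString();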

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java?rev=798846&r1=798845&r2=798846&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobDirCleanup.java Wed Jul 29 10:33:39 2009
@@ -37,7 +37,7 @@
   //end of the job (indirectly testing whether all tasktrackers
   //got a KillJobAction).
   private static final Log LOG =
-        LogFactory.getLog(TestEmptyJob.class.getName());
+    LogFactory.getLog(TestEmptyJob.class.getName());
   private void runSleepJob(JobConf conf) throws Exception {
     String[] args = { "-m", "1", "-r", "10", "-mt", "1000", "-rt", "10000" };
     ToolRunner.run(conf, new SleepJob(), args);
@@ -49,7 +49,6 @@
     FileSystem fileSys = null;
     try {
       final int taskTrackers = 10;
-      final int jobTrackerPort = 60050;
       Configuration conf = new Configuration();
       JobConf mrConf = new JobConf();
       mrConf.set("mapred.tasktracker.reduce.tasks.maximum", "1");
@@ -61,26 +60,38 @@
       final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
       JobConf jobConf = mr.createJobConf();
       runSleepJob(jobConf);
-      for(int i=0; i < taskTrackers; ++i) {
-        String jobDirStr = mr.getTaskTrackerLocalDir(i)+
-                           "/taskTracker/jobcache";
-        File jobDir = new File(jobDirStr);
-        String[] contents = jobDir.list();
-        while (contents.length > 0) {
-          try {
-            Thread.sleep(1000);
-            LOG.warn(jobDir +" not empty yet");
-            contents = jobDir.list();
-          } catch (InterruptedException ie){}
-        }
-      }
+      verifyJobDirCleanup(mr, taskTrackers);
     } catch (Exception ee){
+      assertTrue(false);
     } finally {
       if (fileSys != null) { fileSys.close(); }
       if (dfs != null) { dfs.shutdown(); }
       if (mr != null) { mr.shutdown(); }
     }
   }
+
+  static void verifyJobDirCleanup(MiniMRCluster mr, int numTT)
+  throws IOException {
+    for(int i=0; i < numTT; ++i) {
+      String jobDirStr = mr.getTaskTrackerLocalDir(i)+
+      "/taskTracker/jobcache";
+      File jobDir = new File(jobDirStr);
+      String[] contents = jobDir.list();
+      if (contents == null || contents.length == 0) {
+        return;
+      }
+      while (contents.length > 0) {
+        try {
+          Thread.sleep(1000);
+          LOG.warn(jobDir + " not empty yet, contents are:");
+          for (String s: contents) {
+            LOG.info(s);
+          }
+          contents = jobDir.list();
+        } catch (InterruptedException ie){}
+      }
+    }
+  }
 }
 
 

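The extracted verifyJobDirCleanup polls each tasktracker's jobcache
directory until it empties, with no upper bound on the wait. If a hang is a
concern, a bounded variant is straightforward; the helper below is an
illustrative sketch (name and timeout are assumptions, not part of this
commit):

    // Hypothetical bounded wait: fail instead of spinning forever if the
    // job directory is still non-empty after timeoutMs.
    static void waitForEmptyDir(File jobDir, long timeoutMs)
        throws InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMs;
      String[] contents = jobDir.list();
      while (contents != null && contents.length > 0) {
        if (System.currentTimeMillis() > deadline) {
          fail(jobDir + " not cleaned up within " + timeoutMs + " ms");
        }
        Thread.sleep(1000);
        contents = jobDir.list();
      }
    }
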
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=798846&r1=798845&r2=798846&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Wed Jul 29 10:33:39 2009
@@ -905,7 +905,7 @@
    * @param id job id
    * @param conf job conf
    */
-  private static void validateJobHistoryUserLogLocation(JobID id, JobConf conf) 
+  static void validateJobHistoryUserLogLocation(JobID id, JobConf conf) 
           throws IOException  {
     // Get the history file name
     Path doneDir = JobHistory.getCompletedJobHistoryLocation();

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=798846&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java Wed Jul 29 10:33:39 2009
@@ -0,0 +1,411 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.TestJobInProgressListener.MyListener;
+import org.apache.hadoop.mapred.UtilsForTests.FailMapper;
+import org.apache.hadoop.mapred.UtilsForTests.KillMapper;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+
+/** 
+ * This test case exercises several miscellaneous pieces of functionality.
+ * It is intended as a fast test and encompasses the following:
+ * TestJobName
+ * TestJobClient
+ * TestJobDirCleanup
+ * TestJobKillAndFail
+ * TestUserDefinedCounters
+ * TestJobInProgressListener
+ * TestJobHistory
+ * TestMiniMRClassPath
+ * TestMiniMRWithDFSWithDistinctUsers
+ */
+
+@SuppressWarnings("deprecation")
+public class TestSeveral extends TestCase {
+
+  static final UnixUserGroupInformation DFS_UGI = 
+    TestMiniMRWithDFSWithDistinctUsers.createUGI("dfs", true); 
+  static final UnixUserGroupInformation TEST1_UGI = 
+    TestMiniMRWithDFSWithDistinctUsers.createUGI("pi", false); 
+  static final UnixUserGroupInformation TEST2_UGI = 
+    TestMiniMRWithDFSWithDistinctUsers.createUGI("wc", false);
+
+  private static MiniMRCluster mrCluster = null;
+  private static MiniDFSCluster dfs = null;
+  private static FileSystem fs = null;
+  private static MyListener myListener = null;
+
+  private int numReduces = 5;
+  private static final int numTT = 5;
+
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestSeveral.class)) {
+      protected void setUp() throws Exception {
+
+        Configuration conf = new Configuration();
+        conf.setInt("dfs.replication", 1);
+        UnixUserGroupInformation.saveToConf(conf,
+            UnixUserGroupInformation.UGI_PROPERTY_NAME, DFS_UGI);
+        dfs = new MiniDFSCluster(conf, numTT, true, null);
+        fs = dfs.getFileSystem();
+
+        TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/user");
+        TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/mapred");
+
+        UnixUserGroupInformation MR_UGI = 
+          TestMiniMRWithDFSWithDistinctUsers.createUGI(
+              UnixUserGroupInformation.login().getUserName(), false); 
+
+        // Create a TestJobInProgressListener.MyListener and associate
+        // it with the MiniMRCluster
+
+        myListener = new MyListener();
+        conf.set("mapred.job.tracker.handler.count", "1");
+        mrCluster =   new MiniMRCluster(0, 0,
+            numTT, dfs.getFileSystem().getUri().toString(), 
+            1, null, null, MR_UGI, new JobConf());
+        mrCluster.getJobTrackerRunner().getJobTracker()
+        .addJobInProgressListener(myListener);
+      }
+      
+      protected void tearDown() throws Exception {
+        if (fs != null) { fs.close(); }
+        if (dfs != null) { dfs.shutdown(); }
+        if (mrCluster != null) { mrCluster.shutdown(); }
+      }
+    };
+    return setup;
+  }
+
+  /** 
+   * Utility method to create input for the jobs
+   * @param inDir
+   * @param conf
+   * @throws IOException
+   */
+  private void makeInput(Path inDir, JobConf conf) throws IOException {
+    FileSystem inFs = inDir.getFileSystem(conf);
+
+    if (inFs.exists(inDir)) {
+      inFs.delete(inDir, true);
+    }
+    inFs.mkdirs(inDir);
+    Path inFile = new Path(inDir, "part-0");
+    DataOutputStream file = inFs.create(inFile);
+    for (int i = 0; i < numReduces; i++) {
+      file.writeBytes("b a\n");
+    }
+    file.close();
+  }
+
+  /**
+   * Clean the Output directories before running a Job
+   * @param fs
+   * @param outDir
+   */
+  private void clean(FileSystem fs, Path outDir) {
+    try {
+      fs.delete(outDir, true);
+    } catch (Exception e) {}
+  }
+
+  private void verifyOutput(FileSystem fs, Path outDir) throws IOException {
+    Path[] outputFiles = FileUtil.stat2Paths(
+        fs.listStatus(outDir, new OutputLogFilter()));
+    assertEquals(numReduces, outputFiles.length);
+    InputStream is = fs.open(outputFiles[0]);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+    String s = reader.readLine().split("\t")[1];
+    assertEquals("b a",s);
+    assertNull(reader.readLine());
+    reader.close();
+  }
+
+
+  @SuppressWarnings("unchecked")
+  static class DoNothingReducer extends MapReduceBase implements 
+  Reducer<WritableComparable, Writable, WritableComparable, Writable> {
+    public void reduce(WritableComparable key, Iterator<Writable> val, 
+        OutputCollector<WritableComparable, Writable> output,
+        Reporter reporter)
+    throws IOException { // Do nothing
+    }
+  }
+
+  /**
+   * Submit a job with a complex name (TestJobName.testComplexName)
+   * Check the status of the job as successful (TestJobKillAndFail)
+   * Check that the task tracker directory is cleaned up (TestJobDirCleanup)
+   * Create some user defined counters and check them (TestUserDefinedCounters)
+   * Job uses a reducer from an External Jar (TestMiniMRClassPath)
+   * Check task directories (TestMiniMRWithDFS)
+   * Check if the listener notifications are received (TestJobInProgressListener)
+   * Verify if priority changes to the job are reflected (TestJobClient)
+   * Validate JobHistory file format, content, userlog location (TestJobHistory)
+   * 
+   * @throws Exception
+   */
+  public void testSuccessfulJob() throws Exception {
+    JobConf conf = mrCluster.createJobConf();
+
+    // Set a complex Job name (TestJobName)
+    conf.setJobName("[name][some other value that gets" +
+    " truncated internally that this test attempts to aggravate]");
+    conf.setInputFormat(TextInputFormat.class);
+    conf.setOutputFormat(TextOutputFormat.class);
+
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setCompressMapOutput(true);
+
+    // Set the Mapper class to a Counting Mapper that defines user
+    // defined counters
+    conf.setMapperClass(TestUserDefinedCounters.CountingMapper.class);
+
+    conf.set("mapred.reducer.class", "testjar.ExternalIdentityReducer");
+
+    conf.setLong("mapred.min.split.size", 1024*1024);
+
+    conf.setNumReduceTasks(numReduces);
+    conf.setJobPriority(JobPriority.HIGH);
+    conf.setJar("build/test/mapred/testjar/testjob.jar");
+
+    String pattern = 
+      TaskAttemptID.getTaskAttemptIDsPattern(null, null, TaskType.MAP, 1, null);
+    conf.setKeepTaskFilesPattern(pattern);
+
+    UnixUserGroupInformation.saveToConf(conf,
+        UnixUserGroupInformation.UGI_PROPERTY_NAME, TEST1_UGI);
+
+    final Path inDir = new Path("./test/input");
+    final Path outDir = new Path("./test/output");
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+
+    clean(fs, outDir);
+    makeInput(inDir, conf);
+    JobClient jobClient = new JobClient(conf);
+    RunningJob job = jobClient.submitJob(conf);
+    JobID jobId = job.getID();
+
+    while (job.getJobState() != JobStatus.RUNNING) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+
+    // Check for JobInProgress Listener notification
+    assertFalse("Missing event notification for a running job", 
+        myListener.contains(jobId, true));
+
+    job.waitForCompletion();
+
+    assertTrue(job.isComplete());
+    assertEquals(JobStatus.SUCCEEDED,job.getJobState());
+
+    // check if the job success was notified
+    assertFalse("Missing event notification for a successful job", 
+        myListener.contains(jobId, false));
+
+    // Check Task directories
+    TaskAttemptID taskid = new TaskAttemptID(
+        new TaskID(jobId, TaskType.MAP, 1),0);
+    TestMiniMRWithDFS.checkTaskDirectories(
+        mrCluster, new String[]{jobId.toString()}, 
+        new String[]{taskid.toString()});
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    int exitCode = TestJobClient.runTool(conf, new JobClient(),
+        new String[] { "-counter", jobId.toString(),
+      "org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS" },
+      out);
+    assertEquals(0, exitCode);
+    assertEquals(numReduces, Integer.parseInt(out.toString().trim()));
+
+    // Verify if user defined counters have been updated properly
+    TestUserDefinedCounters.verifyCounters(job, numTT);
+
+    // Verify job priority change (TestJobClient)
+    TestJobClient.verifyJobPriority(jobId.toString(), "HIGH", conf);
+    
+    // Basic check if the job did run fine
+    verifyOutput(outDir.getFileSystem(conf), outDir);
+
+    //TestJobHistory
+    TestJobHistory.validateJobHistoryFileFormat(jobId, conf, "SUCCESS", false);
+    TestJobHistory.validateJobHistoryFileContent(mrCluster, job, conf);
+    TestJobHistory.validateJobHistoryUserLogLocation(job.getID(), conf);
+
+    // Since we keep setKeepTaskFilesPattern, these files should still be
+    // present and will not be cleaned up.
+    for(int i=0; i < numTT; ++i) {
+      String jobDirStr = mrCluster.getTaskTrackerLocalDir(i)+
+      "/taskTracker/jobcache";
+      boolean b = FileSystem.getLocal(conf).delete(new Path(jobDirStr), true);
+      assertTrue(b);
+    }
+  }
+
+  /**
+   * Submit a job with a backslashed name (TestJobName) that will fail
+   * Set the JobHistory user location to none (TestJobHistory)
+   * Verify directory cleanup for the failed job (TestJobDirCleanup)
+   * Verify an event is generated for the failed job (TestJobInProgressListener)
+   * 
+   * @throws Exception
+   */
+  public void testFailedJob() throws Exception {
+    JobConf conf = mrCluster.createJobConf();
+
+    // Name with regex
+    conf.setJobName("name \\Evalue]");
+
+    conf.setInputFormat(TextInputFormat.class);
+
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+    conf.setMapperClass(FailMapper.class);
+    conf.setOutputFormat(NullOutputFormat.class);
+    conf.setJobPriority(JobPriority.HIGH);
+
+    conf.setLong("mapred.map.max.attempts", 1);
+
+    conf.set("hadoop.job.history.user.location", "none");
+
+    conf.setNumReduceTasks(0);
+
+    final Path inDir = new Path("./wc/input");
+    final Path outDir = new Path("./wc/output");
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+
+    clean(fs, outDir);
+    makeInput(inDir, conf);
+
+    JobClient jobClient = new JobClient(conf);
+    RunningJob job = jobClient.submitJob(conf);
+    JobID jobId = job.getID();
+    job.waitForCompletion();
+
+    assertTrue(job.isComplete());
+    assertEquals(JobStatus.FAILED, job.getJobState());
+
+    // check if the job failure was notified
+    assertFalse("Missing event notification on failing a running job", 
+        myListener.contains(jobId));
+
+    TestJobDirCleanup.verifyJobDirCleanup(mrCluster, numTT);
+  }
+
+  /**
+   * Submit a job with a regex name that will get killed (TestJobName)
+   * Verify job directory cleanup (TestJobDirCleanup)
+   * Verify an event is generated for the killed job (TestJobInProgressListener)
+   * 
+   * @throws Exception
+   */
+  public void testKilledJob() throws Exception {
+    JobConf conf = mrCluster.createJobConf();
+
+    // Name with regex
+    conf.setJobName("name * abc + Evalue]");
+
+    conf.setInputFormat(TextInputFormat.class);
+
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+    conf.setMapperClass(KillMapper.class);
+    conf.setOutputFormat(NullOutputFormat.class);
+    conf.setNumReduceTasks(0);
+
+    conf.setLong("mapred.map.max.attempts", 2);
+
+    final Path inDir = new Path("./wc/input");
+    final Path outDir = new Path("./wc/output");
+    final Path histDir = new Path("./wc/history");
+
+    conf.set("hadoop.job.history.user.location", histDir.toString());
+
+    conf.setNumReduceTasks(numReduces);
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+
+    clean(fs, outDir);
+    makeInput(inDir, conf);
+
+    JobClient jobClient = new JobClient(conf);
+    RunningJob job = jobClient.submitJob(conf);
+
+    while (job.getJobState() != JobStatus.RUNNING) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+    job.killJob();
+
+    job.waitForCompletion();
+
+    assertTrue(job.isComplete());
+    assertEquals(JobStatus.KILLED, job.getJobState());
+
+    // check if the job kill was notified
+    assertFalse("Missing event notification on killing a running job", 
+        myListener.contains(job.getID()));
+
+    TestJobDirCleanup.verifyJobDirCleanup(mrCluster, numTT);
+  }
+
+}
+
+
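
TestSeveral waits for a submitted job to reach RUNNING (before checking
listener notifications, and before killing it in testKilledJob) with a bare
sleep loop on RunningJob.getJobState(). A deadline-based form of that wait,
sketched here purely as an illustration (helper name and timeout are
assumptions, not part of this commit):

    // Hypothetical helper: poll until the job leaves PREP, up to a deadline.
    static void waitUntilRunning(RunningJob job, long timeoutMs)
        throws IOException, InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (job.getJobState() != JobStatus.RUNNING) {
        if (System.currentTimeMillis() > deadline) {
          fail("Job " + job.getID() + " did not start within "
              + timeoutMs + " ms");
        }
        Thread.sleep(100);
      }
    }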

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java?rev=798846&r1=798845&r2=798846&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestUserDefinedCounters.java Wed Jul 29 10:33:39 2009
@@ -115,10 +115,14 @@
       reader.close();
       assertEquals(4, counter);
     }
-    
-    assertEquals(4,
+    verifyCounters(runningJob, 4);
+  }
+  
+  public static void verifyCounters(RunningJob runningJob, int expected)
+  throws IOException {
+    assertEquals(expected,
         runningJob.getCounters().getCounter(EnumCounter.MAP_RECORDS));
-    assertEquals(4,
+    assertEquals(expected,
         runningJob.getCounters().getGroup("StringCounter")
         .getCounter("MapRecords"));
   }
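
The extracted verifyCounters checks both flavours of user-defined counters
in the old mapred API: an enum-based counter and a dynamically named
group/name counter. For context, this is roughly how mapper code such as
TestUserDefinedCounters.CountingMapper bumps them (a sketch, assuming the
EnumCounter enum referenced above):

    // Inside map(), with the old org.apache.hadoop.mapred API:
    // the enum-based counter read back as EnumCounter.MAP_RECORDS ...
    reporter.incrCounter(EnumCounter.MAP_RECORDS, 1);
    // ... and the dynamically named counter in group "StringCounter".
    reporter.incrCounter("StringCounter", "MapRecords", 1);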

Added: hadoop/mapreduce/trunk/src/test/mapred/testjar/ExternalIdentityReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/testjar/ExternalIdentityReducer.java?rev=798846&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/testjar/ExternalIdentityReducer.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/testjar/ExternalIdentityReducer.java Wed Jul 29 10:33:39 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package testjar;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+public class ExternalIdentityReducer implements 
+             Reducer<WritableComparable, Writable,
+                     WritableComparable, Writable> {
+
+  public void configure(JobConf job) {
+
+  }
+
+  public void close()
+    throws IOException {
+  }
+
+  public void reduce(WritableComparable key, Iterator<Writable> values,
+                     OutputCollector<WritableComparable, Writable> output,
+                     Reporter reporter)
+    throws IOException {
+    
+    while (values.hasNext()) {
+      output.collect(key, values.next());
+    }
+  }
+}


