hadoop-mapreduce-commits mailing list archives

From d...@apache.org
Subject svn commit: r798833 - in /hadoop/mapreduce/trunk: CHANGES.txt src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
Date Wed, 29 Jul 2009 10:07:12 GMT
Author: ddas
Date: Wed Jul 29 10:07:12 2009
New Revision: 798833

URL: http://svn.apache.org/viewvc?rev=798833&view=rev
Log:
MAPREDUCE-628. Improves the execution time of TestJobInProgress. Contributed by Jothi Padmanabhan.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
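
The speedup comes from replacing the MiniDFSCluster/MiniMRCluster-based test with the in-process fakes (FakeJobTracker, FakeJobInProgress) that drive the JobInProgress bookkeeping directly, as the diffs below show. A minimal sketch of that general pattern, using hypothetical class and method names rather than Hadoop's actual API:

    // Illustrative sketch only: hypothetical names, not Hadoop's classes.
    // The idea mirrors the patch below: instead of starting real daemons per
    // test, keep the scheduler bookkeeping in-process and drive task state
    // transitions directly, so counter assertions need no running cluster.
    import java.util.ArrayList;
    import java.util.List;

    public class FakeSchedulerSketch {

      // A tiny stand-in for a task bookkeeping structure (cf. JobInProgress).
      static class FakeJob {
        private final List<String> pendingMaps = new ArrayList<String>();
        private final List<String> runningMaps = new ArrayList<String>();

        FakeJob(int numMaps) {
          for (int i = 0; i < numMaps; i++) {
            pendingMaps.add("map_" + i);
          }
        }

        // Move one pending map to the running set, as a tracker heartbeat would.
        String scheduleNextMap() {
          if (pendingMaps.isEmpty()) {
            return null;
          }
          String task = pendingMaps.remove(0);
          runningMaps.add(task);
          return task;
        }

        int pendingMaps() { return pendingMaps.size(); }
        int runningMaps() { return runningMaps.size(); }
      }

      public static void main(String[] args) {
        FakeJob job = new FakeJob(4);
        job.scheduleNextMap();
        job.scheduleNextMap();
        // The counters stay consistent without any real cluster running.
        System.out.println("running=" + job.runningMaps()
            + " pending=" + job.pendingMaps());
      }
    }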

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=798833&r1=798832&r2=798833&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul 29 10:07:12 2009
@@ -150,6 +150,9 @@
     MAPREDUCE-373. Change org.apache.hadoop.mapred.lib.FieldSelectionMapReduce
     to use new api. (Amareshwari Sriramadasu via sharad)
 
+    MAPREDUCE-628. Improves the execution time of TestJobInProgress.
+    (Jothi Padmanabhan via ddas)
+
   BUG FIXES
     MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
     (Aaron Kimball via matei)

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=798833&r1=798832&r2=798833&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Wed Jul 29 10:07:12 2009
@@ -24,6 +24,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapreduce.TaskType;
 
@@ -66,33 +67,57 @@
 
   static class FakeJobInProgress extends JobInProgress {
     JobClient.RawSplit[] rawSplits;
+    @SuppressWarnings("deprecation")
     FakeJobInProgress(JobConf jobConf, JobTracker tracker) throws IOException {
       super(new JobID(jtIdentifier, ++jobCounter), jobConf, tracker);
-      //initObjects(tracker, numMaps, numReduces);
+      Path jobFile = new Path("Dummy");
+      this.profile = new JobProfile(jobConf.getUser(), getJobID(), 
+          jobFile.toString(), null, jobConf.getJobName(),
+          jobConf.getQueueName());
     }
 
     @Override
     public synchronized void initTasks() throws IOException {
-      maps = new TaskInProgress[numMapTasks];
+     
+      JobClient.RawSplit[] splits = createSplits();
+      numMapTasks = splits.length;
+      createMapTasks(null, splits);
+      nonRunningMapCache = createCache(splits, maxLevel);
+      createReduceTasks(null);
+      tasksInited.set(true);
+      this.status.setRunState(JobStatus.RUNNING);
+    }
+    
+    @Override
+    JobClient.RawSplit[] createSplits(){
       JobClient.RawSplit[] splits = new JobClient.RawSplit[numMapTasks];
       for (int i = 0; i < numMapTasks; i++) {
         splits[i] = new JobClient.RawSplit();
         splits[i].setLocations(new String[0]);
+      }
+      return splits;
+    }
+    
+    @Override
+    void createMapTasks(String ignored, JobClient.RawSplit[] splits) {
+      maps = new TaskInProgress[numMapTasks];
+      for (int i = 0; i < numMapTasks; i++) {
         maps[i] = new TaskInProgress(getJobID(), "test", 
             splits[i], jobtracker, getJobConf(), this, i, 1);
-        nonLocalMaps.add(maps[i]);
       }
+    }
+
+    @Override
+    void createReduceTasks(String ignored) {
       reduces = new TaskInProgress[numReduceTasks];
       for (int i = 0; i < numReduceTasks; i++) {
         reduces[i] = new TaskInProgress(getJobID(), "test", 
-                                        numMapTasks, i, 
-                                        jobtracker, getJobConf(), this, 1);
+            numMapTasks, i, 
+            jobtracker, getJobConf(), this, 1);
         nonRunningReduces.add(reduces[i]);
       }
-      tasksInited.set(true);
-      nonRunningMapCache = createCache(splits, maxLevel);
     }
-    
+
     private TaskAttemptID findTask(String trackerName, String trackerHost,
         Collection<TaskInProgress> nonRunningTasks, 
         Collection<TaskInProgress> runningTasks, TaskType taskType)

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java?rev=798833&r1=798832&r2=798833&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java Wed Jul 29 10:07:12 2009
@@ -16,263 +16,273 @@
  * limitations under the License.
  */
 
+/**
+ * TestJobInProgress is a unit test to test consistency of JobInProgress class
+ * data structures under different conditions (speculation/locality) and at
+ * different stages (tasks are running/pending/killed)
+ */
+
 package org.apache.hadoop.mapred;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.HashSet;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.RandomWriter;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.UtilsForTests;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
+import org.apache.hadoop.mapred.JobClient.RawSplit;
+import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.StaticMapping;
 
-import junit.framework.TestCase;
-
+@SuppressWarnings("deprecation")
 public class TestJobInProgress extends TestCase {
   static final Log LOG = LogFactory.getLog(TestJobInProgress.class);
 
-  private MiniMRCluster mrCluster;
+  static FakeJobTracker jobTracker;
+
+  static String trackers[] = new String[] {
+    "tracker_tracker1.r1.com:1000", 
+    "tracker_tracker2.r1.com:1000",
+    "tracker_tracker3.r2.com:1000",
+    "tracker_tracker4.r3.com:1000"
+  };
+
+  static String[] hosts = new String[] {
+    "tracker1.r1.com",
+    "tracker2.r1.com",
+    "tracker3.r2.com",
+    "tracker4.r3.com"
+  };
+
+  static String[] racks = new String[] { "/r1", "/r1", "/r2", "/r3" };
+
+  static int numUniqueHosts = hosts.length;
+  static int clusterSize = trackers.length;
+
+  public static Test suite() {
+    TestSetup setup = new TestSetup(new TestSuite(TestJobInProgress.class)) {
+      protected void setUp() throws Exception {
+        JobConf conf = new JobConf();
+        conf.set("mapred.job.tracker", "localhost:0");
+        conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+        conf.setClass("topology.node.switch.mapping.impl", 
+            StaticMapping.class, DNSToSwitchMapping.class);
+        jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers);
+        // Set up the Topology Information
+        for (int i = 0; i < hosts.length; i++) {
+          StaticMapping.addNodeToRack(hosts[i], racks[i]);
+        }
+        for (String s: trackers) {
+          FakeObjectUtilities.establishFirstContact(jobTracker, s);
+        }
+      }
+    };
+    return setup;
+  }
 
-  private MiniDFSCluster dfsCluster;
-  JobTracker jt;
-  private static Path TEST_DIR = 
-    new Path(System.getProperty("test.build.data","/tmp"), "jip-testing");
-  private static int numSlaves = 4;
+  static class MyFakeJobInProgress extends FakeJobInProgress {
 
-  public static class FailMapTaskJob extends MapReduceBase implements
-      Mapper<LongWritable, Text, Text, IntWritable> {
+    MyFakeJobInProgress(JobConf jc, JobTracker jt) throws IOException {
+      super(jc, jt);
+    }
 
     @Override
-    public void map(LongWritable key, Text value,
-        OutputCollector<Text, IntWritable> output, Reporter reporter)
-        throws IOException {
-      // reporter.incrCounter(TaskCounts.LaunchedTask, 1);
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        throw new IllegalArgumentException("Interrupted MAP task");
+    JobClient.RawSplit[] createSplits() {
+      // Set all splits to reside on one host. This will ensure that 
+      // one tracker gets data local, one gets rack local and two others
+      // get non-local maps
+      RawSplit[] splits = new RawSplit[numMapTasks];
+      String[] splitHosts0 = new String[] { hosts[0] };
+      for (int i = 0; i < numMapTasks; i++) {
+        splits[i] = new RawSplit();
+        splits[i].setDataLength(0);
+        splits[i].setLocations(splitHosts0);
       }
-      throw new IllegalArgumentException("Failing MAP task");
+      return splits;
     }
-  }
 
-  // Suppressing waring as we just need to write a failing reduce task job
-  // We don't need to bother about the actual key value pairs which are passed.
-  @SuppressWarnings("unchecked")
-  public static class FailReduceTaskJob extends MapReduceBase implements
-      Reducer {
+    private void makeRunning(TaskAttemptID taskId, TaskInProgress tip, 
+        String taskTracker) {
+      TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId, 
+          0.0f, 1, TaskStatus.State.RUNNING, "", "", taskTracker,
+          tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
+      updateTaskStatus(tip, status);
+    }
 
-    @Override
-    public void reduce(Object key, Iterator values, OutputCollector output,
-        Reporter reporter) throws IOException {
-      // reporter.incrCounter(TaskCounts.LaunchedTask, 1);
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        throw new IllegalArgumentException("Failing Reduce task");
+    private TaskInProgress getTipForTaskID(TaskAttemptID tid, boolean isMap) {
+      TaskInProgress result = null;
+      TaskID id = tid.getTaskID();
+      TaskInProgress[] arrayToLook = isMap ? maps : reduces;
+
+      for (int i = 0; i < arrayToLook.length; i++) {
+        TaskInProgress tip = arrayToLook[i];
+        if (tip.getTIPId() == id) {
+          result = tip;
+          break;
+        }
       }
-      throw new IllegalArgumentException("Failing Reduce task");
+      return result;
     }
 
-  }
-
-  @Override
-  protected void setUp() throws Exception {
-    // TODO Auto-generated method stub
-    super.setUp();
-    Configuration conf = new Configuration();
-    dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
-    mrCluster = new MiniMRCluster(numSlaves, dfsCluster.getFileSystem()
-        .getUri().toString(), 1);
-    jt = mrCluster.getJobTrackerRunner().getJobTracker();
+    /**
+     * Find a new Map or a reduce task and mark it as running on the specified
+     * tracker
+     */
+    public TaskAttemptID findAndRunNewTask(boolean isMap, 
+        String tt, String host,
+        int clusterSize,
+        int numUniqueHosts)
+    throws IOException {
+      TaskTrackerStatus tts = new TaskTrackerStatus(tt, host);
+      Task task = isMap ? 
+          obtainNewMapTask(tts, clusterSize, numUniqueHosts) : 
+            obtainNewReduceTask(tts, clusterSize, numUniqueHosts);
+          TaskAttemptID tid = task.getTaskID();
+          makeRunning(task.getTaskID(), getTipForTaskID(tid, isMap), tt);
+          return tid;
+    }
   }
 
   public void testPendingMapTaskCount() throws Exception {
-    launchTask(FailMapTaskJob.class, IdentityReducer.class);
-    checkTaskCounts();
-  }
-  
-  public void testPendingReduceTaskCount() throws Exception {
-    launchTask(IdentityMapper.class, FailReduceTaskJob.class);
-    checkTaskCounts();
+
+    int numMaps = 4;
+    int numReds = 4;
+
+    JobConf conf = new JobConf();
+    conf.setNumMapTasks(numMaps);
+    conf.setNumReduceTasks(numReds);
+    conf.setSpeculativeExecution(false);
+    conf.setBoolean(
+        "mapred.committer.job.setup.cleanup.needed", false);
+    MyFakeJobInProgress job1 = new MyFakeJobInProgress(conf, jobTracker);
+    job1.initTasks();
+
+    TaskAttemptID[] tid = new TaskAttemptID[numMaps];
+
+    for (int i = 0; i < numMaps; i++) {
+      tid[i] = job1.findAndRunNewTask(true, trackers[i], hosts[i],
+          clusterSize, numUniqueHosts);
+    }
+
+    // Fail all maps
+    for (int i = 0; i < numMaps; i++) {
+      job1.failTask(tid[i]);
+    }
+
+    MyFakeJobInProgress job2 = new MyFakeJobInProgress(conf, jobTracker);
+    job2.initTasks();
+
+    for (int i = 0; i < numMaps; i++) {
+      tid[i] = job2.findAndRunNewTask(true, trackers[i], hosts[i],
+          clusterSize, numUniqueHosts);
+      job2.finishTask(tid[i]);
+    }
+
+    for (int i = 0; i < numReds/2; i++) {
+      tid[i] = job2.findAndRunNewTask(false, trackers[i], hosts[i],
+          clusterSize, numUniqueHosts);
+    }
+
+    for (int i = 0; i < numReds/4; i++) {
+      job2.finishTask(tid[i]);
+    }
+
+    for (int i = numReds/4; i < numReds/2; i++) {
+      job2.failTask(tid[i]);
+    }
+
+    // Job1. All Maps have failed, no reduces have been scheduled
+    checkTaskCounts(job1, 0, numMaps, 0, numReds);
+
+    // Job2. All Maps have completed. One reducer has completed, one has 
+    // failed and two others have not been scheduled
+    checkTaskCounts(job2, 0, 0, 0, 3 * numReds / 4);
   }
 
   /**
    * Test if running tasks are correctly maintained for various types of jobs
    */
-  private void testRunningTaskCount(boolean speculation, boolean locality)
-  throws Exception {
-    LOG.info("Testing running jobs with speculation : " + speculation 
-             + ", locality : " + locality);
-    // cleanup
-    dfsCluster.getFileSystem().delete(TEST_DIR, true);
-    
-    final Path mapSignalFile = new Path(TEST_DIR, "map-signal");
-    final Path redSignalFile = new Path(TEST_DIR, "reduce-signal");
-    
-    // configure a waiting job with 2 maps and 2 reducers
-    JobConf job = 
-      configure(UtilsForTests.WaitingMapper.class, IdentityReducer.class, 1, 1,
-                locality);
-    job.set(UtilsForTests.getTaskSignalParameter(true), mapSignalFile.toString());
-    job.set(UtilsForTests.getTaskSignalParameter(false), redSignalFile.toString());
-    
-    // Disable slow-start for reduces since this maps don't complete 
-    // in these test-cases...
-    job.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
-    
-    // test jobs with speculation
-    job.setSpeculativeExecution(speculation);
-    JobClient jc = new JobClient(job);
-    RunningJob running = jc.submitJob(job);
-    JobTracker jobtracker = mrCluster.getJobTrackerRunner().getJobTracker();
-    JobInProgress jip = jobtracker.getJob(running.getID());
-    LOG.info("Running job " + jip.getJobID());
-    
-    // wait
-    LOG.info("Waiting for job " + jip.getJobID() + " to be ready");
-    waitTillReady(jip, job);
-    
+  static void testRunningTaskCount(boolean speculation)  throws Exception {
+    LOG.info("Testing running jobs with speculation : " + speculation); 
+
+    JobConf conf = new JobConf();
+    conf.setNumMapTasks(2);
+    conf.setNumReduceTasks(2);
+    conf.setSpeculativeExecution(speculation);
+    MyFakeJobInProgress jip = new MyFakeJobInProgress(conf, jobTracker);
+    jip.initTasks();
+
+    TaskAttemptID[] tid = new TaskAttemptID[4];
+
+    for (int i = 0; i < 2; i++) {
+      tid[i] = jip.findAndRunNewTask(true, trackers[i], hosts[i],
+          clusterSize, numUniqueHosts);
+    }
+
     // check if the running structures are populated
     Set<TaskInProgress> uniqueTasks = new HashSet<TaskInProgress>();
     for (Map.Entry<Node, Set<TaskInProgress>> s : 
-           jip.getRunningMapCache().entrySet()) {
+      jip.getRunningMapCache().entrySet()) {
       uniqueTasks.addAll(s.getValue());
     }
-    
+
     // add non local map tasks
     uniqueTasks.addAll(jip.getNonLocalRunningMaps());
-    
+
     assertEquals("Running map count doesnt match for jobs with speculation " 
-                 + speculation + ", and locality " + locality,
-                 jip.runningMaps(), uniqueTasks.size());
+        + speculation,
+        jip.runningMaps(), uniqueTasks.size());
 
-    assertEquals("Running reducer count doesnt match for jobs with speculation "
-                 + speculation + ", and locality " + locality,
-                 jip.runningReduces(), jip.getRunningReduces().size());
-    
-    // signal the tasks
-    LOG.info("Signaling the tasks");
-    UtilsForTests.signalTasks(dfsCluster, dfsCluster.getFileSystem(),
-                              mapSignalFile.toString(), 
-                              redSignalFile.toString(), numSlaves);
-    
-    // wait for the job to complete
-    LOG.info("Waiting for job " + jip.getJobID() + " to be complete");
-    UtilsForTests.waitTillDone(jc);
-    
-    // cleanup
-    dfsCluster.getFileSystem().delete(TEST_DIR, true);
-  }
-  
-  // wait for the job to start
-  private void waitTillReady(JobInProgress jip, JobConf job) {
-    // wait for all the maps to get scheduled
-    while (jip.runningMaps() < job.getNumMapTasks()) {
-      UtilsForTests.waitFor(10);
-    }
-    
-    // wait for all the reducers to get scheduled
-    while (jip.runningReduces() < job.getNumReduceTasks()) {
-      UtilsForTests.waitFor(10);
+    for (int i = 0; i < 2; i++ ) {
+      tid[i] = jip.findAndRunNewTask(false, trackers[i], hosts[i],
+          clusterSize, numUniqueHosts);
     }
+
+    assertEquals("Running reducer count doesnt match for" +
+        " jobs with speculation "
+        + speculation,
+        jip.runningReduces(), jip.getRunningReduces().size());
+
   }
-  
+
   public void testRunningTaskCount() throws Exception {
-    // test with spec = false and locality=true
-    testRunningTaskCount(false, true);
-    
-    // test with spec = true and locality=true
-    testRunningTaskCount(true, true);
-    
-    // test with spec = false and locality=false
-    testRunningTaskCount(false, false);
-    
-    // test with spec = true and locality=false
-    testRunningTaskCount(true, false);
-  }
-  
-  @Override
-  protected void tearDown() throws Exception {
-    mrCluster.shutdown();
-    dfsCluster.shutdown();
-    super.tearDown();
-  }
-  
+    // test with spec = false 
+    testRunningTaskCount(false);
+
+    // test with spec = true
+    testRunningTaskCount(true);
 
-  void launchTask(Class MapClass,Class ReduceClass) throws Exception{
-    JobConf job = configure(MapClass, ReduceClass, 5, 10, true);
-    try {
-      JobClient.runJob(job);
-    } catch (IOException ioe) {}
-  }
-  
-  @SuppressWarnings("unchecked")
-  JobConf configure(Class MapClass,Class ReduceClass, int maps, int reducers,
-                    boolean locality) 
-  throws Exception {
-    JobConf jobConf = mrCluster.createJobConf();
-    final Path inDir = new Path("./failjob/input");
-    final Path outDir = new Path("./failjob/output");
-    String input = "Test failing job.\n One more line";
-    FileSystem inFs = inDir.getFileSystem(jobConf);
-    FileSystem outFs = outDir.getFileSystem(jobConf);
-    outFs.delete(outDir, true);
-    if (!inFs.mkdirs(inDir)) {
-      throw new IOException("create directory failed" + inDir.toString());
-    }
-
-    DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
-    file.writeBytes(input);
-    file.close();
-    jobConf.setJobName("failmaptask");
-    if (locality) {
-      jobConf.setInputFormat(TextInputFormat.class);
-    } else {
-      jobConf.setInputFormat(UtilsForTests.RandomInputFormat.class);
-    }
-    jobConf.setOutputKeyClass(Text.class);
-    jobConf.setOutputValueClass(Text.class);
-    jobConf.setMapperClass(MapClass);
-    jobConf.setCombinerClass(ReduceClass);
-    jobConf.setReducerClass(ReduceClass);
-    FileInputFormat.setInputPaths(jobConf, inDir);
-    FileOutputFormat.setOutputPath(jobConf, outDir);
-    jobConf.setNumMapTasks(maps);
-    jobConf.setNumReduceTasks(reducers);
-    return jobConf; 
   }
 
-  void checkTaskCounts() {
-    JobStatus[] status = jt.getAllJobs();
-    for (JobStatus js : status) {
-      JobInProgress jip = jt.getJob(js.getJobID());
-      Counters counter = jip.getJobCounters();
-      long totalTaskCount = counter
-          .getCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
-          + counter.getCounter(JobCounter.TOTAL_LAUNCHED_REDUCES);
-      while (jip.getNumTaskCompletionEvents() < totalTaskCount) {
-        assertEquals(true, (jip.runningMaps() >= 0));
-        assertEquals(true, (jip.pendingMaps() >= 0));
-        assertEquals(true, (jip.runningReduces() >= 0));
-        assertEquals(true, (jip.pendingReduces() >= 0));
-      }
-    }
+  static void checkTaskCounts(JobInProgress jip, int runningMaps,
+      int pendingMaps, int runningReduces, int pendingReduces) {
+    Counters counter = jip.getJobCounters();
+    long totalTaskCount = counter.getCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+    + counter.getCounter(JobCounter.TOTAL_LAUNCHED_REDUCES);
+
+    LOG.info("totalTaskCount is " + totalTaskCount);
+    LOG.info(" Running Maps:" + jip.runningMaps() +
+        " Pending Maps:" + jip.pendingMaps() + 
+        " Running Reds:" + jip.runningReduces() + 
+        " Pending Reds:" + jip.pendingReduces());
+
+    assertEquals(jip.getNumTaskCompletionEvents(),totalTaskCount);
+    assertEquals(runningMaps, jip.runningMaps());
+    assertEquals(pendingMaps, jip.pendingMaps());
+    assertEquals(runningReduces, jip.runningReduces());
+    assertEquals(pendingReduces, jip.pendingReduces());
   }
-  
+
 }


