hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l..@apache.org
Subject svn commit: r1140266 - in /hadoop/common/branches/MR-279/mapreduce: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/...
Date Mon, 27 Jun 2011 18:54:24 GMT
Author: llu
Date: Mon Jun 27 18:54:24 2011
New Revision: 1140266

URL: http://svn.apache.org/viewvc?rev=1140266&view=rev
Log:
MAPREDUCE-2618. Fix NPE in 0 map 0 reduce jobs. (Jeffrey Naisbitt via llu)

Added:
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
Modified:
    hadoop/common/branches/MR-279/mapreduce/CHANGES.txt
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java

Modified: hadoop/common/branches/MR-279/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/CHANGES.txt?rev=1140266&r1=1140265&r2=1140266&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/CHANGES.txt (original)
+++ hadoop/common/branches/MR-279/mapreduce/CHANGES.txt Mon Jun 27 18:54:24 2011
@@ -5,6 +5,8 @@ Trunk (unreleased changes)
 
     MAPREDUCE-279
 
+    MAPREDUCE-2618. Fix NPE in 0 map 0 reduce jobs. (Jeffrey Naisbitt via llu)
+
     Fix some invalid transitions in the RM. (vinodkv via ddas)
     
     Fix diagnostics display for more than 100 apps in RM. (llu)

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1140266&r1=1140265&r2=1140266&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
Mon Jun 27 18:54:24 2011
@@ -35,6 +35,9 @@ public enum JobEventType {
   JOB_MAP_TASK_RESCHEDULED,
   JOB_TASK_ATTEMPT_COMPLETED,
 
+  //Producer:Job
+  JOB_COMPLETED,
+
   //Producer:Any component
   JOB_DIAGNOSTIC_UPDATE,
   INTERNAL_ERROR,

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1140266&r1=1140265&r2=1140266&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
Mon Jun 27 18:54:24 2011
@@ -230,6 +230,11 @@ public class JobImpl implements org.apac
               EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
               JobEventType.JOB_TASK_COMPLETED,
               new TaskCompletedTransition())
+          .addTransition
+              (JobState.RUNNING,
+              EnumSet.of(JobState.RUNNING, JobState.SUCCEEDED, JobState.FAILED),
+              JobEventType.JOB_COMPLETED,
+              new JobNoTasksCompletedTransition())
           .addTransition(JobState.RUNNING, JobState.KILL_WAIT,
               JobEventType.JOB_KILL, new KillTasksTransition())
           .addTransition(JobState.RUNNING, JobState.RUNNING,
@@ -393,6 +398,19 @@ public class JobImpl implements org.apac
     return jobId;
   }
 
+  // Package-scoped getters so unit tests can stub these collaborators on a mocked JobImpl
+  OutputCommitter getCommitter() {
+    return this.committer;
+  }
+
+  EventHandler getEventHandler() {
+    return this.eventHandler;
+  }
+
+  JobContext getJobContext() {
+    return this.jobContext;
+  }
+
   @Override
   public boolean checkAccess(UserGroupInformation callerUGI, 
       JobACL jobOperation) {
@@ -687,11 +705,34 @@ public class JobImpl implements org.apac
     metrics.waitingTask(task);
   }
 
-  private void setFinishTime() {
+  void setFinishTime() {
     finishTime = clock.getTime();
   }
+
+  void logJobHistoryFinishedEvent() {
+    this.setFinishTime(); // record the finish time first; the event is created afterwards
+    JobFinishedEvent jfe = createJobFinishedEvent(this);
+    LOG.info("Calling handler for JobFinishedEvent ");
+    this.getEventHandler().handle(new JobHistoryEvent(this.jobId, jfe)); // via the getter
+  }
   
-  private JobState finished(JobState finalState) {
+  static JobState checkJobCompleteSuccess(JobImpl job) {
+    // Job is successful once the completed-task count equals the number of tasks
+    if (job.completedTaskCount == job.getTasks().size()) {
+      try {
+        // Commit job & do cleanup
+        job.getCommitter().commitJob(job.getJobContext());
+      } catch (IOException e) {
+        LOG.warn("Could not do commit for Job", e); // only logged; finished(SUCCEEDED) still runs
+      }
+      
+      job.logJobHistoryFinishedEvent();
+      return job.finished(JobState.SUCCEEDED);
+    }
+    return null; // null = not complete yet; callers fall back to the job's current state
+  }
+
+  JobState finished(JobState finalState) {
     if (getState() == JobState.RUNNING) {
       metrics.endRunningJob(this);
     }
@@ -759,10 +800,9 @@ public class JobImpl implements org.apac
         TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
         job.numMapTasks = taskSplitMetaInfo.length;
         job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
-        
+
         if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
           job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
-          return job.finished(JobState.FAILED);
         }
 
         checkTaskLimits();
@@ -1064,6 +1104,11 @@ public class JobImpl implements org.apac
           job.submitTime, job.startTime);
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
       job.metrics.runningJob(job);
+
+      // If we have no tasks, just transition to job completed
+      if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
+        job.eventHandler.handle(new JobEvent(job.jobId, JobEventType.JOB_COMPLETED));
+      }
     }
   }
 
@@ -1237,19 +1282,9 @@ public class JobImpl implements org.apac
         return job.finished(JobState.FAILED);
       }
       
-      //check for Job success
-      if (job.completedTaskCount == job.tasks.size()) {
-        try {
-          job.committer.commitJob(job.jobContext);
-        } catch (IOException e) {
-          LOG.warn("Could not do commit for Job", e);
-        }
-       // Log job-history
-        job.setFinishTime();
-        JobFinishedEvent jfe = createJobFinishedEvent(job);
-        LOG.info("Calling handler for JobFinishedEvent ");
-        job.eventHandler.handle(new JobHistoryEvent(job.jobId, jfe));
-        return job.finished(JobState.SUCCEEDED);
+      JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
+      if (jobCompleteSuccess != null) {
+        return jobCompleteSuccess;
       }
       
       //return the current state, Job not finished yet
@@ -1285,6 +1320,22 @@ public class JobImpl implements org.apac
     }
   }
 
+  // Transition for JOB_COMPLETED, the event a job with no map and no reduce tasks sends itself
+  static class JobNoTasksCompletedTransition implements
+  MultipleArcTransition<JobImpl, JobEvent, JobState> {
+
+    @Override
+    public JobState transition(JobImpl job, JobEvent event) {
+      JobState jobCompleteSuccess = JobImpl.checkJobCompleteSuccess(job);
+      if (jobCompleteSuccess != null) {
+        return jobCompleteSuccess;
+      }
+      
+      // Return the current state, Job not finished yet
+      return job.getState();
+    }
+  }
+
   private static class MapTaskRescheduledTransition implements
       SingleArcTransition<JobImpl, JobEvent> {
     @Override

Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1140266&r1=1140265&r2=1140266&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
Mon Jun 27 18:54:24 2011
@@ -59,10 +59,10 @@ public class TestMRApp {
   }
   
   @Test
-  public void testZeroMapReduces() throws Exception {
+  public void testZeroMapReduces() throws Exception{
     MRApp app = new MRApp(0, 0, true, this.getClass().getName(), true);
     Job job = app.submit(new Configuration());
-    app.waitForState(job, JobState.FAILED);
+    app.waitForState(job, JobState.SUCCEEDED);
   }
   
   @Test

Added: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1140266&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
(added)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
Mon Jun 27 18:54:24 2011
@@ -0,0 +1,136 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job.impl;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Test;
+import org.junit.Assert;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.any;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+
+
+/**
+ * Tests various functions of the JobImpl class
+ */
+public class TestJobImpl {
+
+  /** The transition must hand back the job's current state when the job is not complete. */
+  @Test
+  public void testJobNoTasksTransition() {
+    JobNoTasksCompletedTransition trans = new JobNoTasksCompletedTransition();
+    JobImpl mockJob = mock(JobImpl.class);
+
+    // Force checkJobCompleteSuccess to return null
+    Task mockTask = mock(Task.class);
+    Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
+    tasks.put(mockTask.getID(), mockTask);
+    when(mockJob.getTasks()).thenReturn(tasks);
+
+    when(mockJob.getState()).thenReturn(JobState.ERROR);
+    JobEvent mockJobEvent = mock(JobEvent.class);
+    JobState state = trans.transition(mockJob, mockJobEvent);
+    Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
+        JobState.ERROR, state);
+  }
+
+  /** checkJobCompleteSuccess must return the SUCCEEDED state reported by finished(). */
+  @Test
+  public void testCheckJobCompleteSuccess() {
+    JobImpl mockJob = mock(JobImpl.class);
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    JobContext mockJobContext = mock(JobContext.class);
+
+    when(mockJob.getCommitter()).thenReturn(mockCommitter);
+    when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
+    when(mockJob.getJobContext()).thenReturn(mockJobContext);
+    doNothing().when(mockJob).setFinishTime();
+    doNothing().when(mockJob).logJobHistoryFinishedEvent();
+    when(mockJob.finished(any(JobState.class))).thenReturn(JobState.SUCCEEDED);
+
+    try {
+      doNothing().when(mockCommitter).commitJob(any(JobContext.class));
+    } catch (IOException e) {
+      throw new IllegalStateException("Stubbing commitJob on a mock must not throw", e);
+    }
+    doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
+    // Run the check once so both assertions look at the same result
+    JobState result = JobImpl.checkJobCompleteSuccess(mockJob);
+    Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
+      "for successful job", result);
+    Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
+        JobState.SUCCEEDED, result);
+  }
+
+  /** With a task still outstanding, checkJobCompleteSuccess must return null. */
+  @Test
+  public void testCheckJobCompleteSuccessFailed() {
+    JobImpl mockJob = mock(JobImpl.class);
+
+    // Make the completedTasks not equal the getTasks()
+    Task mockTask = mock(Task.class);
+    Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
+    tasks.put(mockTask.getID(), mockTask);
+    when(mockJob.getTasks()).thenReturn(tasks);
+
+    // Just in case the code breaks and reaches the success path: attach the stubs
+    // to the job so that regression fails the assertion below instead of an NPE
+    OutputCommitter mockCommitter = mock(OutputCommitter.class);
+    when(mockJob.getCommitter()).thenReturn(mockCommitter);
+    when(mockJob.finished(any(JobState.class))).thenReturn(JobState.SUCCEEDED);
+
+    Assert.assertNull("checkJobCompleteSuccess incorrectly returns not-null " +
+      "for unsuccessful job",
+      JobImpl.checkJobCompleteSuccess(mockJob));
+  }
+
+  /** Allows running the tests directly, without a JUnit runner. */
+  public static void main(String[] args) throws Exception {
+    TestJobImpl t = new TestJobImpl();
+    t.testJobNoTasksTransition();
+    t.testCheckJobCompleteSuccess();
+    t.testCheckJobCompleteSuccessFailed();
+  }
+}



Mime
View raw message