hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r725729 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java src/test/org/apache/hadoop/mapred/NotificationTestCase.java
Date Thu, 11 Dec 2008 16:18:19 GMT
Author: ddas
Date: Thu Dec 11 08:18:18 2008
New Revision: 725729

URL: http://svn.apache.org/viewvc?rev=725729&view=rev
Log:
HADOOP-4737. Adds the KILLED notification when jobs get killed. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=725729&r1=725728&r2=725729&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Dec 11 08:18:18 2008
@@ -218,6 +218,9 @@
     HADOOP-4688. Modify the MiniMRDFSSort unit test to spill multiple times,
     exercising the map-side merge code. (cdouglas)
 
+    HADOOP-4737. Adds the KILLED notification when jobs get killed.
+    (Amareshwari Sriramadasu via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java?rev=725729&r1=725728&r2=725729&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java Thu Dec 11 08:18:18 2008
@@ -105,7 +105,8 @@
       }
       if (uri.contains("$jobStatus")) {
         String statusStr =
-          (status.getRunState() == JobStatus.SUCCEEDED) ? "SUCCEEDED" : "FAILED";
+          (status.getRunState() == JobStatus.SUCCEEDED) ? "SUCCEEDED" : 
+            (status.getRunState() == JobStatus.FAILED) ? "FAILED" : "KILLED";
         uri = uri.replace("$jobStatus", statusStr);
       }
       notification = new JobEndStatusInfo(uri, retryAttempts, retryInterval);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=725729&r1=725728&r2=725729&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Thu Dec 11 08:18:18 2008
@@ -62,6 +62,7 @@
     private JobProfile profile;
     private Path localFile;
     private FileSystem localFs;
+    boolean killed = false;
     
     // Counters summed over all the map/reduce tasks which
     // have successfully completed
@@ -100,6 +101,8 @@
     
     @Override
     public void run() {
+      JobContext jContext = new JobContext(conf);
+      OutputCommitter outputCommitter = job.getOutputCommitter();
       try {
         // split input into minimum number of splits
         InputSplit[] splits;
@@ -112,33 +115,35 @@
           numReduceTasks = 1;
           job.setNumReduceTasks(1);
         }
-        JobContext jContext = new JobContext(conf);
-        OutputCommitter outputCommitter = job.getOutputCommitter();
         outputCommitter.setupJob(jContext);
         status.setSetupProgress(1.0f);
         
         DataOutputBuffer buffer = new DataOutputBuffer();
         for (int i = 0; i < splits.length; i++) {
-          TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i),0);  
-          mapIds.add(mapId);
-          buffer.reset();
-          splits[i].write(buffer);
-          BytesWritable split = new BytesWritable();
-          split.set(buffer.getData(), 0, buffer.getLength());
-          MapTask map = new MapTask(file.toString(),  
-                                    mapId, i,
-                                    splits[i].getClass().getName(),
-                                    split);
-          JobConf localConf = new JobConf(job);
-          map.setJobFile(localFile.toString());
-          map.localizeConfiguration(localConf);
-          map.setConf(localConf);
-          map_tasks += 1;
-          myMetrics.launchMap(mapId);
-          map.run(localConf, this);
-          myMetrics.completeMap(mapId);
-          map_tasks -= 1;
-          updateCounters(map);
+          if (!this.isInterrupted()) {
+            TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i),0);  
+            mapIds.add(mapId);
+            buffer.reset();
+            splits[i].write(buffer);
+            BytesWritable split = new BytesWritable();
+            split.set(buffer.getData(), 0, buffer.getLength());
+            MapTask map = new MapTask(file.toString(),  
+                                      mapId, i,
+                                      splits[i].getClass().getName(),
+                                      split);
+            JobConf localConf = new JobConf(job);
+            map.setJobFile(localFile.toString());
+            map.localizeConfiguration(localConf);
+            map.setConf(localConf);
+            map_tasks += 1;
+            myMetrics.launchMap(mapId);
+            map.run(localConf, this);
+            myMetrics.completeMap(mapId);
+            map_tasks -= 1;
+            updateCounters(map);
+          } else {
+            throw new InterruptedException();
+          }
         }
         TaskAttemptID reduceId = 
           new TaskAttemptID(new TaskID(jobId, false, 0), 0);
@@ -146,19 +151,23 @@
           if (numReduceTasks > 0) {
             // move map output to reduce input  
             for (int i = 0; i < mapIds.size(); i++) {
-              TaskAttemptID mapId = mapIds.get(i);
-              Path mapOut = this.mapoutputFile.getOutputFile(mapId);
-              Path reduceIn = this.mapoutputFile.getInputFileForWrite(mapId.getTaskID(),reduceId,
-                  localFs.getLength(mapOut));
-              if (!localFs.mkdirs(reduceIn.getParent())) {
-                throw new IOException("Mkdirs failed to create "
-                    + reduceIn.getParent().toString());
+              if (!this.isInterrupted()) {
+                TaskAttemptID mapId = mapIds.get(i);
+                Path mapOut = this.mapoutputFile.getOutputFile(mapId);
+                Path reduceIn = this.mapoutputFile.getInputFileForWrite(
+                                  mapId.getTaskID(),reduceId,
+                                  localFs.getLength(mapOut));
+                if (!localFs.mkdirs(reduceIn.getParent())) {
+                  throw new IOException("Mkdirs failed to create "
+                      + reduceIn.getParent().toString());
+                }
+                if (!localFs.rename(mapOut, reduceIn))
+                  throw new IOException("Couldn't rename " + mapOut);
+              } else {
+                throw new InterruptedException();
               }
-              if (!localFs.rename(mapOut, reduceIn))
-                throw new IOException("Couldn't rename " + mapOut);
             }
-
-            {
+            if (!this.isInterrupted()) {
               ReduceTask reduce = new ReduceTask(file.toString(), 
                                                  reduceId, 0, mapIds.size());
               JobConf localConf = new JobConf(job);
@@ -171,6 +180,8 @@
               myMetrics.completeReduce(reduce.getTaskID());
               reduce_tasks -= 1;
               updateCounters(reduce);
+            } else {
+              throw new InterruptedException();
             }
           }
         } finally {
@@ -185,12 +196,26 @@
         outputCommitter.cleanupJob(jContext);
         status.setCleanupProgress(1.0f);
 
-        this.status.setRunState(JobStatus.SUCCEEDED);
+        if (killed) {
+          this.status.setRunState(JobStatus.KILLED);
+        } else {
+          this.status.setRunState(JobStatus.SUCCEEDED);
+        }
 
         JobEndNotifier.localRunnerNotification(job, status);
 
       } catch (Throwable t) {
-        this.status.setRunState(JobStatus.FAILED);
+        try {
+          outputCommitter.cleanupJob(jContext);
+        } catch (IOException ioe) {
+          LOG.info("Error cleaning up job:" + id);
+        }
+        status.setCleanupProgress(1.0f);
+        if (killed) {
+          this.status.setRunState(JobStatus.KILLED);
+        } else {
+          this.status.setRunState(JobStatus.FAILED);
+        }
         LOG.warn(id, t);
 
         JobEndNotifier.localRunnerNotification(job, status);
@@ -307,7 +332,8 @@
   }
 
   public void killJob(JobID id) {
-    jobs.get(id).stop();
+    jobs.get(id).killed = true;
+    jobs.get(id).interrupt();
   }
 
   public void setJobPriority(JobID id, String jp) throws IOException {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java?rev=725729&r1=725728&r2=725729&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/NotificationTestCase.java Thu Dec 11 08:18:18 2008
@@ -100,7 +100,24 @@
 
     protected void doGet(HttpServletRequest req, HttpServletResponse res)
       throws ServletException, IOException {
-      if (counter == 0) {
+      switch (counter) {
+        case 0:
+        {
+          assertTrue(req.getQueryString().contains("SUCCEEDED"));
+        }
+        break;
+        case 2:
+        {
+          assertTrue(req.getQueryString().contains("KILLED"));
+        }
+        break;
+        case 4:
+        {
+          assertTrue(req.getQueryString().contains("FAILED"));
+        }
+        break;
+      }
+      if (counter % 2 == 0) {
         stdPrintln((new Date()).toString() +
                    "Receiving First notification for [" + req.getQueryString() +
                    "], returning error");
@@ -148,6 +165,22 @@
       Thread.currentThread().sleep(2000);
     }
     assertEquals(2, NotificationServlet.counter);
+    
+    // run a job with KILLED status
+    System.out.println(TestJobKillAndFail.runJobKill(this.createJobConf()));
+    synchronized(Thread.currentThread()) {
+      stdPrintln("Sleeping for 2 seconds to give time for retry");
+      Thread.currentThread().sleep(2000);
+    }
+    assertEquals(4, NotificationServlet.counter);
+    
+    // run a job with FAILED status
+    System.out.println(TestJobKillAndFail.runJobFail(this.createJobConf()));
+    synchronized(Thread.currentThread()) {
+      stdPrintln("Sleeping for 2 seconds to give time for retry");
+      Thread.currentThread().sleep(2000);
+    }
+    assertEquals(6, NotificationServlet.counter);
   }
 
   private String launchWordCount(JobConf conf,



Mime
View raw message