hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r1130816 - in /hadoop/mapreduce/branches/MR-279: CHANGES.txt mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
Date Thu, 02 Jun 2011 21:17:57 GMT
Author: mahadev
Date: Thu Jun  2 21:17:54 2011
New Revision: 1130816

URL: http://svn.apache.org/viewvc?rev=1130816&view=rev
Log:
Fix to report job status if the application is KILLED/FAILED. (mahadev)

Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1130816&r1=1130815&r2=1130816&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Thu Jun  2 21:17:54 2011
@@ -4,6 +4,9 @@ Trunk (unreleased changes)
 
 
     MAPREDUCE-279
+
+    Fix to report job status if the application is KILLED/FAILED. (mahadev)
+
     Disable Job acls until fixed (mahadev)
 
     Fix container size rounding in AM and headroom in RM. (acmurthy and 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1130816&r1=1130815&r2=1130816&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Jun  2 21:17:54 2011
@@ -67,6 +67,8 @@ public class ClientServiceDelegate {
 
   private Configuration conf;
   private ApplicationId currentAppId;
+  private ApplicationState currentAppState
+    = ApplicationState.PENDING;
   private final ResourceMgrDelegate rm;
   private MRClientProtocol realProxy = null;
   private String serviceAddr = "";
@@ -112,6 +114,7 @@ public class ClientServiceDelegate {
         }
         serviceAddr = appMaster.getHost() + ":" + appMaster.getRpcPort();
         serviceHttpAddr = appMaster.getHost() + ":" + appMaster.getHttpPort();
+        currentAppState = appMaster.getState();
         if (UserGroupInformation.isSecurityEnabled()) {
           String clientTokenEncoded = appMaster.getClientToken();
           Token<ApplicationTokenIdentifier> clientToken =
@@ -139,14 +142,18 @@ public class ClientServiceDelegate {
       }
     }
 
+    currentAppState = appMaster.getState();
     /** we just want to return if its allocating, so that we dont 
      * block on it. This is to be able to return job status 
      * on a allocating Application.
      */
-    if (appMaster.getState() == ApplicationState.ALLOCATING) {
+    
+    if (currentAppState == ApplicationState.ALLOCATING || currentAppState == ApplicationState.KILLED
+        || currentAppState == ApplicationState.FAILED) {
+      realProxy = null;
       return;
     }
-
+    
     if (ApplicationState.COMPLETED.equals(appMaster.getState())) {
       serviceAddr = conf.get(JHConfig.HS_BIND_ADDRESS,
           JHConfig.DEFAULT_HS_BIND_ADDRESS);
@@ -161,9 +168,6 @@ public class ClientServiceDelegate {
         throw new YarnException(e);
       }
     }
-    LOG.warn("Cannot connect to Application with state " + appMaster.getState());
-    throw new YarnException(
-        "Cannot connect to Application with state " + appMaster.getState());
   }
 
   private void instantiateAMProxy(final String serviceAddr) throws IOException {
@@ -203,7 +207,12 @@ public class ClientServiceDelegate {
     try {
       GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
       request.setJobId(jobID);
-      return TypeConverter.fromYarn(getProxy(arg0).getCounters(request).getCounters());
+      MRClientProtocol protocol = getProxy(arg0);
+      if (protocol == null) {
+        /* no AM to connect to, fake counters */
+        return new org.apache.hadoop.mapreduce.Counters();
+      }
+      return TypeConverter.fromYarn(protocol.getCounters(request).getCounters());
     } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
       LOG.warn(RPCUtil.toString(yre));
       throw yre;
@@ -230,11 +239,12 @@ public class ClientServiceDelegate {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
     List<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent> list = null;
     GetTaskAttemptCompletionEventsRequest request = recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
+    MRClientProtocol protocol;
     try {
       request.setJobId(jobID);
       request.setFromEventId(arg1);
       request.setMaxEvents(arg2);
-      MRClientProtocol protocol = getProxy(arg0);
+      protocol = getProxy(arg0);
       /** This is hack to get around the issue of faking jobstatus while the AM
        * is coming up.
        */
@@ -251,6 +261,10 @@ public class ClientServiceDelegate {
         request.setJobId(jobID);
         request.setFromEventId(arg1);
         request.setMaxEvents(arg2);
+        protocol = getRefreshedProxy(arg0);
+        if (protocol == null) {
+          return new TaskCompletionEvent[0];
+        }
         list = getRefreshedProxy(arg0).getTaskAttemptCompletionEvents(request).getCompletionEventList();
       } catch(YarnRemoteException yre) {
         LOG.warn(RPCUtil.toString(yre));
@@ -292,33 +306,47 @@ public class ClientServiceDelegate {
     }
     return result;
   }
+  
+  private JobStatus createFakeJobReport(ApplicationState state, 
+      org.apache.hadoop.mapreduce.v2.api.records.JobId jobId, String jobFile) {
+    JobReport jobreport = recordFactory.newRecordInstance(JobReport.class);
+    jobreport.setCleanupProgress(0);
+    jobreport.setFinishTime(0);
+    jobreport.setJobId(jobId);
+    jobreport.setMapProgress(0);
+    /** fix this, the start time should be fixed */
+    jobreport.setStartTime(0);
+    jobreport.setReduceProgress(0);
+    jobreport.setSetupProgress(0);
+
+    if (currentAppState == ApplicationState.ALLOCATING) {
+      /* the protocol wasnt instantiated because the applicaton wasnt launched
+       * return a fake report.
+       */
+      jobreport.setJobState(JobState.INITED); 
+    } else if (currentAppState == ApplicationState.KILLED) {
+      jobreport.setJobState(JobState.KILLED);
+    } else if (currentAppState == ApplicationState.FAILED) {
+      jobreport.setJobState(JobState.FAILED);
+    }
+    return  TypeConverter.fromYarn(jobreport, jobFile, serviceHttpAddr);
+  }
 
   public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException,
   YarnRemoteException {
     org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = 
       TypeConverter.toYarn(oldJobID);
-    LOG.debug("Getting Job status");
     String stagingDir = conf.get("yarn.apps.stagingDir");
     String jobFile = stagingDir + "/" + jobId.toString();
     JobReport report = null;
+    MRClientProtocol protocol;
     GetJobReportRequest request = recordFactory.newRecordInstance(GetJobReportRequest.class);
     try {
       request.setJobId(jobId);
-      MRClientProtocol protocol = getProxy(oldJobID);
+      protocol = getProxy(oldJobID);
+      
       if (protocol == null) {
-        /* the protocol wasnt instantiated because the applicaton wasnt launched
-         * return a fake report.
-         */
-        JobReport jobreport = recordFactory.newRecordInstance(JobReport.class);
-        jobreport.setCleanupProgress(0);
-        jobreport.setFinishTime(0);
-        jobreport.setJobId(jobId);
-        jobreport.setJobState(JobState.INITED);
-        jobreport.setMapProgress(0);
-        jobreport.setStartTime(0);
-        jobreport.setReduceProgress(0);
-        jobreport.setSetupProgress(0);
-        return  TypeConverter.fromYarn(jobreport, jobFile, serviceHttpAddr);
+        return createFakeJobReport(currentAppState, jobId, jobFile);
       }
       report = getProxy(oldJobID).getJobReport(request).getJobReport();
     } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
@@ -327,6 +355,11 @@ public class ClientServiceDelegate {
     } catch (Exception e) {
       try {
         request.setJobId(jobId);
+        protocol = getRefreshedProxy(oldJobID);
+        /* this is possible if an application that was running is killed */
+        if (protocol == null)  {
+          return createFakeJobReport(currentAppState, jobId, jobFile);
+        }
         report = getRefreshedProxy(oldJobID).getJobReport(request).getJobReport();
       } catch(YarnRemoteException yre) {
         LOG.warn(RPCUtil.toString(yre));



Mime
View raw message