hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r1084852 - in /hadoop/mapreduce/branches/MR-279/mr-client: hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ hadoop-mapreduce-client-app/src...
Date Thu, 24 Mar 2011 06:26:04 GMT
Author: sharad
Date: Thu Mar 24 06:26:03 2011
New Revision: 1084852

URL: http://svn.apache.org/viewvc?rev=1084852&view=rev
Log:
Fixed commands list-attempt-ids, kill-task, kill-job.

Added:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
Removed:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/lib/TypeConverter.java
Modified:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapTaskAttemptImpl.java Thu Mar 24 06:26:03 2011
@@ -24,11 +24,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.event.EventHandler;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ReduceTaskAttemptImpl.java Thu Mar 24 06:26:03 2011
@@ -25,10 +25,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.event.EventHandler;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Thu Mar 24 06:26:03 2011
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
@@ -43,7 +44,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.service.CompositeService;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Thu Mar 24 06:26:03 2011
@@ -29,17 +29,17 @@ import java.util.concurrent.LinkedBlocki
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.AbstractService;
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java Thu Mar 24 06:26:03 2011
@@ -33,9 +33,20 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@@ -48,8 +59,6 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
@@ -58,16 +67,6 @@ import org.apache.hadoop.yarn.security.c
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.JobReport;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
 
 /**
  * This module is responsible for talking to the 
@@ -84,12 +83,11 @@ public class MRClientService extends Abs
   private WebApp webApp;
   private InetSocketAddress bindAddress;
   private AppContext appContext;
-  private EventHandler<Event> handler;
 
   public MRClientService(AppContext appContext) {
     super("MRClientService");
     this.appContext = appContext;
-    this.protocolHandler = new MRClientProtocolHandler(appContext);
+    this.protocolHandler = new MRClientProtocolHandler();
   }
 
   public void start() {
@@ -153,65 +151,73 @@ public class MRClientService extends Abs
 
   class MRClientProtocolHandler implements MRClientProtocol {
 
-    private AppContext appContext;
-
-    private Job getJob(JobID jobID) throws AvroRemoteException {
+    private Job verifyAndGetJob(JobID jobID) throws AvroRemoteException {
       Job job = appContext.getJob(jobID);
       if (job == null) {
         throw RPCUtil.getRemoteException("Unknown job " + jobID);
       }
       return job;
     }
-    
-    MRClientProtocolHandler(AppContext appContext) {
-      this.appContext = appContext;
+ 
+    private Task verifyAndGetTask(TaskID taskID) throws AvroRemoteException {
+      Task task = verifyAndGetJob(taskID.jobID).getTask(taskID);
+      if (task == null) {
+        throw RPCUtil.getRemoteException("Unknown Task " + taskID);
+      }
+      return task;
+    }
+
+    private TaskAttempt verifyAndGetAttempt(TaskAttemptID attemptID) 
+          throws AvroRemoteException {
+      TaskAttempt attempt = verifyAndGetTask(attemptID.taskID).getAttempt(attemptID);
+      if (attempt == null) {
+        throw RPCUtil.getRemoteException("Unknown TaskAttempt " + attemptID);
+      }
+      return attempt;
     }
 
     @Override
     public Counters getCounters(JobID jobID) throws AvroRemoteException {
-      Job job = getJob(jobID);
+      Job job = verifyAndGetJob(jobID);
       return job.getCounters();
     }
 
     @Override
     public JobReport getJobReport(JobID jobID) throws AvroRemoteException {
-      Job job = getJob(jobID);
+      Job job = verifyAndGetJob(jobID);
       return job.getReport();
     }
 
     @Override
     public TaskAttemptReport getTaskAttemptReport(TaskAttemptID taskAttemptID)
         throws AvroRemoteException {
-      Job job = getJob(taskAttemptID.taskID.jobID);
-      return job.getTask(taskAttemptID.taskID).
-          getAttempt(taskAttemptID).getReport();
+      return verifyAndGetAttempt(taskAttemptID).getReport();
     }
 
     @Override
     public TaskReport getTaskReport(TaskID taskID) throws AvroRemoteException {
-      Job job = appContext.getJob(taskID.jobID);
-      return job.getTask(taskID).getReport();
+      return verifyAndGetTask(taskID).getReport();
     }
 
     @Override
     public List<TaskAttemptCompletionEvent> getTaskAttemptCompletionEvents(JobID jobID, 
         int fromEventId, int maxEvents) throws AvroRemoteException {
-      Job job = appContext.getJob(jobID);
+      Job job = verifyAndGetJob(jobID);
       return Arrays.asList(job.getTaskAttemptCompletionEvents(fromEventId, maxEvents));
     }
 
     @Override
     public Void killJob(JobID jobID) throws AvroRemoteException {
-      getJob(jobID);
-      handler.handle(
+      verifyAndGetJob(jobID);
+      appContext.getEventHandler().handle(
           new JobEvent(jobID, JobEventType.JOB_KILL));
       return null;
     }
 
     @Override
     public Void killTask(TaskID taskID) throws AvroRemoteException {
-      getJob(taskID.jobID);
-      handler.handle(
+      verifyAndGetTask(taskID);
+      appContext.getEventHandler().handle(
           new TaskEvent(taskID, TaskEventType.T_KILL));
       return null;
     }
@@ -219,8 +225,8 @@ public class MRClientService extends Abs
     @Override
     public Void killTaskAttempt(TaskAttemptID taskAttemptID)
         throws AvroRemoteException {
-      getJob(taskAttemptID.taskID.jobID);
-     handler.handle(
+      verifyAndGetAttempt(taskAttemptID);
+      appContext.getEventHandler().handle(
           new TaskAttemptEvent(taskAttemptID, 
               TaskAttemptEventType.TA_KILL));
       return null;
@@ -229,17 +235,17 @@ public class MRClientService extends Abs
     @Override
     public List<CharSequence> getDiagnostics(TaskAttemptID taskAttemptID)
         throws AvroRemoteException {
-      Job job = getJob(taskAttemptID.taskID.jobID);
-      return job.getTask(taskAttemptID.taskID).
-                 getAttempt(taskAttemptID).getDiagnostics();
+      return verifyAndGetAttempt(taskAttemptID).getDiagnostics();
     }
 
     @Override
     public List<TaskReport> getTaskReports(JobID jobID, TaskType taskType)
         throws AvroRemoteException {
-      Job job = appContext.getJob(jobID);
+      Job job = verifyAndGetJob(jobID);
+      LOG.info("Getting task report for " + taskType + "   " + jobID);
       List<TaskReport> reports = new ArrayList<TaskReport>();
       Collection<Task> tasks = job.getTasks(taskType).values();
+      LOG.info("Getting task report size " + tasks.size());
       for (Task task : tasks) {
         reports.add(task.getReport());
       }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Thu Mar 24 06:26:03 2011
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
@@ -72,7 +73,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu Mar 24 06:26:03 2011
@@ -45,6 +45,7 @@ import org.apache.hadoop.mapred.WrappedJ
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
@@ -85,7 +86,6 @@ import org.apache.hadoop.yarn.YarnExcept
 import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu Mar 24 06:26:03 2011
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
@@ -53,7 +54,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.event.EventHandler;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Thu Mar 24 06:26:03 2011
@@ -28,11 +28,11 @@ import java.util.Map;
 import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.ApplicationID;
 import org.apache.hadoop.yarn.ContainerID;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java Thu Mar 24 06:26:03 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
@@ -38,7 +39,6 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.v2.api.JobID;
 import org.apache.hadoop.mapreduce.v2.api.JobState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Thu Mar 24 06:26:03 2011
@@ -24,6 +24,7 @@ import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -31,7 +32,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.JobState;
 import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.TaskState;

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1084852&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Thu Mar 24 06:26:03 2011
@@ -0,0 +1,327 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.mapred.JobPriority;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.Counter;
+import org.apache.hadoop.mapreduce.v2.api.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.Phase;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.YarnException;
+
+public class TypeConverter {
+
+  public static org.apache.hadoop.mapred.JobID fromYarn(JobID id) {
+    String identifier = fromClusterTimeStamp(id.appID.clusterTimeStamp);
+    return new org.apache.hadoop.mapred.JobID(identifier, id.id);
+  }
+
+  //currently there is 1-1 mapping between appid and jobid
+  public static org.apache.hadoop.mapreduce.JobID fromYarn(ApplicationID appID) {
+    String identifier = fromClusterTimeStamp(appID.clusterTimeStamp);
+    return new org.apache.hadoop.mapred.JobID(identifier, appID.id);
+  }
+
+  public static JobID toYarn(org.apache.hadoop.mapreduce.JobID id) {
+    JobID jobID = new JobID();
+    jobID.id = id.getId(); //currently there is 1-1 mapping between appid and jobid
+    jobID.appID = new ApplicationID();
+    jobID.appID.id = id.getId();
+    jobID.appID.clusterTimeStamp = toClusterTimeStamp(id.getJtIdentifier());
+    return jobID;
+  }
+
+  private static String fromClusterTimeStamp(long clusterTimeStamp) {
+    return Long.toString(clusterTimeStamp);
+  }
+
+  private static long toClusterTimeStamp(String identifier) {
+    return Long.parseLong(identifier);
+  }
+
+  public static org.apache.hadoop.mapreduce.TaskType fromYarn(
+      TaskType taskType) {
+    switch (taskType) {
+    case MAP:
+      return org.apache.hadoop.mapreduce.TaskType.MAP;
+    case REDUCE:
+      return org.apache.hadoop.mapreduce.TaskType.REDUCE;
+    default:
+      throw new YarnException("Unrecognized task type: " + taskType);
+    }
+  }
+
+  public static TaskType
+      toYarn(org.apache.hadoop.mapreduce.TaskType taskType) {
+    switch (taskType) {
+    case MAP:
+      return TaskType.MAP;
+    case REDUCE:
+      return TaskType.REDUCE;
+    default:
+      throw new YarnException("Unrecognized task type: " + taskType);
+    }
+  }
+
+  public static org.apache.hadoop.mapred.TaskID fromYarn(TaskID id) {
+    return new org.apache.hadoop.mapred.TaskID(fromYarn(id.jobID), fromYarn(id.taskType),
+        id.id);
+  }
+
+  public static TaskID toYarn(org.apache.hadoop.mapreduce.TaskID id) {
+    TaskID taskID = new TaskID();
+    taskID.id = id.getId();
+    taskID.taskType = toYarn(id.getTaskType());
+    taskID.jobID = toYarn(id.getJobID());
+    return taskID;
+  }
+
+  public static Phase toYarn(org.apache.hadoop.mapred.TaskStatus.Phase phase) {
+    switch (phase) {
+    case STARTING:
+      return Phase.STARTING;
+    case MAP:
+      return Phase.MAP;
+    case SHUFFLE:
+      return Phase.SHUFFLE;
+    case SORT:
+      return Phase.SORT;
+    case REDUCE:
+      return Phase.REDUCE;
+    case CLEANUP:
+      return Phase.CLEANUP;
+    }
+    throw new YarnException("Unrecognized Phase: " + phase);
+  }
+
+  public static TaskCompletionEvent[] fromYarn(
+      org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent[] newEvents) {
+    TaskCompletionEvent[] oldEvents =
+        new TaskCompletionEvent[newEvents.length];
+    int i = 0;
+    for (org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent newEvent 
+        : newEvents) {
+      oldEvents[i++] = fromYarn(newEvent);
+    }
+    return oldEvents;
+  }
+
+  public static TaskCompletionEvent fromYarn(
+      org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent newEvent) {
+    return new TaskCompletionEvent(newEvent.eventId,
+              fromYarn(newEvent.attemptId), newEvent.attemptId.id,
+              newEvent.attemptId.taskID.taskType.equals(TaskType.MAP),
+              fromYarn(newEvent.status),
+              newEvent.mapOutputServerAddress.toString());
+  }
+
+  public static TaskCompletionEvent.Status fromYarn(
+      TaskAttemptCompletionEventStatus newStatus) {
+    switch (newStatus) {
+    case FAILED:
+      return TaskCompletionEvent.Status.FAILED;
+    case KILLED:
+      return TaskCompletionEvent.Status.KILLED;
+    case OBSOLETE:
+      return TaskCompletionEvent.Status.OBSOLETE;
+    case SUCCEEDED:
+      return TaskCompletionEvent.Status.SUCCEEDED;
+    case TIPFAILED:
+      return TaskCompletionEvent.Status.TIPFAILED;
+    }
+    throw new YarnException("Unrecognized status: " + newStatus);
+  }
+
+  public static org.apache.hadoop.mapred.TaskAttemptID fromYarn(
+      TaskAttemptID id) {
+    return new org.apache.hadoop.mapred.TaskAttemptID(fromYarn(id.taskID),
+        id.id);
+  }
+
+  public static TaskAttemptID toYarn(
+      org.apache.hadoop.mapred.TaskAttemptID id) {
+    TaskAttemptID taskAttemptID = new TaskAttemptID();
+    taskAttemptID.taskID = toYarn(id.getTaskID());
+    taskAttemptID.id = id.getId();
+    return taskAttemptID;
+  }
+
+  public static TaskAttemptID toYarn(
+      org.apache.hadoop.mapreduce.TaskAttemptID id) {
+    TaskAttemptID taskAttemptID = new TaskAttemptID();
+    taskAttemptID.taskID = toYarn(id.getTaskID());
+    taskAttemptID.id = id.getId();
+    return taskAttemptID;
+  }
+  
+  public static org.apache.hadoop.mapreduce.Counters fromYarn(
+      Counters yCntrs) {
+    org.apache.hadoop.mapreduce.Counters counters = 
+      new org.apache.hadoop.mapreduce.Counters();
+    for (CounterGroup yGrp : yCntrs.groups.values()) {
+      for (Counter yCntr : yGrp.counters.values()) {
+        org.apache.hadoop.mapreduce.Counter c = 
+          counters.findCounter(yGrp.displayname.toString(), 
+              yCntr.displayName.toString());
+        c.setValue(yCntr.value);
+      }
+    }
+    return counters;
+  }
+
+  public static Counters toYarn(org.apache.hadoop.mapred.Counters counters) {
+    Counters yCntrs = new Counters();
+    yCntrs.groups = new HashMap<CharSequence, CounterGroup>();
+    for (org.apache.hadoop.mapred.Counters.Group grp : counters) {
+      CounterGroup yGrp = new CounterGroup();
+      yGrp.name = grp.getName();
+      yGrp.displayname = grp.getDisplayName();
+      yGrp.counters = new HashMap<CharSequence, Counter>();
+      for (org.apache.hadoop.mapred.Counters.Counter cntr : grp) {
+        Counter yCntr = new Counter();
+        yCntr.name = cntr.getName();
+        yCntr.displayName = cntr.getDisplayName();
+        yCntr.value = cntr.getValue();
+        yGrp.counters.put(yCntr.name, yCntr);
+      }
+      yCntrs.groups.put(yGrp.name, yGrp);
+    }
+    return yCntrs;
+  }
+
+  public static Counters toYarn(org.apache.hadoop.mapreduce.Counters counters) {
+    Counters yCntrs = new Counters();
+    yCntrs.groups = new HashMap<CharSequence, CounterGroup>();
+    for (org.apache.hadoop.mapreduce.CounterGroup grp : counters) {
+      CounterGroup yGrp = new CounterGroup();
+      yGrp.name = grp.getName();
+      yGrp.displayname = grp.getDisplayName();
+      yGrp.counters = new HashMap<CharSequence, Counter>();
+      for (org.apache.hadoop.mapreduce.Counter cntr : grp) {
+        Counter yCntr = new Counter();
+        yCntr.name = cntr.getName();
+        yCntr.displayName = cntr.getDisplayName();
+        yCntr.value = cntr.getValue();
+        yGrp.counters.put(yCntr.name, yCntr);
+      }
+      yCntrs.groups.put(yGrp.name, yGrp);
+    }
+    return yCntrs;
+  }
+  
+  public static org.apache.hadoop.mapred.JobStatus fromYarn(
+      JobReport jobreport, String jobFile, String trackingUrl) {
+    String user = null,  jobName = null;
+    JobPriority jobPriority = JobPriority.NORMAL;
+    return new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.id),
+        jobreport.setupProgress, jobreport.mapProgress,
+        jobreport.reduceProgress, jobreport.cleanupProgress,
+        fromYarn(jobreport.state),
+        jobPriority, user, jobName, jobFile, trackingUrl);
+  }
+  
+  public static int fromYarn(JobState state) {
+    switch (state) {
+    case NEW:
+      return org.apache.hadoop.mapred.JobStatus.PREP;
+    case RUNNING:
+      return org.apache.hadoop.mapred.JobStatus.RUNNING;
+    case KILL_WAIT:
+    case KILLED:
+      return org.apache.hadoop.mapred.JobStatus.KILLED;
+    case SUCCEEDED:
+      return org.apache.hadoop.mapred.JobStatus.SUCCEEDED;
+    case FAILED:
+    case ERROR:
+      return org.apache.hadoop.mapred.JobStatus.FAILED;
+    }
+    throw new YarnException("Unrecognized job state: " + state);
+  }
+
+  public static org.apache.hadoop.mapred.TIPStatus fromYarn(
+      TaskState state) {
+    switch (state) {
+    case NEW:
+    case SCHEDULED:
+      return org.apache.hadoop.mapred.TIPStatus.PENDING;
+    case RUNNING:
+      return org.apache.hadoop.mapred.TIPStatus.RUNNING;
+    case KILL_WAIT:
+    case KILLED:
+      return org.apache.hadoop.mapred.TIPStatus.KILLED;
+    case SUCCEEDED:
+      return org.apache.hadoop.mapred.TIPStatus.COMPLETE;
+    case FAILED:
+      return org.apache.hadoop.mapred.TIPStatus.FAILED;
+    }
+    throw new YarnException("Unrecognized task state: " + state);
+  }
+  
+  public static TaskReport fromYarn(org.apache.hadoop.mapreduce.v2.api.TaskReport report) {
+    String[] diagnostics = null;
+    if (report.diagnostics != null) {
+      diagnostics = new String[report.diagnostics.size()];
+      int i = 0;
+      for (CharSequence cs : report.diagnostics) {
+        diagnostics[i++] = cs.toString();
+      }
+    } else {
+      diagnostics = new String[0];
+    }
+    TaskReport rep = new TaskReport(fromYarn(report.id), 
+        report.progress, report.state.toString(),
+      diagnostics, fromYarn(report.state), report.startTime, report.finishTime,
+      fromYarn(report.counters));
+    List<org.apache.hadoop.mapreduce.TaskAttemptID> runningAtts 
+          = new ArrayList<org.apache.hadoop.mapreduce.TaskAttemptID>();
+    for (org.apache.hadoop.mapreduce.v2.api.TaskAttemptID id 
+        : report.runningAttempts) {
+      runningAtts.add(fromYarn(id));
+    }
+    rep.setRunningTaskAttemptIds(runningAtts);
+    if (report.successfulAttempt != null) {
+      rep.setSuccessfulAttemptId(fromYarn(report.successfulAttempt));
+    }
+    return rep;
+  }
+  
+  public static List<TaskReport> fromYarn(
+      List<org.apache.hadoop.mapreduce.v2.api.TaskReport> taskReports) {
+    List<TaskReport> reports = new ArrayList<TaskReport>();
+    for (org.apache.hadoop.mapreduce.v2.api.TaskReport r : taskReports) {
+      reports.add(fromYarn(r));
+    }
+    return reports;
+  }
+}
+

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Thu Mar 24 06:26:03 2011
@@ -31,11 +31,11 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.mapreduce.v2.api.Counters;
 import org.apache.hadoop.mapreduce.v2.api.JobID;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java Thu Mar 24 06:26:03 2011
@@ -24,11 +24,11 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.Counters;
 import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
 import org.apache.hadoop.mapreduce.v2.api.TaskID;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java Thu Mar 24 06:26:03 2011
@@ -21,9 +21,9 @@ package org.apache.hadoop.mapreduce.v2.h
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.yarn.ContainerID;
 import org.apache.hadoop.mapreduce.v2.api.Counters;
 import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Thu Mar 24 06:26:03 2011
@@ -27,9 +27,9 @@ import java.util.concurrent.ConcurrentHa
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.hs.CompletedJob;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.ApplicationID;
 import org.apache.hadoop.mapreduce.v2.api.JobID;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java Thu Mar 24 06:26:03 2011
@@ -25,9 +25,10 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.JobID;
 import org.apache.hadoop.mapreduce.v2.api.JobState;
 import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
@@ -39,7 +40,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Mar 24 06:26:03 2011
@@ -27,58 +27,75 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
-import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
 import org.apache.hadoop.yarn.ApplicationID;
 import org.apache.hadoop.yarn.ApplicationMaster;
 import org.apache.hadoop.yarn.ApplicationState;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.YarnRemoteException;
-import org.apache.hadoop.mapreduce.v2.api.JobReport;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.TaskReport;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
 
 public class ClientServiceDelegate {
   private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
+
   private Configuration conf;
-  private ApplicationID appId;
+  private ApplicationID currentAppId;
   private final ResourceMgrDelegate rm;
   private MRClientProtocol realProxy = null;
   private String serviceAddr = "";
   private String serviceHttpAddr = "";
-  
-  ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, 
-      ApplicationID appId) throws AvroRemoteException {
+
+  ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm) {
     this.conf = conf;
     this.rm = rm;
-    this.appId = appId;
-    if (appId != null) {
+  }
+
+  private MRClientProtocol getProxy(JobID jobId) throws AvroRemoteException {
+    return getProxy(TypeConverter.toYarn(jobId).appID, false);
+  }
+
+  private MRClientProtocol getRefreshedProxy(JobID jobId) throws AvroRemoteException {
+    return getProxy(TypeConverter.toYarn(jobId).appID, true);
+  }
+
+  private MRClientProtocol getProxy(ApplicationID appId, 
+      boolean forceRefresh) throws AvroRemoteException {
+    if (currentAppId != appId || forceRefresh) {
+      currentAppId = appId;
       refreshProxy();
     }
+    return realProxy;
   }
 
   private void refreshProxy() throws AvroRemoteException {
-    ApplicationMaster appMaster = rm.getApplicationMaster(appId);
+    ApplicationMaster appMaster = rm.getApplicationMaster(currentAppId);
     if (ApplicationState.COMPLETED.equals(appMaster.state)) {
-      String serviceAddr = conf.get(YarnMRJobConfig.HS_BIND_ADDRESS,
+      serviceAddr = conf.get(YarnMRJobConfig.HS_BIND_ADDRESS,
           YarnMRJobConfig.DEFAULT_HS_BIND_ADDRESS);
-      LOG.debug("Reconnecting to job history server " + serviceAddr);
-    } else {
-      /* TODO check to confirm its really launched */
+      LOG.info("Application state is completed. " +
+            "Redirecting to job history server " + serviceAddr);
+      //TODO:
+      serviceHttpAddr = "";
+    } else if (ApplicationState.RUNNING.equals(appMaster.state)){
       serviceAddr = appMaster.host + ":" + appMaster.rpcPort;
       serviceHttpAddr = appMaster.host + ":" + appMaster.httpPort;
+    } else {
+      LOG.warn("Cannot connect to Application with state " + appMaster.state);
+      throw new YarnException(
+          "Cannot connect to Application with state " + appMaster.state);
     }
     try {
       instantiateProxy(serviceAddr);
@@ -87,31 +104,6 @@ public class ClientServiceDelegate {
     }
   }
 
-  void instantiateProxy(ApplicationID applicationId, ApplicationMaster appMaster)
-      throws IOException {
-    try {
-      this.appId = applicationId;
-      LOG.info("Trying to connect to the ApplicationManager of"
-          + " application " + applicationId + " running at " + appMaster);
-      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-      serviceAddr = appMaster.host + ":"
-          + appMaster.rpcPort;
-      serviceHttpAddr = appMaster.host + ":" + appMaster.httpPort;
-      if (UserGroupInformation.isSecurityEnabled()) {
-        String clientTokenEncoded = appMaster.clientToken.toString();
-        Token<ApplicationTokenIdentifier> clientToken = new Token<ApplicationTokenIdentifier>();
-        clientToken.decodeFromUrlString(clientTokenEncoded);
-        clientToken.setService(new Text(appMaster.host.toString() + ":"
-            + appMaster.rpcPort));
-        currentUser.addToken(clientToken);
-      }
-      instantiateProxy(serviceAddr);
-      LOG.info("Connection to the ApplicationManager established.");
-    } catch (IOException e) {
-      throw (new IOException(e));
-    }
-  }
-  
   private void instantiateProxy(final String serviceAddr) throws IOException {
     UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
     realProxy = currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
@@ -130,15 +122,20 @@ public class ClientServiceDelegate {
 
   public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
       InterruptedException {
-    appId = TypeConverter.toYarn(arg0).appID;
     org.apache.hadoop.mapreduce.v2.api.JobID jobID = TypeConverter.toYarn(arg0);
-    if (realProxy == null) refreshProxy();
     try {
-      return TypeConverter.fromYarn(realProxy.getCounters(jobID));
+      return TypeConverter.fromYarn(getProxy(arg0).getCounters(jobID));
+    } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
+      LOG.warn(RPCUtil.toString(yre));
+      throw yre;
     } catch(Exception e) {
       LOG.debug("Failing to contact application master", e);
-      refreshProxy();
-      return TypeConverter.fromYarn(realProxy.getCounters(jobID));
+      try {
+        return TypeConverter.fromYarn(getRefreshedProxy(arg0).getCounters(jobID));
+      } catch(YarnRemoteException yre) {
+        LOG.warn(RPCUtil.toString(yre));
+        throw yre;
+      }
     }
   }
 
@@ -149,19 +146,23 @@ public class ClientServiceDelegate {
 
   public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
       int arg2) throws IOException, InterruptedException {
-    appId = TypeConverter.toYarn(arg0).appID;
-    if (realProxy == null) refreshProxy();
-    
     org.apache.hadoop.mapreduce.v2.api.JobID jobID = TypeConverter.toYarn(arg0);
     List<org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent> list = null;
     try {
-      list = realProxy.getTaskAttemptCompletionEvents(jobID,
+      list = getProxy(arg0).getTaskAttemptCompletionEvents(jobID,
           arg1, arg2);
+    } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
+      LOG.warn(RPCUtil.toString(yre));
+      throw yre;
     } catch(Exception e) {
       LOG.debug("Failed to contact application master ", e);
-      refreshProxy();
-      list = realProxy.getTaskAttemptCompletionEvents(jobID,
-          arg1, arg2);
+      try {
+        list = getRefreshedProxy(arg0).getTaskAttemptCompletionEvents(jobID,
+            arg1, arg2);
+      } catch(YarnRemoteException yre) {
+        LOG.warn(RPCUtil.toString(yre));
+        throw yre;
+      }
     }
     return TypeConverter.fromYarn(
         list.toArray(new org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent[0]));
@@ -174,15 +175,19 @@ public class ClientServiceDelegate {
     
     List<CharSequence> list = null;
     org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID = TypeConverter.toYarn(arg0);
-    appId = TypeConverter.toYarn(arg0.getJobID()).appID;
-    if (realProxy == null) refreshProxy();
-  
     try {
-      list = realProxy.getDiagnostics(attemptID);
+      list = getProxy(arg0.getJobID()).getDiagnostics(attemptID);
+    } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
+      LOG.warn(RPCUtil.toString(yre));
+      throw yre;
     } catch(Exception e) {
       LOG.debug("Failed to contact application master ", e);
-      refreshProxy();
-      list = realProxy.getDiagnostics(attemptID);
+      try {
+        list = getRefreshedProxy(arg0.getJobID()).getDiagnostics(attemptID);
+      } catch(YarnRemoteException yre) {
+        LOG.warn(RPCUtil.toString(yre));
+        throw yre;
+      }
     }
     String[] result = new String[list.size()];
     int i = 0;
@@ -192,95 +197,70 @@ public class ClientServiceDelegate {
     return result;
   }
 
-  //this method is here due to package restriction of 
-  //TaskReport constructor
-  public static org.apache.hadoop.mapred.TaskReport[] fromYarn(
-      List<TaskReport> reports) {
-    org.apache.hadoop.mapred.TaskReport[] result = 
-      new org.apache.hadoop.mapred.TaskReport[reports.size()];
-    int i = 0;
-    for (TaskReport report : reports) {
-      List<CharSequence> diag = report.diagnostics;
-      String[] diagnosticArr = new String[diag.size()];
-      int j = 0;
-      for (CharSequence c : diag) {
-        diagnosticArr[j++] = c.toString();
-      }
-      org.apache.hadoop.mapred.TaskReport oldReport = 
-        new org.apache.hadoop.mapred.TaskReport(
-            TypeConverter.fromYarn(report.id), report.progress, 
-            report.state.toString(),
-            diagnosticArr, TypeConverter.fromYarn(report.state), 
-          report.startTime, report.finishTime,
-          new org.apache.hadoop.mapred.Counters(
-              TypeConverter.fromYarn(report.counters)));
-      result[i++] = oldReport;
-    }
-    return result;
-  }
-
- 
-  public JobReport getJobReport(org.apache.hadoop.mapreduce.v2.api.JobID jobID)
-      throws AvroRemoteException, YarnRemoteException {
-    appId = jobID.appID;
-    if (realProxy == null) refreshProxy();
-    
+  public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException,
+      AvroRemoteException {
+    org.apache.hadoop.mapreduce.v2.api.JobID jobId = 
+      TypeConverter.toYarn(oldJobID);
+    LOG.debug("Getting Job status");
+    String stagingDir = conf.get("yarn.apps.stagingDir");
+    String jobFile = stagingDir + "/" + jobId.toString();
+    JobReport report = null;
     try {
-      return realProxy.getJobReport(jobID);
+      report = getProxy(oldJobID).getJobReport(jobId);
+    } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
+      LOG.warn(RPCUtil.toString(yre));
+      throw yre;
     } catch (Exception e) {
-      refreshProxy();
-      return realProxy.getJobReport(jobID);
+      try {
+        report = getRefreshedProxy(oldJobID).getJobReport(jobId);
+      } catch(YarnRemoteException yre) {
+        LOG.warn(RPCUtil.toString(yre));
+        throw yre;
+      }
     }
-  }
-
-  public JobStatus getJobStatus(org.apache.hadoop.mapreduce.v2.api.JobID jobId) 
-      throws AvroRemoteException, YarnRemoteException {
-    appId = jobId.appID;
-    if (realProxy == null) refreshProxy();
-    String trackingUrl = serviceAddr;
-    String stagingDir = conf.get("yarn.apps.stagingDir");
-    String jobFile = stagingDir + "/" + jobId.toString();
-    return TypeConverter.fromYarn(getJobReport(jobId), jobFile, serviceHttpAddr);
-  }
-  
-
-  public JobStatus getJobStatus(JobID jobID) throws YarnRemoteException,
-      AvroRemoteException {
-    return getJobStatus(TypeConverter.toYarn(jobID));
+    return TypeConverter.fromYarn(report, jobFile, serviceHttpAddr);
   }
 
   public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
       throws YarnRemoteException, AvroRemoteException {
       List<TaskReport> taskReports = null;
       org.apache.hadoop.mapreduce.v2.api.JobID nJobID = TypeConverter.toYarn(jobID);
-      appId = nJobID.appID;
-      if (realProxy == null) refreshProxy();
-    
       try {
-        taskReports = realProxy.getTaskReports(nJobID, 
+        taskReports = getProxy(jobID).getTaskReports(nJobID, 
             TypeConverter.toYarn(taskType));
+      } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
+        LOG.warn(RPCUtil.toString(yre));
+        throw yre;
       } catch(Exception e) {
         LOG.debug("Failed to contact application master ", e);
-        refreshProxy();
-        taskReports = realProxy.getTaskReports(nJobID, 
-            TypeConverter.toYarn(taskType));
+        try {
+          taskReports = getRefreshedProxy(jobID).getTaskReports(nJobID, 
+              TypeConverter.toYarn(taskType));
+        } catch(YarnRemoteException yre) {
+          LOG.warn(RPCUtil.toString(yre));
+          throw yre;
+        }
       }
-      return (org.apache.hadoop.mapreduce.TaskReport[])TypeConverter.fromYarn
-        (taskReports).toArray();
+      return TypeConverter.fromYarn
+        (taskReports).toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
   }
 
   public Void killJob(JobID jobID) throws YarnRemoteException,
       AvroRemoteException {
     org.apache.hadoop.mapreduce.v2.api.JobID  nJobID = TypeConverter.toYarn(jobID);
-    appId = nJobID.appID;
-    if (realProxy == null) refreshProxy();
-    
     try {
-      realProxy.killJob(nJobID);
+      getProxy(jobID).killJob(nJobID);
+    } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
+      LOG.warn(RPCUtil.toString(yre));
+      throw yre;
     } catch(Exception e) {
       LOG.debug("Failed to contact application master ", e);
-      refreshProxy();
-      realProxy.killJob(nJobID);
+      try {
+        getRefreshedProxy(jobID).killJob(nJobID);
+      } catch(YarnRemoteException yre) {
+        LOG.warn(RPCUtil.toString(yre));
+        throw yre;
+      }
     }
     return null;
   }
@@ -289,15 +269,19 @@ public class ClientServiceDelegate {
       throws YarnRemoteException, AvroRemoteException {
     org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID 
       = TypeConverter.toYarn(taskAttemptID);
-    appId = attemptID.taskID.jobID.appID;
-    if (realProxy == null) refreshProxy();
-    
     try {
-      realProxy.killTaskAttempt(attemptID);
+      getProxy(taskAttemptID.getJobID()).killTaskAttempt(attemptID);
+    } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
+      LOG.warn(RPCUtil.toString(yre));
+      throw yre;
     } catch(Exception e) {
       LOG.debug("Failed to contact application master ", e);
-      refreshProxy();
-      realProxy.killTaskAttempt(attemptID);
+      try {
+        getRefreshedProxy(taskAttemptID.getJobID()).killTaskAttempt(attemptID);
+      } catch(YarnRemoteException yre) {
+        LOG.warn(RPCUtil.toString(yre));
+        throw yre;
+      }
     }
     return true;
   }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Thu Mar 24 06:26:03 2011
@@ -37,8 +37,8 @@ import org.apache.hadoop.mapreduce.JobSt
 import org.apache.hadoop.mapreduce.QueueAclsInfo;
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.token.Token;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Thu Mar 24 06:26:03 2011
@@ -94,26 +94,18 @@ public class YARNRunner implements Clien
    * yarn
    * @param conf the configuration object for the client
    */
-  public YARNRunner(Configuration conf, ApplicationID appID)
+  public YARNRunner(Configuration conf)
       throws AvroRemoteException {
     this.conf = new YarnConfiguration(conf);
     try {
       this.resMgrDelegate = new ResourceMgrDelegate(conf);
       this.clientServiceDelegate = new ClientServiceDelegate(conf,
-          resMgrDelegate, appID);
+          resMgrDelegate);
     } catch (UnsupportedFileSystemException ufe) {
       throw new RuntimeException("Error in instantiating YarnClient", ufe);
     }
   }
 
-  /**
-   * Yarn runner incapsulates the client interface of
-   * yarn
-   * @param conf the configuration object for the client
-   */
-  public YARNRunner(Configuration conf) throws AvroRemoteException {
-    this(conf, null);
-  }
   @Override
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
       throws IOException, InterruptedException {
@@ -243,7 +235,6 @@ public class YARNRunner implements Clien
       ApplicationState.KILLED) {
       throw new AvroRemoteException("failed to run job");
     }
-    clientServiceDelegate.instantiateProxy(applicationId, appMaster);
     return clientServiceDelegate.getJobStatus(jobId);
   }
 
@@ -490,9 +481,6 @@ public class YARNRunner implements Clien
   public JobStatus getJobStatus(JobID jobID) throws IOException,
       InterruptedException {
     JobStatus status = clientServiceDelegate.getJobStatus(jobID);
-    if (status.isJobComplete()) {
-      // Clean up the Container running the ApplicationMaster.
-    }
     return status;
   }
   

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1084852&r1=1084851&r2=1084852&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Thu Mar 24 06:26:03 2011
@@ -64,25 +64,30 @@ public class TestClientRedirect {
   private static final Log LOG = LogFactory.getLog(TestClientRedirect.class);
   private static final String RMADDRESS = "0.0.0.0:8054";
   private static final String AMHOSTADDRESS = "0.0.0.0:10020";
-  private static final String AMHOSTNAME = "0.0.0.0";
-  private static final int AMPORT = 10020;
-  private boolean firstRedirect = false; 
-  private boolean secondRedirect = false;
+  private static final String HSHOSTADDRESS = "0.0.0.0:10021";
+  private static final int HSPORT = 10020;
+  private volatile boolean amContact = false; 
+  private volatile boolean hsContact = false;
+  private volatile boolean amRunning = false;
  
   @Test
   public void testRedirect() throws Exception {
     
     Configuration conf = new YarnConfiguration();
     conf.set(YarnConfiguration.APPSMANAGER_ADDRESS, RMADDRESS);
-    conf.set(YarnMRJobConfig.HS_BIND_ADDRESS, AMHOSTADDRESS);
+    conf.set(YarnMRJobConfig.HS_BIND_ADDRESS, HSHOSTADDRESS);
     RMService rmService = new RMService("test");
     rmService.init(conf);
     rmService.start();
   
-    MRClientProtocolService clientService =
-      new MRClientProtocolService();
-    clientService.init(conf);
-    clientService.start(conf);
+    AMService amService = new AMService();
+    amService.init(conf);
+    amService.start(conf);
+    amRunning = true;
+
+    HistoryService historyService = new HistoryService();
+    historyService.init(conf);
+    historyService.start(conf);
   
     LOG.info("services started");
     YARNRunner yarnRunner = new YARNRunner(conf);
@@ -90,11 +95,17 @@ public class TestClientRedirect {
     org.apache.hadoop.mapreduce.JobID jobID =
       new org.apache.hadoop.mapred.JobID("201103121733", 1);
     yarnRunner.getJobCounters(jobID);
-    Assert.assertTrue(firstRedirect);
-    Assert.assertTrue(secondRedirect);
+    Assert.assertTrue(amContact);
+    
+    //bring down the AM service
+    amService.stop();
+    amRunning = false;
+    
+    yarnRunner.getJobCounters(jobID);
+    Assert.assertTrue(hsContact);
     
     rmService.stop();
-    clientService.stop();
+    historyService.stop();
   }
 
   class RMService extends AbstractService implements ClientRMProtocol {
@@ -143,13 +154,14 @@ public class TestClientRedirect {
       master.applicationId = applicationId;
       master.status = new ApplicationStatus();
       master.status.applicationId = applicationId;
-      if (firstRedirect == false) {
+      if (amRunning) {
         master.state = ApplicationState.RUNNING;
       } else {
         master.state = ApplicationState.COMPLETED;
       }
-      master.host = AMHOSTNAME;
-      master.rpcPort = AMPORT;
+      String[] split = AMHOSTADDRESS.split(":");
+      master.host = split[0];
+      master.rpcPort = Integer.parseInt(split[1]);
       return master;
   }
 
@@ -171,19 +183,39 @@ public class TestClientRedirect {
     }
   }
 
-  class MRClientProtocolService extends AbstractService 
+  class HistoryService extends AMService {
+    public HistoryService() {
+      super(HSHOSTADDRESS);
+    }
+
+    @Override
+    public Counters getCounters(JobID jobID) throws AvroRemoteException,
+      YarnRemoteException {
+      hsContact = true;
+      Counters counters = new Counters();
+      counters.groups = new HashMap<CharSequence, CounterGroup>();
+      return counters;
+   }
+  }
+
+  class AMService extends AbstractService 
       implements MRClientProtocol {
     private InetSocketAddress bindAddress;
     private Server server;
+    private final String hostAddress;
+    public AMService() {
+      this(AMHOSTADDRESS);
+    }
     
-    public MRClientProtocolService() {
+    public AMService(String hostAddress) {
       super("TestClientService");
+      this.hostAddress = hostAddress;
     }
 
     public void start(Configuration conf) {
       YarnRPC rpc = YarnRPC.create(conf);
       //TODO : use fixed port ??
-      InetSocketAddress address = NetUtils.createSocketAddr(AMHOSTADDRESS);
+      InetSocketAddress address = NetUtils.createSocketAddr(hostAddress);
       InetAddress hostNameResolved = null;
       try {
         address.getAddress();
@@ -210,17 +242,11 @@ public class TestClientRedirect {
     @Override
     public Counters getCounters(JobID jobID) throws AvroRemoteException,
       YarnRemoteException {
-      if (firstRedirect == false) {
-        firstRedirect = true;
-        throw RPCUtil.getRemoteException(new IOException("Fail"));
-      }
-      else {
-        secondRedirect = true;
-        Counters counters = new Counters();
-        counters.groups = new HashMap<CharSequence, CounterGroup>();
-        return counters;
-      }
- }
+      amContact = true;
+      Counters counters = new Counters();
+      counters.groups = new HashMap<CharSequence, CounterGroup>();
+      return counters;
+   }
 
     @Override
     public List<CharSequence> getDiagnostics(



Mime
View raw message