hadoop-mapreduce-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r1087462 [8/20] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ mr-client/h...
Date: Thu, 31 Mar 2011 22:23:34 GMT
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Thu Mar 31 22:23:22 2011
@@ -27,12 +27,13 @@ import java.util.Map;
 
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 
 import static org.apache.hadoop.yarn.util.StringHelper.*;
 
@@ -44,60 +45,60 @@ public class MRApps extends Apps {
   public static final String TASK = "task";
   public static final String ATTEMPT = "attempt";
 
-  public static String toString(JobID jid) {
-    return _join(JOB, jid.appID.clusterTimeStamp, jid.appID.id, jid.id);
+  public static String toString(JobId jid) {
+    return _join(JOB, jid.getAppId().getClusterTimestamp(), jid.getAppId().getId(), jid.getId());
   }
 
-  public static JobID toJobID(String jid) {
+  public static JobId toJobID(String jid) {
     Iterator<String> it = _split(jid).iterator();
     return toJobID(JOB, jid, it);
   }
 
   // mostly useful for parsing task/attempt id-like strings
-  public static JobID toJobID(String prefix, String s, Iterator<String> it) {
-    ApplicationID appID = toAppID(prefix, s, it);
+  public static JobId toJobID(String prefix, String s, Iterator<String> it) {
+    ApplicationId appId = toAppID(prefix, s, it);
     shouldHaveNext(prefix, s, it);
-    JobID jobID = new JobID();
-    jobID.appID = appID;
-    jobID.id = Integer.parseInt(it.next());
-    return jobID;
+    JobId jobId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class);
+    jobId.setAppId(appId);
+    jobId.setId(Integer.parseInt(it.next()));
+    return jobId;
   }
 
-  public static String toString(TaskID tid) {
-    return _join("task", tid.jobID.appID.clusterTimeStamp, tid.jobID.appID.id,
-                 tid.jobID.id, taskSymbol(tid.taskType), tid.id);
+  public static String toString(TaskId tid) {
+    return _join("task", tid.getJobId().getAppId().getClusterTimestamp(), tid.getJobId().getAppId().getId(),
+                 tid.getJobId().getId(), taskSymbol(tid.getTaskType()), tid.getId());
   }
 
-  public static TaskID toTaskID(String tid) {
+  public static TaskId toTaskID(String tid) {
     Iterator<String> it = _split(tid).iterator();
     return toTaskID(TASK, tid, it);
   }
 
-  public static TaskID toTaskID(String prefix, String s, Iterator<String> it) {
-    JobID jid = toJobID(prefix, s, it);
+  public static TaskId toTaskID(String prefix, String s, Iterator<String> it) {
+    JobId jid = toJobID(prefix, s, it);
     shouldHaveNext(prefix, s, it);
-    TaskID tid = new TaskID();
-    tid.jobID = jid;
-    tid.taskType = taskType(it.next());
+    TaskId tid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class);
+    tid.setJobId(jid);
+    tid.setTaskType(taskType(it.next()));
     shouldHaveNext(prefix, s, it);
-    tid.id = Integer.parseInt(it.next());
+    tid.setId(Integer.parseInt(it.next()));
     return tid;
   }
 
-  public static String toString(TaskAttemptID taid) {
-    return _join("attempt", taid.taskID.jobID.appID.clusterTimeStamp,
-                 taid.taskID.jobID.appID.id, taid.taskID.jobID.id,
-                 taskSymbol(taid.taskID.taskType), taid.taskID.id, taid.id);
+  public static String toString(TaskAttemptId taid) {
+    return _join("attempt", taid.getTaskId().getJobId().getAppId().getClusterTimestamp(),
+                 taid.getTaskId().getJobId().getAppId().getId(), taid.getTaskId().getJobId().getId(),
+                 taskSymbol(taid.getTaskId().getTaskType()), taid.getTaskId().getId(), taid.getId());
   }
 
-  public static TaskAttemptID toTaskAttemptID(String taid) {
+  public static TaskAttemptId toTaskAttemptID(String taid) {
     Iterator<String> it = _split(taid).iterator();
-    TaskID tid = toTaskID(ATTEMPT, taid, it);
+    TaskId tid = toTaskID(ATTEMPT, taid, it);
     shouldHaveNext(ATTEMPT, taid, it);
-    TaskAttemptID taID = new TaskAttemptID();
-    taID.taskID = tid;
-    taID.id = Integer.parseInt(it.next());
-    return taID;
+    TaskAttemptId taId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptId.class);
+    taId.setTaskId(tid);
+    taId.setId(Integer.parseInt(it.next()));
+    return taId;
   }
 
   public static String taskSymbol(TaskType type) {
@@ -116,7 +117,7 @@ public class MRApps extends Apps {
   }
 
   public static void setInitialClasspath(
-      Map<CharSequence, CharSequence> environment) throws IOException {
+      Map<String, String> environment) throws IOException {
 
     // Get yarn mapreduce-app classpath from generated classpath
     // Works if compile time env is same as runtime. For e.g. tests.
@@ -151,8 +152,8 @@ public class MRApps extends Apps {
   }
 
   public static void addToClassPath(
-      Map<CharSequence, CharSequence> environment, String fileName) {
-    CharSequence classpath = environment.get(CLASSPATH);
+      Map<String, String> environment, String fileName) {
+    String classpath = environment.get(CLASSPATH);
     if (classpath == null) {
       classpath = fileName;
     } else {
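
For orientation, here is a minimal round-trip sketch of the migrated MRApps API shown in the hunks above. The ApplicationId setters are an assumption mirroring the getClusterTimestamp/getId calls this diff uses; everything else appears in the diff.

import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;

public class JobIdRoundTrip {
  public static void main(String[] args) {
    // Build a JobId through the record factory, as the new MRApps code does.
    JobId jid = RecordFactoryProvider.getRecordFactory(null)
        .newRecordInstance(JobId.class);
    ApplicationId appId = RecordFactoryProvider.getRecordFactory(null)
        .newRecordInstance(ApplicationId.class);
    appId.setClusterTimestamp(1L); // assumed setter (only getters appear above)
    appId.setId(2);                // assumed setter
    jid.setAppId(appId);
    jid.setId(3);
    String s = MRApps.toString(jid); // "job_1_2_3"
    JobId back = MRApps.toJobID(s);  // parses the same fields back
    assert back.getId() == 3;
  }
}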

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRProtoUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRProtoUtils.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRProtoUtils.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRProtoUtils.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,82 @@
+package org.apache.hadoop.mapreduce.v2.util;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobStateProto;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.PhaseProto;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptCompletionEventStatusProto;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptStateProto;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskStateProto;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskTypeProto;
+
+public class MRProtoUtils {
+
+  /*
+   * JobState
+   */
+  private static String JOB_STATE_PREFIX = "J_";
+  public static JobStateProto convertToProtoFormat(JobState e) {
+    return JobStateProto.valueOf(JOB_STATE_PREFIX + e.name());
+  }
+  public static JobState convertFromProtoFormat(JobStateProto e) {
+    return JobState.valueOf(e.name().replace(JOB_STATE_PREFIX, ""));
+  }
+  
+  /*
+   * Phase
+   */
+  private static String PHASE_PREFIX = "P_";
+  public static PhaseProto convertToProtoFormat(Phase e) {
+    return PhaseProto.valueOf(PHASE_PREFIX + e.name());
+  }
+  public static Phase convertFromProtoFormat(PhaseProto e) {
+    return Phase.valueOf(e.name().replace(PHASE_PREFIX, ""));
+  }
+  
+  /*
+   * TaskAttemptCompletionEventStatus
+   */
+  private static String TACE_PREFIX = "TACE_";
+  public static TaskAttemptCompletionEventStatusProto convertToProtoFormat(TaskAttemptCompletionEventStatus e) {
+    return TaskAttemptCompletionEventStatusProto.valueOf(TACE_PREFIX + e.name());
+  }
+  public static TaskAttemptCompletionEventStatus convertFromProtoFormat(TaskAttemptCompletionEventStatusProto e) {
+    return TaskAttemptCompletionEventStatus.valueOf(e.name().replace(TACE_PREFIX, ""));
+  }
+  
+  /*
+   * TaskAttemptState
+   */
+  private static String TASK_ATTEMPT_STATE_PREFIX = "TA_";
+  public static TaskAttemptStateProto convertToProtoFormat(TaskAttemptState e) {
+    return TaskAttemptStateProto.valueOf(TASK_ATTEMPT_STATE_PREFIX + e.name());
+  }
+  public static TaskAttemptState convertFromProtoFormat(TaskAttemptStateProto e) {
+    return TaskAttemptState.valueOf(e.name().replace(TASK_ATTEMPT_STATE_PREFIX, ""));
+  }
+  
+  /*
+   * TaskState
+   */
+  private static String TASK_STATE_PREFIX = "TS_";
+  public static TaskStateProto convertToProtoFormat(TaskState e) {
+    return TaskStateProto.valueOf(TASK_STATE_PREFIX + e.name());
+  }
+  public static TaskState convertFromProtoFormat(TaskStateProto e) {
+    return TaskState.valueOf(e.name().replace(TASK_STATE_PREFIX, ""));
+  }
+  
+  /*
+   * TaskType
+   */
+  public static TaskTypeProto convertToProtoFormat(TaskType e) {
+    return TaskTypeProto.valueOf(e.name());
+  }
+  public static TaskType convertFromProtoFormat(TaskTypeProto e) {
+    return TaskType.valueOf(e.name());
+  }
+}
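
For context, the prefix scheme above namespaces the proto enum constants so several MR enums can share one proto file without name clashes. A minimal sketch of the round trip, assuming the generated MRProtos enums are on the classpath:

import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobStateProto;
import org.apache.hadoop.mapreduce.v2.util.MRProtoUtils;

public class JobStateConversion {
  public static void main(String[] args) {
    // JobState.RUNNING maps to JobStateProto.J_RUNNING and back.
    JobStateProto p = MRProtoUtils.convertToProtoFormat(JobState.RUNNING);
    JobState s = MRProtoUtils.convertFromProtoFormat(p);
    System.out.println(p + " -> " + s); // J_RUNNING -> RUNNING
  }
}

Note that convertFromProtoFormat strips the prefix with String.replace, which removes every occurrence rather than only a leading one; that is harmless for the constants defined here, since none repeats its own prefix.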

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/MRClientProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/MRClientProtocol.proto?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/MRClientProtocol.proto (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/MRClientProtocol.proto Thu Mar 31 22:23:22 2011
@@ -0,0 +1,20 @@
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "MRClientProtocol";
+option java_generic_services = true;
+
+import "mr_service_protos.proto";
+
+service MRClientProtocolService {
+  rpc getJobReport (GetJobReportRequestProto) returns (GetJobReportResponseProto);
+  rpc getTaskReport (GetTaskReportRequestProto) returns (GetTaskReportResponseProto);
+  rpc getTaskAttemptReport (GetTaskAttemptReportRequestProto) returns (GetTaskAttemptReportResponseProto);
+  rpc getCounters (GetCountersRequestProto) returns (GetCountersResponseProto);
+  rpc getTaskAttemptCompletionEvents (GetTaskAttemptCompletionEventsRequestProto) returns (GetTaskAttemptCompletionEventsResponseProto);
+  rpc getTaskReports (GetTaskReportsRequestProto) returns (GetTaskReportsResponseProto);
+  rpc getDiagnostics (GetDiagnosticsRequestProto) returns (GetDiagnosticsResponseProto);
+
+  rpc killJob (KillJobRequestProto) returns (KillJobResponseProto);
+  rpc killTask (KillTaskRequestProto) returns (KillTaskResponseProto);
+  rpc killTaskAttempt (KillTaskAttemptRequestProto) returns (KillTaskAttemptResponseProto);
+  rpc failTaskAttempt (FailTaskAttemptRequestProto) returns (FailTaskAttemptResponseProto);
+}
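
On the Java side these rpcs surface through the MRClientProtocol interface used elsewhere in this commit. A hedged client-side sketch follows; the request setter and response getter are assumed from the proto fields and from the handler code later in this diff, and the client and JobId are obtained elsewhere (see TestRPCFactories below):

import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;

public class JobReportClient {
  static JobReport fetchReport(MRClientProtocol client, JobId jobId)
      throws YarnRemoteException {
    GetJobReportRequest request = RecordFactoryProvider.getRecordFactory(null)
        .newRecordInstance(GetJobReportRequest.class);
    request.setJobId(jobId);                // assumed setter for the job_id field
    return client.getJobReport(request).getJobReport(); // assumed response getter
  }
}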

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto Thu Mar 31 22:23:22 2011
@@ -0,0 +1,149 @@
+option java_package = "org.apache.hadoop.mapreduce.v2.proto";
+option java_outer_classname = "MRProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+
+enum TaskTypeProto {
+  MAP = 1;
+  REDUCE = 2;
+}
+
+message JobIdProto {
+  optional ApplicationIdProto app_id = 1;
+  optional int32 id = 2;
+}
+
+message TaskIdProto {
+  optional JobIdProto job_id = 1;
+  optional TaskTypeProto task_type = 2;
+  optional int32 id = 3;
+}
+
+message TaskAttemptIdProto {
+  optional TaskIdProto task_id = 1;
+  optional int32 id = 2;
+}
+
+enum TaskStateProto {
+  TS_NEW = 1;
+  TS_SCHEDULED = 2;
+  TS_RUNNING = 3;
+  TS_SUCCEEDED = 4;
+  TS_FAILED = 5;
+  TS_KILL_WAIT = 6;
+  TS_KILLED = 7;
+}
+
+enum PhaseProto {
+  P_STARTING = 1;
+  P_MAP = 2;
+  P_SHUFFLE = 3;
+  P_SORT = 4;
+  P_REDUCE = 5;
+  P_CLEANUP = 6;
+}
+
+message CounterProto {
+  optional string name = 1;
+  optional string display_name = 2;
+  optional int64 value = 3;
+}
+
+message CounterGroupProto {
+  optional string name = 1;
+  optional string display_name = 2;
+  repeated StringCounterMapProto counters = 3;
+}
+
+message CountersProto {
+  repeated StringCounterGroupMapProto counter_groups = 1;
+}
+
+message TaskReportProto {
+  optional TaskIdProto task_id = 1;
+  optional TaskStateProto task_state = 2;
+  optional float progress = 3;
+  optional int64 start_time = 4;
+  optional int64 finish_time = 5;
+  optional CountersProto counters = 6;
+  repeated TaskAttemptIdProto running_attempts = 7;
+  optional TaskAttemptIdProto successful_attempt = 8;
+  repeated string diagnostics = 9;
+}
+
+enum TaskAttemptStateProto {
+  TA_NEW = 1;
+  TA_UNASSIGNED = 2;
+  TA_ASSIGNED = 3;
+  TA_RUNNING = 4;
+  TA_COMMIT_PENDING = 5;
+  TA_SUCCESS_CONTAINER_CLEANUP = 6;
+  TA_SUCCEEDED = 7;
+  TA_FAIL_CONTAINER_CLEANUP = 8;
+  TA_FAIL_TASK_CLEANUP = 9;
+  TA_FAILED = 10;
+  TA_KILL_CONTAINER_CLEANUP = 11;
+  TA_KILL_TASK_CLEANUP = 12;
+  TA_KILLED = 13;
+}
+
+message TaskAttemptReportProto {
+  optional TaskAttemptIdProto task_attempt_id = 1;
+  optional TaskAttemptStateProto task_attempt_state = 2;
+  optional float progress = 3;
+  optional int64 start_time = 4;
+  optional int64 finish_time = 5;
+  optional CountersProto counters = 6;
+  optional string diagnostic_info = 7;
+  optional string state_string = 8;
+  optional PhaseProto phase = 9;
+}
+
+enum JobStateProto {
+  J_NEW = 1;
+  J_RUNNING = 2;
+  J_SUCCEEDED = 3;
+  J_FAILED = 4;
+  J_KILL_WAIT = 5;
+  J_KILLED = 6;
+  J_ERROR = 7;
+}
+
+message JobReportProto {
+  optional JobIdProto job_id = 1;
+  optional JobStateProto job_state = 2;
+  optional float map_progress = 3;
+  optional float reduce_progress = 4;
+  optional float cleanup_progress = 5;
+  optional float setup_progress = 6;
+  optional int64 start_time = 7;
+  optional int64 finish_time = 8;
+}
+
+enum TaskAttemptCompletionEventStatusProto {
+  TACE_FAILED = 1;
+  TACE_KILLED = 2;
+  TACE_SUCCEEDED = 3;
+  TACE_OBSOLETE = 4;
+  TACE_TIPFAILED = 5;
+}
+
+message TaskAttemptCompletionEventProto {
+  optional TaskAttemptIdProto attempt_id = 1;
+  optional TaskAttemptCompletionEventStatusProto status = 2;
+  optional string map_output_server_address = 3;
+  optional int32 attempt_run_time = 4;
+  optional int32 event_id = 5;
+}
+
+message StringCounterMapProto {
+  optional string key = 1;
+  optional CounterProto value = 2;
+}
+
+message StringCounterGroupMapProto {
+  optional string key = 1;
+  optional CounterGroupProto value = 2;
+}
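
At the wire level these messages compile to ordinary protobuf-java classes under org.apache.hadoop.mapreduce.v2.proto.MRProtos. A minimal builder sketch using the standard generated API (all fields are optional, so unset ones simply default):

import org.apache.hadoop.mapreduce.v2.proto.MRProtos;

public class TaskIdProtoExample {
  public static void main(String[] args) {
    // Task 3 of job 7's map phase, built with the generated protobuf builders.
    MRProtos.JobIdProto jobId = MRProtos.JobIdProto.newBuilder()
        .setId(7)
        .build(); // app_id left unset; optional fields may be omitted
    MRProtos.TaskIdProto taskId = MRProtos.TaskIdProto.newBuilder()
        .setJobId(jobId)
        .setTaskType(MRProtos.TaskTypeProto.MAP)
        .setId(3)
        .build();
    System.out.println(taskId);
  }
}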

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_service_protos.proto?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_service_protos.proto (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_service_protos.proto Thu Mar 31 22:23:22 2011
@@ -0,0 +1,83 @@
+option java_package = "org.apache.hadoop.mapreduce.v2.proto";
+option java_outer_classname = "MRServiceProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "mr_protos.proto";
+
+message GetJobReportRequestProto {
+  optional JobIdProto job_id = 1;
+}
+message GetJobReportResponseProto {
+  optional JobReportProto job_report = 1;
+}
+
+message GetTaskReportRequestProto {
+  optional TaskIdProto task_id = 1;
+}
+message GetTaskReportResponseProto {
+  optional TaskReportProto task_report = 1;
+}
+
+message GetTaskAttemptReportRequestProto {
+  optional TaskAttemptIdProto task_attempt_id = 1;
+}
+message GetTaskAttemptReportResponseProto {
+  optional TaskAttemptReportProto task_attempt_report = 1;
+}
+
+message GetCountersRequestProto {
+  optional JobIdProto job_id = 1;
+}
+message GetCountersResponseProto {
+  optional CountersProto counters = 1;
+}
+
+message GetTaskAttemptCompletionEventsRequestProto {
+  optional JobIdProto job_id = 1;
+  optional int32 from_event_id = 2;
+  optional int32 max_events = 3;
+}
+message GetTaskAttemptCompletionEventsResponseProto {
+  repeated TaskAttemptCompletionEventProto completion_events = 1;
+}
+
+message GetTaskReportsRequestProto {
+  optional JobIdProto job_id = 1;
+  optional TaskTypeProto task_type = 2;
+}
+message GetTaskReportsResponseProto {
+  repeated TaskReportProto task_reports = 1;
+}
+
+message GetDiagnosticsRequestProto {
+  optional TaskAttemptIdProto task_attempt_id = 1;
+}
+message GetDiagnosticsResponseProto {
+  repeated string diagnostics = 1;
+}
+
+
+message KillJobRequestProto {
+  optional JobIdProto job_id = 1;
+}
+message KillJobResponseProto {
+}
+
+message KillTaskRequestProto {
+  optional TaskIdProto task_id = 1;
+}
+message KillTaskResponseProto {
+}
+
+message KillTaskAttemptRequestProto {
+  optional TaskAttemptIdProto task_attempt_id = 1;
+}
+message KillTaskAttemptResponseProto {
+}
+
+message FailTaskAttemptRequestProto {
+  optional TaskAttemptIdProto task_attempt_id = 1;
+}
+message FailTaskAttemptResponseProto {
+}
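
Each rpc pairs an XRequestProto with an XResponseProto; the empty Kill*ResponseProto and FailTaskAttemptResponseProto messages act as typed acks. These are the messages the PB impl classes exercised in the tests below serialize. A minimal sketch with the generated builders:

import org.apache.hadoop.mapreduce.v2.proto.MRProtos;
import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos;

public class ServiceProtoExample {
  public static void main(String[] args) {
    MRServiceProtos.GetJobReportRequestProto request =
        MRServiceProtos.GetJobReportRequestProto.newBuilder()
            .setJobId(MRProtos.JobIdProto.newBuilder().setId(7).build())
            .build();
    System.out.println(request);
  }
}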

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,180 @@
+package org.apache.hadoop.mapreduce.v2;
+
+
+import java.net.InetSocketAddress;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
+import org.junit.Test;
+
+public class TestRPCFactories {
+  
+  
+  
+  @Test
+  public void test() {
+    testPbServerFactory();
+    
+    testPbClientFactory();
+  }
+  
+  
+  
+  private void testPbServerFactory() {
+    InetSocketAddress addr = new InetSocketAddress(0);
+    Configuration conf = new Configuration();
+    MRClientProtocol instance = new MRClientProtocolTestImpl();
+    Server server = null;
+    try {
+      server = RpcServerFactoryPBImpl.get().getServer(MRClientProtocol.class, instance, addr, conf, null);
+      server.start();
+    } catch (YarnException e) {
+      e.printStackTrace();
+      Assert.fail("Failed to create server");
+    } finally {
+      if (server != null) {
+        server.stop();
+      }
+    }
+  }
+
+  
+  private void testPbClientFactory() {
+    InetSocketAddress addr = new InetSocketAddress(0);
+    System.err.println(addr.getHostName() + addr.getPort());
+    Configuration conf = new Configuration();
+    MRClientProtocol instance = new MRClientProtocolTestImpl();
+    Server server = null;
+    try {
+      server = RpcServerFactoryPBImpl.get().getServer(MRClientProtocol.class, instance, addr, conf, null);
+      server.start();
+      System.err.println(server.getListenerAddress());
+      System.err.println(NetUtils.getConnectAddress(server));
+
+      MRClientProtocol client = null;
+      try {
+        client = (MRClientProtocol) RpcClientFactoryPBImpl.get().getClient(MRClientProtocol.class, 1, NetUtils.getConnectAddress(server), conf);
+      } catch (YarnException e) {
+        e.printStackTrace();
+        Assert.fail("Failed to create client");
+      }
+      
+    } catch (YarnException e) {
+      e.printStackTrace();
+      Assert.fail("Failed to create server");
+    } finally {
+      if (server != null) {
+        server.stop();
+      }
+    }
+  }
+
+  
+  public class MRClientProtocolTestImpl implements MRClientProtocol {
+
+    @Override
+    public GetJobReportResponse getJobReport(GetJobReportRequest request)
+        throws YarnRemoteException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
+        throws YarnRemoteException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public GetTaskAttemptReportResponse getTaskAttemptReport(
+        GetTaskAttemptReportRequest request) throws YarnRemoteException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public GetCountersResponse getCounters(GetCountersRequest request)
+        throws YarnRemoteException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
+        GetTaskAttemptCompletionEventsRequest request)
+        throws YarnRemoteException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
+        throws YarnRemoteException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
+        throws YarnRemoteException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public KillJobResponse killJob(KillJobRequest request)
+        throws YarnRemoteException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public KillTaskResponse killTask(KillTaskRequest request)
+        throws YarnRemoteException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public KillTaskAttemptResponse killTaskAttempt(
+        KillTaskAttemptRequest request) throws YarnRemoteException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public FailTaskAttemptResponse failTaskAttempt(
+        FailTaskAttemptRequest request) throws YarnRemoteException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+    
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java?rev=1087462&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java Thu Mar 31 22:23:22 2011
@@ -0,0 +1,37 @@
+package org.apache.hadoop.mapreduce.v2;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetCountersRequestPBImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.CounterGroupPBImpl;
+import org.junit.Test;
+
+public class TestRecordFactory {
+  
+  @Test
+  public void testPbRecordFactory() {
+    RecordFactory pbRecordFactory = RecordFactoryPBImpl.get();
+    
+    try {
+      CounterGroup response = pbRecordFactory.newRecordInstance(CounterGroup.class);
+      Assert.assertEquals(CounterGroupPBImpl.class, response.getClass());
+    } catch (YarnException e) {
+      e.printStackTrace();
+      Assert.fail("Failed to create record");
+    }
+    
+    try {
+      GetCountersRequest response = pbRecordFactory.newRecordInstance(GetCountersRequest.class);
+      Assert.assertEquals(GetCountersRequestPBImpl.class, response.getClass());
+    } catch (YarnException e) {
+      e.printStackTrace();
+      Assert.fail("Failed to create record");
+    }
+  }
+
+}

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Thu Mar 31 22:23:22 2011
@@ -18,13 +18,14 @@
 
 package org.apache.hadoop.mapreduce.v2.util;
 
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -32,42 +33,48 @@ import static org.junit.Assert.*;
 public class TestMRApps {
 
   @Test public void testJobIDtoString() {
-    JobID jid = new JobID();
-    jid.appID = new ApplicationID();
+    JobId jid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class);
+    jid.setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
     assertEquals("job_0_0_0", MRApps.toString(jid));
   }
 
   @Test public void testToJobID() {
-    JobID jid = MRApps.toJobID("job_1_1_1");
-    assertEquals(1, jid.appID.clusterTimeStamp);
-    assertEquals(1, jid.appID.id);
-    assertEquals(1, jid.id);
+    JobId jid = MRApps.toJobID("job_1_1_1");
+    assertEquals(1, jid.getAppId().getClusterTimestamp());
+    assertEquals(1, jid.getAppId().getId());
+    assertEquals(1, jid.getId());
   }
 
   @Test(expected=YarnException.class) public void testJobIDShort() {
     MRApps.toJobID("job_0_0");
   }
 
+  //TODO_get.set
   @Test public void testTaskIDtoString() {
-    TaskID tid = new TaskID();
-    tid.jobID = new JobID();
-    tid.jobID.appID = new ApplicationID();
-    tid.taskType = TaskType.MAP;
+    TaskId tid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class);
+    tid.setJobId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class));
+    tid.getJobId().setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
+    tid.setTaskType(TaskType.MAP);
+    TaskType type = tid.getTaskType();
+    System.err.println(type);
+    type = TaskType.REDUCE;
+    System.err.println(type);
+    System.err.println(tid.getTaskType());
     assertEquals("task_0_0_0_m_0", MRApps.toString(tid));
-    tid.taskType = TaskType.REDUCE;
+    tid.setTaskType(TaskType.REDUCE);
     assertEquals("task_0_0_0_r_0", MRApps.toString(tid));
   }
 
   @Test public void testToTaskID() {
-    TaskID tid = MRApps.toTaskID("task_1_2_3_r_4");
-    assertEquals(1, tid.jobID.appID.clusterTimeStamp);
-    assertEquals(2, tid.jobID.appID.id);
-    assertEquals(3, tid.jobID.id);
-    assertEquals(TaskType.REDUCE, tid.taskType);
-    assertEquals(4, tid.id);
+    TaskId tid = MRApps.toTaskID("task_1_2_3_r_4");
+    assertEquals(1, tid.getJobId().getAppId().getClusterTimestamp());
+    assertEquals(2, tid.getJobId().getAppId().getId());
+    assertEquals(3, tid.getJobId().getId());
+    assertEquals(TaskType.REDUCE, tid.getTaskType());
+    assertEquals(4, tid.getId());
 
     tid = MRApps.toTaskID("task_1_2_3_m_4");
-    assertEquals(TaskType.MAP, tid.taskType);
+    assertEquals(TaskType.MAP, tid.getTaskType());
   }
 
   @Test(expected=YarnException.class) public void testTaskIDShort() {
@@ -78,22 +85,23 @@ public class TestMRApps {
     MRApps.toTaskID("task_0_0_0_x_0");
   }
 
+  //TODO_get.set
   @Test public void testTaskAttemptIDtoString() {
-    TaskAttemptID taid = new TaskAttemptID();
-    taid.taskID = new TaskID();
-    taid.taskID.taskType = TaskType.MAP;
-    taid.taskID.jobID = new JobID();
-    taid.taskID.jobID.appID = new ApplicationID();
+    TaskAttemptId taid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptId.class);
+    taid.setTaskId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class));
+    taid.getTaskId().setTaskType(TaskType.MAP);
+    taid.getTaskId().setJobId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class));
+    taid.getTaskId().getJobId().setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
     assertEquals("attempt_0_0_0_m_0_0", MRApps.toString(taid));
   }
 
   @Test public void testToTaskAttemptID() {
-    TaskAttemptID taid = MRApps.toTaskAttemptID("attempt_0_1_2_m_3_4");
-    assertEquals(0, taid.taskID.jobID.appID.clusterTimeStamp);
-    assertEquals(1, taid.taskID.jobID.appID.id);
-    assertEquals(2, taid.taskID.jobID.id);
-    assertEquals(3, taid.taskID.id);
-    assertEquals(4, taid.id);
+    TaskAttemptId taid = MRApps.toTaskAttemptID("attempt_0_1_2_m_3_4");
+    assertEquals(0, taid.getTaskId().getJobId().getAppId().getClusterTimestamp());
+    assertEquals(1, taid.getTaskId().getJobId().getAppId().getId());
+    assertEquals(2, taid.getTaskId().getJobId().getId());
+    assertEquals(3, taid.getTaskId().getId());
+    assertEquals(4, taid.getId());
   }
 
   @Test(expected=YarnException.class) public void testTaskAttemptIDShort() {

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Thu Mar 31 22:23:22 2011
@@ -36,16 +36,18 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.JobReport;
-import org.apache.hadoop.mapreduce.v2.api.JobState;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
 
 /**
  * Loads the basic job level data upfront.
@@ -56,20 +58,20 @@ public class CompletedJob implements org
   static final Log LOG = LogFactory.getLog(CompletedJob.class);
   private final Counters counters;
   private final Configuration conf;
-  private final JobID jobID;
+  private final JobId jobId;
   private final List<String> diagnostics = new ArrayList<String>();
   private final JobReport report;
-  private final Map<TaskID, Task> tasks = new HashMap<TaskID, Task>();
-  private final Map<TaskID, Task> mapTasks = new HashMap<TaskID, Task>();
-  private final Map<TaskID, Task> reduceTasks = new HashMap<TaskID, Task>();
+  private final Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
+  private final Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
+  private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
   
   private TaskAttemptCompletionEvent[] completionEvents;
   private JobInfo jobInfo;
 
 
-  public CompletedJob(Configuration conf, JobID jobID) throws IOException {
+  public CompletedJob(Configuration conf, JobId jobId) throws IOException {
     this.conf = conf;
-    this.jobID = jobID;
+    this.jobId = jobId;
     //TODO fix
     /*
     String  doneLocation =
@@ -97,11 +99,11 @@ public class CompletedJob implements org
 
     counters = TypeConverter.toYarn(jobInfo.getTotalCounters());
     diagnostics.add(jobInfo.getErrorInfo());
-    report = new JobReport();
-    report.id = jobID;
-    report.state = JobState.valueOf(jobInfo.getJobStatus());
-    report.startTime = jobInfo.getLaunchTime();
-    report.finishTime = jobInfo.getFinishTime();
+    report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class);
+    report.setJobId(jobId);
+    report.setJobState(JobState.valueOf(jobInfo.getJobStatus()));
+    report.setStartTime(jobInfo.getLaunchTime());
+    report.setFinishTime(jobInfo.getFinishTime());
   }
 
   @Override
@@ -120,8 +122,8 @@ public class CompletedJob implements org
   }
 
   @Override
-  public JobID getID() {
-    return jobID;
+  public JobId getID() {
+    return jobId;
   }
 
   @Override
@@ -131,12 +133,12 @@ public class CompletedJob implements org
 
   @Override
   public JobState getState() {
-    return report.state;
+    return report.getJobState();
   }
 
   @Override
-  public Task getTask(TaskID taskID) {
-    return tasks.get(taskID);
+  public Task getTask(TaskId taskId) {
+    return tasks.get(taskId);
   }
 
   @Override
@@ -146,7 +148,7 @@ public class CompletedJob implements org
   }
 
   @Override
-  public Map<TaskID, Task> getTasks() {
+  public Map<TaskId, Task> getTasks() {
     return tasks;
   }
 
@@ -159,7 +161,7 @@ public class CompletedJob implements org
     if (user == null) {
       LOG.error("user null is not allowed");
     }
-    String jobName = TypeConverter.fromYarn(jobID).toString();
+    String jobName = TypeConverter.fromYarn(jobId).toString();
     String defaultDoneDir = conf.get(
         YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
     String  jobhistoryDir =
@@ -186,7 +188,7 @@ public class CompletedJob implements org
     // populate the tasks
     for (Map.Entry<org.apache.hadoop.mapreduce.TaskID, TaskInfo> entry : jobInfo
         .getAllTasks().entrySet()) {
-      TaskID yarnTaskID = TypeConverter.toYarn(entry.getKey());
+      TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
       TaskInfo taskInfo = entry.getValue();
       Task task = new CompletedTask(yarnTaskID, taskInfo);
       tasks.put(yarnTaskID, task);
@@ -207,7 +209,7 @@ public class CompletedJob implements org
   }
 
   @Override
-  public CharSequence getName() {
+  public String getName() {
     return jobInfo.getJobname();
   }
 
@@ -222,7 +224,7 @@ public class CompletedJob implements org
   }
 
   @Override
-  public Map<TaskID, Task> getTasks(TaskType taskType) {
+  public Map<TaskId, Task> getTasks(TaskType taskType) {
     if (TaskType.MAP.equals(taskType)) {
       return mapTasks;
    } else { //we have only two types of tasks

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java Thu Mar 31 22:23:22 2011
@@ -27,14 +27,15 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskState;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 public class CompletedTask implements Task {
 
@@ -44,15 +45,15 @@ public class CompletedTask implements Ta
   private final long startTime;
   private final long finishTime;
   private final TaskState state;
-  private final TaskID taskID;
+  private final TaskId taskId;
   private final TaskReport report;
-  private final Map<TaskAttemptID, TaskAttempt> attempts =
-    new LinkedHashMap<TaskAttemptID, TaskAttempt>();
+  private final Map<TaskAttemptId, TaskAttempt> attempts =
+    new LinkedHashMap<TaskAttemptId, TaskAttempt>();
   
   private static final Log LOG = LogFactory.getLog(CompletedTask.class);
 
-  CompletedTask(TaskID taskID, TaskInfo taskinfo) {
-    this.taskID = taskID;
+  CompletedTask(TaskId taskId, TaskInfo taskinfo) {
+    this.taskId = taskId;
     this.startTime = taskinfo.getStartTime();
     this.finishTime = taskinfo.getFinishTime();
     this.type = TypeConverter.toYarn(taskinfo.getTaskType());
@@ -61,35 +62,34 @@ public class CompletedTask implements Ta
     this.state = TaskState.valueOf(taskinfo.getTaskStatus());
     for (TaskAttemptInfo attemptHistory : 
                 taskinfo.getAllTaskAttempts().values()) {
-      CompletedTaskAttempt attempt = new CompletedTaskAttempt(taskID, 
+      CompletedTaskAttempt attempt = new CompletedTaskAttempt(taskId, 
           attemptHistory);
       attempts.put(attempt.getID(), attempt);
     }
     
-    report = new TaskReport();
-    report.id = taskID;
-    report.startTime = startTime;
-    report.finishTime = finishTime;
-    report.state = state;
-    report.progress = getProgress();
-    report.counters = getCounters();
-    report.runningAttempts = new ArrayList<TaskAttemptID>();
-    report.runningAttempts.addAll(attempts.keySet());
+    report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskReport.class);
+    report.setTaskId(taskId);
+    report.setStartTime(startTime);
+    report.setFinishTime(finishTime);
+    report.setTaskState(state);
+    report.setProgress(getProgress());
+    report.setCounters(getCounters());
+    report.addAllRunningAttempts(new ArrayList<TaskAttemptId>(attempts.keySet()));
     //report.successfulAttempt = taskHistory.; //TODO
   }
 
   @Override
-  public boolean canCommit(TaskAttemptID taskAttemptID) {
+  public boolean canCommit(TaskAttemptId taskAttemptID) {
     return false;
   }
 
   @Override
-  public TaskAttempt getAttempt(TaskAttemptID attemptID) {
+  public TaskAttempt getAttempt(TaskAttemptId attemptID) {
     return attempts.get(attemptID);
   }
 
   @Override
-  public Map<TaskAttemptID, TaskAttempt> getAttempts() {
+  public Map<TaskAttemptId, TaskAttempt> getAttempts() {
     return attempts;
   }
 
@@ -99,8 +99,8 @@ public class CompletedTask implements Ta
   }
 
   @Override
-  public TaskID getID() {
-    return taskID;
+  public TaskId getID() {
+    return taskId;
   }
 
   @Override

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java Thu Mar 31 22:23:22 2011
@@ -23,24 +23,25 @@ import java.util.List;
 
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 public class CompletedTaskAttempt implements TaskAttempt {
 
   private final TaskAttemptInfo attemptInfo;
-  private final TaskAttemptID attemptId;
+  private final TaskAttemptId attemptId;
   private final Counters counters;
   private final TaskAttemptState state;
   private final TaskAttemptReport report;
-  private final List<CharSequence> diagnostics = new ArrayList<CharSequence>();
+  private final List<String> diagnostics = new ArrayList<String>();
 
-  CompletedTaskAttempt(TaskID taskID, TaskAttemptInfo attemptInfo) {
+  CompletedTaskAttempt(TaskId taskId, TaskAttemptInfo attemptInfo) {
     this.attemptInfo = attemptInfo;
     this.attemptId = TypeConverter.toYarn(attemptInfo.getAttemptId());
     this.counters = TypeConverter.toYarn(
@@ -51,20 +52,21 @@ public class CompletedTaskAttempt implem
       diagnostics.add(attemptInfo.getError());
     }
     
-    report = new TaskAttemptReport();
-    report.id = attemptId;
-    report.state = state;
-    report.progress = getProgress();
-    report.startTime = attemptInfo.getStartTime();
-    report.finishTime = attemptInfo.getFinishTime();
-    report.diagnosticInfo = attemptInfo.getError();
+    report = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptReport.class);
+    report.setTaskAttemptId(attemptId);
+    report.setTaskAttemptState(state);
+    report.setProgress(getProgress());
+    report.setStartTime(attemptInfo.getStartTime());
+    
+    report.setFinishTime(attemptInfo.getFinishTime());
+    report.setDiagnosticInfo(attemptInfo.getError());
     //result.phase = attemptInfo.get;//TODO
-    report.stateString = state.toString();
-    report.counters = getCounters();
+    report.setStateString(state.toString());
+    report.setCounters(getCounters());
   }
 
   @Override
-  public ContainerID getAssignedContainerID() {
+  public ContainerId getAssignedContainerID() {
     // TODO Auto-generated method stub
     return null;
   }
@@ -80,7 +82,7 @@ public class CompletedTaskAttempt implem
   }
 
   @Override
-  public TaskAttemptID getID() {
+  public TaskAttemptId getID() {
     return attemptId;
   }
 
@@ -105,17 +107,17 @@ public class CompletedTaskAttempt implem
   }
 
   @Override
-  public List<CharSequence> getDiagnostics() {
+  public List<String> getDiagnostics() {
     return diagnostics;
   }
 
   @Override
   public long getLaunchTime() {
-    return report.startTime;
+    return report.getStartTime();
   }
 
   @Override
   public long getFinishTime() {
-    return report.finishTime;
+    return report.getFinishTime();
   }
 }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java Thu Mar 31 22:23:22 2011
@@ -18,38 +18,54 @@
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.List;
 
-import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.avro.ipc.Server;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.webapp.WebApp;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.JobReport;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
 import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
 
 /**
@@ -110,92 +126,115 @@ public class HistoryClientService extend
 
   private class MRClientProtocolHandler implements MRClientProtocol {
 
-    private Job getJob(JobID jobID) throws AvroRemoteException {
-      Job job = history.getJob(jobID);
+    private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+    
+    private Job getJob(JobId jobId) throws YarnRemoteException {
+      Job job = history.getJob(jobId);
       if (job == null) {
-        throw RPCUtil.getRemoteException("Unknown job " + jobID);
+        throw RPCUtil.getRemoteException("Unknown job " + jobId);
       }
       return job;
     }
 
     @Override
-    public Counters getCounters(JobID jobID) throws AvroRemoteException {
-      Job job = getJob(jobID);
-      return job.getCounters();
+    public GetCountersResponse getCounters(GetCountersRequest request) throws YarnRemoteException {
+      JobId jobId = request.getJobId();
+      Job job = getJob(jobId);
+      GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class);
+      response.setCounters(job.getCounters());
+      return response;
     }
-
+    
     @Override
-    public JobReport getJobReport(JobID jobID) throws AvroRemoteException {
-      Job job = getJob(jobID);
-      return job.getReport();
+    public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException {
+      JobId jobId = request.getJobId();
+      Job job = getJob(jobId);
+      GetJobReportResponse response = recordFactory.newRecordInstance(GetJobReportResponse.class);
+      response.setJobReport(job.getReport());
+      return response;
     }
 
     @Override
-    public TaskAttemptReport getTaskAttemptReport(TaskAttemptID taskAttemptID)
-        throws AvroRemoteException {
-      Job job = getJob(taskAttemptID.taskID.jobID);
-      return job.getTask(taskAttemptID.taskID).
-          getAttempt(taskAttemptID).getReport();
+    public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException {
+      TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+      Job job = getJob(taskAttemptId.getTaskId().getJobId());
+      GetTaskAttemptReportResponse response = recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
+      response.setTaskAttemptReport(job.getTask(taskAttemptId.getTaskId()).getAttempt(taskAttemptId).getReport());
+      return response;
     }
 
     @Override
-    public TaskReport getTaskReport(TaskID taskID) throws AvroRemoteException {
-      Job job = getJob(taskID.jobID);
-      return job.getTask(taskID).getReport();
+    public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException {
+      TaskId taskId = request.getTaskId();
+      Job job = getJob(taskId.getJobId());
+      GetTaskReportResponse response = recordFactory.newRecordInstance(GetTaskReportResponse.class);
+      response.setTaskReport(job.getTask(taskId).getReport());
+      return response;
     }
 
     @Override
-    public List<TaskAttemptCompletionEvent> getTaskAttemptCompletionEvents(
-        JobID jobID, 
-        int fromEventId, int maxEvents) throws AvroRemoteException {
-      Job job = getJob(jobID);
-      return Arrays.asList(job.getTaskAttemptCompletionEvents(fromEventId, 
-          maxEvents));
+    public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(GetTaskAttemptCompletionEventsRequest request) throws YarnRemoteException {
+      JobId jobId = request.getJobId();
+      int fromEventId = request.getFromEventId();
+      int maxEvents = request.getMaxEvents();
+      
+      Job job = getJob(jobId);
+      
+      GetTaskAttemptCompletionEventsResponse response = recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
+      response.addAllCompletionEvents(Arrays.asList(job.getTaskAttemptCompletionEvents(fromEventId, maxEvents)));
+      return response;
     }
-
+      
     @Override
-    public Void killJob(JobID jobID) throws AvroRemoteException {
+    public KillJobResponse killJob(KillJobRequest request) throws YarnRemoteException {
+      JobId jobId = request.getJobId();
       throw RPCUtil.getRemoteException("Invalid operation on completed job");
     }
-
+    
     @Override
-    public Void killTask(TaskID taskID) throws AvroRemoteException {
-      getJob(taskID.jobID);
+    public KillTaskResponse killTask(KillTaskRequest request) throws YarnRemoteException {
+      TaskId taskId = request.getTaskId();
+      getJob(taskId.getJobId());
       throw RPCUtil.getRemoteException("Invalid operation on completed job");
     }
-
+    
     @Override
-    public Void killTaskAttempt(TaskAttemptID taskAttemptID)
-        throws AvroRemoteException {
-      getJob(taskAttemptID.taskID.jobID);
+    public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request) throws YarnRemoteException {
+      TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+      getJob(taskAttemptId.getTaskId().getJobId());
       throw RPCUtil.getRemoteException("Invalid operation on completed job");
     }
 
     @Override
-    public Void failTaskAttempt(TaskAttemptID taskAttemptID)
-        throws AvroRemoteException {
-      getJob(taskAttemptID.taskID.jobID);
-      throw RPCUtil.getRemoteException("Invalid operation on completed job");
+    public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request) throws YarnRemoteException {
+      TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+    
+      Job job = getJob(taskAttemptId.getTaskId().getJobId());
+      
+      GetDiagnosticsResponse response = recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
+      response.addAllDiagnostics(job.getTask(taskAttemptId.getTaskId()).getAttempt(taskAttemptId).getDiagnostics());
+      return response;
     }
 
-    @Override
-    public List<CharSequence> getDiagnostics(TaskAttemptID taskAttemptID)
-        throws AvroRemoteException {
-      Job job = getJob(taskAttemptID.taskID.jobID);
-      return job.getTask(taskAttemptID.taskID).
-                 getAttempt(taskAttemptID).getDiagnostics();
+    @Override 
+    public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request) throws YarnRemoteException {
+      TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+      getJob(taskAttemptId.getTaskId().getJobId());
+      throw RPCUtil.getRemoteException("Invalid operation on completed job");
     }
 
     @Override
-    public List<TaskReport> getTaskReports(JobID jobID, TaskType taskType)
-        throws AvroRemoteException {
-      Job job = getJob(jobID);
-      List<TaskReport> reports = new ArrayList<TaskReport>();
+    public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request) throws YarnRemoteException {
+      JobId jobId = request.getJobId();
+      TaskType taskType = request.getTaskType();
+      
+      GetTaskReportsResponse response = recordFactory.newRecordInstance(GetTaskReportsResponse.class);
+      Job job = getJob(jobId);
       Collection<Task> tasks = job.getTasks(taskType).values();
       for (Task task : tasks) {
-        reports.add(task.getReport());
+        response.addTaskReport(task.getReport());
       }
-      return reports;
+      return response;
     }
 
   }
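
The handler above shows the shape this patch gives every MRClientProtocol method: one request record in, one response record out, both instantiated through the RecordFactory instead of the old Avro-style positional arguments and return values. A minimal caller-side sketch, assuming an MRClientProtocol proxy has already been obtained over YarnRPC (GetCountersSketch and fetchCounters are illustrative names, not part of the patch):

import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;

public class GetCountersSketch {
  private static final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);

  // Wrap the JobId in a request record, then unwrap the Counters from the
  // response record; every method of the reworked protocol follows suit.
  static Counters fetchCounters(MRClientProtocol proxy, JobId jobId)
      throws YarnRemoteException {
    GetCountersRequest request =
        recordFactory.newRecordInstance(GetCountersRequest.class);
    request.setJobId(jobId);
    return proxy.getCounters(request).getCounters();
  }
}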

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java Thu Mar 31 22:23:22 2011
@@ -21,13 +21,13 @@ package org.apache.hadoop.mapreduce.v2.h
 import java.util.Map;
 
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 
 public interface HistoryContext {
 
-  Job getJob(JobID id);
+  Job getJob(JobId id);
 
-  Map<JobID, Job> getAllJobs(ApplicationID appID);
+  Map<JobId, Job> getAllJobs(ApplicationId appID);
 
 }
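
This small interface captures the recurring type migration in one screenful: the old hand-written types with public fields (ApplicationID, JobID) give way to protocol records (ApplicationId, JobId) that are read through accessors, so the field chains seen elsewhere in the patch become getter chains. A sketch of the same navigation, old and new (IdNavigationSketch is an illustrative name):

import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;

class IdNavigationSketch {
  // old style, public Avro fields:  taskAttemptID.taskID.jobID
  // new style, record accessors:
  static JobId jobOf(TaskAttemptId attemptId) {
    return attemptId.getTaskId().getJobId();
  }
}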

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Thu Mar 31 22:23:22 2011
@@ -27,20 +27,20 @@ import java.util.concurrent.ConcurrentHa
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.hs.CompletedJob;
 import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 /*
  * Loads and manages the Job history cache.
  */
 public class JobHistory implements HistoryContext {
 
-  private Map<JobID, Job> completedJobCache =
-    new ConcurrentHashMap<JobID, Job>();
+  private Map<JobId, Job> completedJobCache =
+    new ConcurrentHashMap<JobId, Job>();
   private Configuration conf;
   private final LinkedList<Job> jobQ = new LinkedList<Job>();
   private static final Log LOG = LogFactory.getLog(JobHistory.class);
@@ -51,16 +51,16 @@ public class JobHistory implements Histo
     this.conf = conf;
   }
   @Override
-  public synchronized Job getJob(JobID jobID) {
-    Job job = completedJobCache.get(jobID);
+  public synchronized Job getJob(JobId jobId) {
+    Job job = completedJobCache.get(jobId);
     if (job == null) {
       try {
-        job = new CompletedJob(conf, jobID);
+        job = new CompletedJob(conf, jobId);
       } catch (IOException e) {
         LOG.warn("HistoryContext getJob failed " + e);
         throw new YarnException(e);
       }
-      completedJobCache.put(jobID, job);
+      completedJobCache.put(jobId, job);
       jobQ.add(job);
       if (jobQ.size() > retiredJobsCacheSize) {
          Job removed = jobQ.remove();
@@ -71,11 +71,11 @@ public class JobHistory implements Histo
   }
 
   @Override
-  public Map<JobID, Job> getAllJobs(ApplicationID appID) {
+  public Map<JobId, Job> getAllJobs(ApplicationId appID) {
     //currently there is 1 to 1 mapping between app and job id
     org.apache.hadoop.mapreduce.JobID oldJobID = TypeConverter.fromYarn(appID);
-    Map<JobID, Job> jobs = new HashMap<JobID, Job>();
-    JobID jobID = TypeConverter.toYarn(oldJobID);
+    Map<JobId, Job> jobs = new HashMap<JobId, Job>();
+    JobId jobID = TypeConverter.toYarn(oldJobID);
     jobs.put(jobID, getJob(jobID));
     return jobs;
   }
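
JobHistory pairs a ConcurrentHashMap for lookups with a LinkedList that remembers insertion order, evicting the oldest completed job once more than retiredJobsCacheSize entries are cached. The same idiom in isolation, as a generic sketch (BoundedFifoCache and Loader are illustrative names, not Hadoop classes):

import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class BoundedFifoCache<K, V> {
  interface Loader<K, V> { V load(K key); }

  private final Map<K, V> cache = new ConcurrentHashMap<K, V>();
  private final LinkedList<K> order = new LinkedList<K>();
  private final int maxSize;

  BoundedFifoCache(int maxSize) { this.maxSize = maxSize; }

  synchronized V get(K key, Loader<K, V> loader) {
    V value = cache.get(key);
    if (value == null) {
      value = loader.load(key);      // e.g. new CompletedJob(conf, jobId)
      cache.put(key, value);
      order.add(key);
      if (order.size() > maxSize) {  // retire the oldest cached entry
        cache.remove(order.remove());
      }
    }
    return value;
  }
}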

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java Thu Mar 31 22:23:22 2011
@@ -25,17 +25,17 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.JobState;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskState;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -53,7 +55,7 @@ public class TestJobHistoryEvents {
     MRApp app = new MRApp(2, 1, true);
     app.submit(conf);
     Job job = app.getContext().getAllJobs().values().iterator().next();
-    JobID jobId = job.getID();
+    JobId jobId = job.getID();
     LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
     app.waitForState(job, JobState.SUCCEEDED);
     /*
@@ -66,16 +68,16 @@ public class TestJobHistoryEvents {
         parsedJob.getCompletedMaps());
     
     
-    Map<TaskID, Task> tasks = parsedJob.getTasks();
+    Map<TaskId, Task> tasks = parsedJob.getTasks();
     Assert.assertEquals("No of tasks not correct", 3, tasks.size());
     for (Task task : tasks.values()) {
       verifyTask(task);
     }
     
-    Map<TaskID, Task> maps = parsedJob.getTasks(TaskType.MAP);
+    Map<TaskId, Task> maps = parsedJob.getTasks(TaskType.MAP);
     Assert.assertEquals("No of maps not correct", 2, maps.size());
     
-    Map<TaskID, Task> reduces = parsedJob.getTasks(TaskType.REDUCE);
+    Map<TaskId, Task> reduces = parsedJob.getTasks(TaskType.REDUCE);
     Assert.assertEquals("No of reduces not correct", 1, reduces.size());
     
     
@@ -89,7 +91,7 @@ public class TestJobHistoryEvents {
   private void verifyTask(Task task) {
     Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
         task.getState());
-    Map<TaskAttemptID, TaskAttempt> attempts = task.getAttempts();
+    Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
     Assert.assertEquals("No of attempts not correct", 1, attempts.size());
     for (TaskAttempt attempt : attempts.values()) {
       verifyAttempt(attempt);
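
The test logs the job id by converting the v2 record back to the classic id with TypeConverter, the same bridge ClientServiceDelegate uses below in the other direction. A round-trip sketch; only the TypeConverter calls are taken from the patch, the wrapper class is illustrative:

import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;

class IdConversionSketch {
  static String classicName(JobId yarnId) {
    JobID oldId = TypeConverter.fromYarn(yarnId);  // v2 record -> classic id
    return oldId.toString();                       // e.g. "job_<ts>_0001"
  }

  static JobId yarnId(JobID oldId) {
    return TypeConverter.toYarn(oldId);            // classic id -> v2 record
  }
}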

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Mar 31 22:23:22 2011
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.security.PrivilegedAction;
 import java.util.List;
 
-import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -34,18 +33,27 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
-import org.apache.hadoop.mapreduce.v2.api.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.yarn.ApplicationMaster;
-import org.apache.hadoop.yarn.ApplicationState;
 import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.YarnRemoteException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationMaster;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
@@ -55,27 +63,28 @@ public class ClientServiceDelegate {
   private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
 
   private Configuration conf;
-  private ApplicationID currentAppId;
+  private ApplicationId currentAppId;
   private final ResourceMgrDelegate rm;
   private MRClientProtocol realProxy = null;
   private String serviceAddr = "";
   private String serviceHttpAddr = "";
+  private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
   ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm) {
     this.conf = conf;
     this.rm = rm;
   }
 
-  private MRClientProtocol getProxy(JobID jobId) throws AvroRemoteException {
-    return getProxy(TypeConverter.toYarn(jobId).appID, false);
+  private MRClientProtocol getProxy(JobID jobId) throws YarnRemoteException {
+    return getProxy(TypeConverter.toYarn(jobId).getAppId(), false);
   }
 
-  private MRClientProtocol getRefreshedProxy(JobID jobId) throws AvroRemoteException {
-    return getProxy(TypeConverter.toYarn(jobId).appID, true);
+  private MRClientProtocol getRefreshedProxy(JobID jobId) throws YarnRemoteException {
+    return getProxy(TypeConverter.toYarn(jobId).getAppId(), true);
   }
 
-  private MRClientProtocol getProxy(ApplicationID appId, 
-      boolean forceRefresh) throws AvroRemoteException {
+  private MRClientProtocol getProxy(ApplicationId appId, 
+      boolean forceRefresh) throws YarnRemoteException {
     if (!appId.equals(currentAppId) || forceRefresh) {
       currentAppId = appId;
       refreshProxy();
@@ -83,35 +92,35 @@ public class ClientServiceDelegate {
     return realProxy;
   }
 
-  private void refreshProxy() throws AvroRemoteException {
+  private void refreshProxy() throws YarnRemoteException {
     ApplicationMaster appMaster = rm.getApplicationMaster(currentAppId);
-    if (ApplicationState.COMPLETED.equals(appMaster.state)) {
+    if (ApplicationState.COMPLETED.equals(appMaster.getState())) {
       serviceAddr = conf.get(YarnMRJobConfig.HS_BIND_ADDRESS,
           YarnMRJobConfig.DEFAULT_HS_BIND_ADDRESS);
       LOG.info("Application state is completed. " +
             "Redirecting to job history server " + serviceAddr);
       //TODO:
       serviceHttpAddr = "";
-    } else if (ApplicationState.RUNNING.equals(appMaster.state)){
-      serviceAddr = appMaster.host + ":" + appMaster.rpcPort;
-      serviceHttpAddr = appMaster.host + ":" + appMaster.httpPort;
+    } else if (ApplicationState.RUNNING.equals(appMaster.getState())){
+      serviceAddr = appMaster.getHost() + ":" + appMaster.getRpcPort();
+      serviceHttpAddr = appMaster.getHost() + ":" + appMaster.getHttpPort();
       if (UserGroupInformation.isSecurityEnabled()) {
-        String clientTokenEncoded = appMaster.clientToken.toString();
+        String clientTokenEncoded = appMaster.getClientToken();
         Token<ApplicationTokenIdentifier> clientToken =
             new Token<ApplicationTokenIdentifier>();
         try {
           clientToken.decodeFromUrlString(clientTokenEncoded);
-          clientToken.setService(new Text(appMaster.host.toString() + ":"
-              + appMaster.rpcPort));
+          clientToken.setService(new Text(appMaster.getHost() + ":"
+              + appMaster.getRpcPort()));
           UserGroupInformation.getCurrentUser().addToken(clientToken);
         } catch (IOException e) {
           throw new YarnException(e);
         }
       }
     } else {
-      LOG.warn("Cannot connect to Application with state " + appMaster.state);
+      LOG.warn("Cannot connect to Application with state " + appMaster.getState());
       throw new YarnException(
-          "Cannot connect to Application with state " + appMaster.state);
+          "Cannot connect to Application with state " + appMaster.getState());
     }
     try {
       instantiateProxy(serviceAddr);
@@ -138,16 +147,20 @@ public class ClientServiceDelegate {
 
   public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
       InterruptedException {
-    org.apache.hadoop.mapreduce.v2.api.JobID jobID = TypeConverter.toYarn(arg0);
+    org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
     try {
-      return TypeConverter.fromYarn(getProxy(arg0).getCounters(jobID));
+      GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
+      request.setJobId(jobID);
+      return TypeConverter.fromYarn(getProxy(arg0).getCounters(request).getCounters());
     } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
       LOG.warn(RPCUtil.toString(yre));
       throw yre;
     } catch(Exception e) {
       LOG.debug("Failing to contact application master", e);
       try {
-        return TypeConverter.fromYarn(getRefreshedProxy(arg0).getCounters(jobID));
+        GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
+        request.setJobId(jobID);
+        return TypeConverter.fromYarn(getRefreshedProxy(arg0).getCounters(request).getCounters());
       } catch(YarnRemoteException yre) {
         LOG.warn(RPCUtil.toString(yre));
         throw yre;
@@ -162,26 +175,31 @@ public class ClientServiceDelegate {
 
   public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
       int arg2) throws IOException, InterruptedException {
-    org.apache.hadoop.mapreduce.v2.api.JobID jobID = TypeConverter.toYarn(arg0);
-    List<org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent> list = null;
+    org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
+    List<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent> list = null;
+    GetTaskAttemptCompletionEventsRequest request = recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
     try {
-      list = getProxy(arg0).getTaskAttemptCompletionEvents(jobID,
-          arg1, arg2);
+      request.setJobId(jobID);
+      request.setFromEventId(arg1);
+      request.setMaxEvents(arg2);
+      list = getProxy(arg0).getTaskAttemptCompletionEvents(request).getCompletionEventList();
     } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
       LOG.warn(RPCUtil.toString(yre));
       throw yre;
     } catch(Exception e) {
       LOG.debug("Failed to contact application master ", e);
       try {
-        list = getRefreshedProxy(arg0).getTaskAttemptCompletionEvents(jobID,
-            arg1, arg2);
+        request.setJobId(jobID);
+        request.setFromEventId(arg1);
+        request.setMaxEvents(arg2);
+        list = getRefreshedProxy(arg0).getTaskAttemptCompletionEvents(request).getCompletionEventList();
       } catch(YarnRemoteException yre) {
         LOG.warn(RPCUtil.toString(yre));
         throw yre;
       }
     }
     return TypeConverter.fromYarn(
-        list.toArray(new org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent[0]));
+        list.toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
   }
 
   public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID
@@ -189,17 +207,20 @@ public class ClientServiceDelegate {
   throws IOException,
       InterruptedException {
     
-    List<CharSequence> list = null;
-    org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID = TypeConverter.toYarn(arg0);
+    List<String> list = null;
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(arg0);
+    GetDiagnosticsRequest request = recordFactory.newRecordInstance(GetDiagnosticsRequest.class);
     try {
-      list = getProxy(arg0.getJobID()).getDiagnostics(attemptID);
+      request.setTaskAttemptId(attemptID);
+      list = getProxy(arg0.getJobID()).getDiagnostics(request).getDiagnosticsList();
     } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
       LOG.warn(RPCUtil.toString(yre));
       throw yre;
     } catch(Exception e) {
       LOG.debug("Failed to contact application master ", e);
       try {
-        list = getRefreshedProxy(arg0.getJobID()).getDiagnostics(attemptID);
+        request.setTaskAttemptId(attemptID);
+        list = getRefreshedProxy(arg0.getJobID()).getDiagnostics(request).getDiagnosticsList();
       } catch(YarnRemoteException yre) {
         LOG.warn(RPCUtil.toString(yre));
         throw yre;
@@ -207,28 +228,30 @@ ClientServiceDelegate {
     }
     String[] result = new String[list.size()];
     int i = 0;
-    for (CharSequence c : list) {
+    for (String c : list) {
       result[i++] = c.toString();
     }
     return result;
   }
 
-  public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException,
-      AvroRemoteException {
-    org.apache.hadoop.mapreduce.v2.api.JobID jobId = 
+  public JobStatus getJobStatus(JobID oldJobID) throws YarnRemoteException {
+    org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = 
       TypeConverter.toYarn(oldJobID);
     LOG.debug("Getting Job status");
     String stagingDir = conf.get("yarn.apps.stagingDir");
     String jobFile = stagingDir + "/" + jobId.toString();
     JobReport report = null;
+    GetJobReportRequest request = recordFactory.newRecordInstance(GetJobReportRequest.class);
     try {
-      report = getProxy(oldJobID).getJobReport(jobId);
+      request.setJobId(jobId);
+      report = getProxy(oldJobID).getJobReport(request).getJobReport();
     } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
       LOG.warn(RPCUtil.toString(yre));
       throw yre;
     } catch (Exception e) {
       try {
-        report = getRefreshedProxy(oldJobID).getJobReport(jobId);
+        request.setJobId(jobId);
+        report = getRefreshedProxy(oldJobID).getJobReport(request).getJobReport();
       } catch(YarnRemoteException yre) {
         LOG.warn(RPCUtil.toString(yre));
         throw yre;
@@ -238,20 +262,23 @@ public class ClientServiceDelegate {
   }
 
   public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
-      throws YarnRemoteException, AvroRemoteException {
-      List<TaskReport> taskReports = null;
-      org.apache.hadoop.mapreduce.v2.api.JobID nJobID = TypeConverter.toYarn(jobID);
+      throws YarnRemoteException {
+      List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> taskReports = null;
+      org.apache.hadoop.mapreduce.v2.api.records.JobId nJobID = TypeConverter.toYarn(jobID);
+      GetTaskReportsRequest request = recordFactory.newRecordInstance(GetTaskReportsRequest.class);
       try {
-        taskReports = getProxy(jobID).getTaskReports(nJobID, 
-            TypeConverter.toYarn(taskType));
+        request.setJobId(nJobID);
+        request.setTaskType(TypeConverter.toYarn(taskType));
+        taskReports = getProxy(jobID).getTaskReports(request).getTaskReportList();
       } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
         LOG.warn(RPCUtil.toString(yre));
         throw yre;
       } catch(Exception e) {
         LOG.debug("Failed to contact application master ", e);
         try {
-          taskReports = getRefreshedProxy(jobID).getTaskReports(nJobID, 
-              TypeConverter.toYarn(taskType));
+          request.setJobId(nJobID);
+          request.setTaskType(TypeConverter.toYarn(taskType));
+          taskReports = getRefreshedProxy(jobID).getTaskReports(request).getTaskReportList();
         } catch(YarnRemoteException yre) {
           LOG.warn(RPCUtil.toString(yre));
           throw yre;
@@ -262,17 +289,19 @@ ClientServiceDelegate {
   }
 
-  public Void killJob(JobID jobID) throws YarnRemoteException,
-      AvroRemoteException {
-    org.apache.hadoop.mapreduce.v2.api.JobID  nJobID = TypeConverter.toYarn(jobID);
+  public Void killJob(JobID jobID) throws YarnRemoteException {
+    org.apache.hadoop.mapreduce.v2.api.records.JobId nJobID = TypeConverter.toYarn(jobID);
+    KillJobRequest request = recordFactory.newRecordInstance(KillJobRequest.class);
     try {
-      getProxy(jobID).killJob(nJobID);
+      request.setJobId(nJobID);
+      getProxy(jobID).killJob(request);
     } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
       LOG.warn(RPCUtil.toString(yre));
       throw yre;
     } catch(Exception e) {
       LOG.debug("Failed to contact application master ", e);
       try {
-        getRefreshedProxy(jobID).killJob(nJobID);
+        request.setJobId(nJobID);
+        getRefreshedProxy(jobID).killJob(request);
       } catch(YarnRemoteException yre) {
         LOG.warn(RPCUtil.toString(yre));
         throw yre;
@@ -282,14 +312,18 @@ public class ClientServiceDelegate {
   }
 
   public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
-      throws YarnRemoteException, AvroRemoteException {
-    org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID 
+      throws YarnRemoteException {
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID 
       = TypeConverter.toYarn(taskAttemptID);
+    KillTaskAttemptRequest killRequest = recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
+    FailTaskAttemptRequest failRequest = recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
     try {
       if (fail) {
-        getProxy(taskAttemptID.getJobID()).failTaskAttempt(attemptID);
+        failRequest.setTaskAttemptId(attemptID);
+        getProxy(taskAttemptID.getJobID()).failTaskAttempt(failRequest);
       } else {
-        getProxy(taskAttemptID.getJobID()).killTaskAttempt(attemptID);
+        killRequest.setTaskAttemptId(attemptID);
+        getProxy(taskAttemptID.getJobID()).killTaskAttempt(killRequest);
       }
     } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
       LOG.warn(RPCUtil.toString(yre));
@@ -298,9 +332,11 @@ public class ClientServiceDelegate {
       LOG.debug("Failed to contact application master ", e);
       try {
         if (fail) {
-          getRefreshedProxy(taskAttemptID.getJobID()).failTaskAttempt(attemptID);
+          failRequest.setTaskAttemptId(attemptID);
+          getRefreshedProxy(taskAttemptID.getJobID()).failTaskAttempt(failRequest);
         } else {
-          getRefreshedProxy(taskAttemptID.getJobID()).killTaskAttempt(attemptID);
+          killRequest.setTaskAttemptId(attemptID);
+          getRefreshedProxy(taskAttemptID.getJobID()).killTaskAttempt(killRequest);
         }
       } catch(YarnRemoteException yre) {
         LOG.warn(RPCUtil.toString(yre));
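
Every public method of ClientServiceDelegate now follows one retry discipline: a YarnRemoteException thrown by the server is logged and rethrown, since redirecting cannot fix a bad request, while any other failure is taken to mean the application master is gone, so the proxy is refreshed (completed applications fall back to the job history server address) and the call is retried once. The shape, distilled into a generic sketch (RetryOnceSketch and the Callable framing are illustrative; the patch inlines this per method around getProxy and getRefreshedProxy):

import java.util.concurrent.Callable;

import org.apache.hadoop.yarn.exceptions.YarnRemoteException;

final class RetryOnceSketch {
  static <T> T call(Callable<T> viaCachedProxy, Callable<T> viaFreshProxy)
      throws Exception {
    try {
      return viaCachedProxy.call();
    } catch (YarnRemoteException yre) {
      throw yre;                     // server-side fault: retrying won't help
    } catch (Exception e) {
      return viaFreshProxy.call();   // AM unreachable: retry on a fresh proxy
    }
  }
}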


