hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1196458 [8/19] - in /hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ assembly/ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-cli...
Date Wed, 02 Nov 2011 05:35:03 GMT
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Wed Nov  2 05:34:31 2011
@@ -1,25 +1,27 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
 
@@ -28,46 +30,73 @@ import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.RackResolver;
 import org.junit.Test;
 
 public class TestJobHistoryParsing {
   private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
+
+  public static class MyResolver implements DNSToSwitchMapping {
+    @Override
+    public List<String> resolve(List<String> names) {
+      return Arrays.asList(new String[]{"MyRackName"});
+    }
+  }
+
   @Test
   public void testHistoryParsing() throws Exception {
     Configuration conf = new Configuration();
-    MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true);
+    long amStartTimeEst = System.currentTimeMillis();
+    conf.setClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        MyResolver.class, DNSToSwitchMapping.class);
+    RackResolver.init(conf);
+    MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(),
+        true);
     app.submit(conf);
     Job job = app.getContext().getAllJobs().values().iterator().next();
     JobId jobId = job.getID();
     LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
     app.waitForState(job, JobState.SUCCEEDED);
-    
-    //make sure all events are flushed
+
+    // make sure all events are flushed
     app.waitForState(Service.STATE.STOPPED);
-    
-    String jobhistoryDir = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
+
+    String jobhistoryDir = JobHistoryUtils
+        .getHistoryIntermediateDoneDirForUser(conf);
     JobHistory jobHistory = new JobHistory();
     jobHistory.init(conf);
-    
-    JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId).getJobIndexInfo();
-    String jobhistoryFileName = FileNameIndexUtils.getDoneFileName(jobIndexInfo);
-    
+
+    JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId)
+        .getJobIndexInfo();
+    String jobhistoryFileName = FileNameIndexUtils
+        .getDoneFileName(jobIndexInfo);
+
     Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
     FSDataInputStream in = null;
     LOG.info("JobHistoryFile is: " + historyFilePath);
@@ -79,38 +108,76 @@ public class TestJobHistoryParsing {
       LOG.info("Can not open history file: " + historyFilePath, ioe);
       throw (new Exception("Can not open History File"));
     }
-    
+
     JobHistoryParser parser = new JobHistoryParser(in);
     JobInfo jobInfo = parser.parse();
-    
-    Assert.assertEquals ("Incorrect username ",
-        "mapred", jobInfo.getUsername());
-    Assert.assertEquals("Incorrect jobName ",
-        "test", jobInfo.getJobname());
-    Assert.assertEquals("Incorrect queuename ",
-        "default", jobInfo.getJobQueueName());
-    Assert.assertEquals("incorrect conf path",
-        "test", jobInfo.getJobConfPath());
-    Assert.assertEquals("incorrect finishedMap ",
-        2, jobInfo.getFinishedMaps());
-    Assert.assertEquals("incorrect finishedReduces ",
-        1, jobInfo.getFinishedReduces());
-    Assert.assertEquals("incorrect uberized ",
-        job.isUber(), jobInfo.getUberized());
-    int totalTasks = jobInfo.getAllTasks().size();
+
+    Assert.assertEquals("Incorrect username ", System.getProperty("user.name"),
+        jobInfo.getUsername());
+    Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname());
+    Assert.assertEquals("Incorrect queuename ", "default",
+        jobInfo.getJobQueueName());
+    Assert
+        .assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath());
+    Assert.assertEquals("incorrect finishedMap ", 2, jobInfo.getFinishedMaps());
+    Assert.assertEquals("incorrect finishedReduces ", 1,
+        jobInfo.getFinishedReduces());
+    Assert.assertEquals("incorrect uberized ", job.isUber(),
+        jobInfo.getUberized());
+    Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
+    int totalTasks = allTasks.size();
     Assert.assertEquals("total number of tasks is incorrect  ", 3, totalTasks);
 
-    //Assert at taskAttempt level
-    for (TaskInfo taskInfo :  jobInfo.getAllTasks().values()) {
+    // Verify aminfo
+    Assert.assertEquals(1, jobInfo.getAMInfos().size());
+    Assert.assertEquals(MRApp.NM_HOST, jobInfo.getAMInfos().get(0)
+        .getNodeManagerHost());
+    AMInfo amInfo = jobInfo.getAMInfos().get(0);
+    Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
+    Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
+    Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId());
+    Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
+        .getApplicationAttemptId());
+    Assert.assertTrue(amInfo.getStartTime() <= System.currentTimeMillis()
+        && amInfo.getStartTime() >= amStartTimeEst);
+
+    ContainerId fakeCid = BuilderUtils.newContainerId(-1, -1, -1, -1);
+    // Assert at taskAttempt level
+    for (TaskInfo taskInfo : allTasks.values()) {
       int taskAttemptCount = taskInfo.getAllTaskAttempts().size();
-      Assert.assertEquals("total number of task attempts ", 
-          1, taskAttemptCount);
+      Assert
+          .assertEquals("total number of task attempts ", 1, taskAttemptCount);
+      TaskAttemptInfo taInfo = taskInfo.getAllTaskAttempts().values()
+          .iterator().next();
+      Assert.assertNotNull(taInfo.getContainerId());
+      // Verify the wrong ctor is not being used. Remove after mrv1 is removed.
+      Assert.assertFalse(taInfo.getContainerId().equals(fakeCid));
     }
-    
+
+    // Deep compare Job and JobInfo
+    for (Task task : job.getTasks().values()) {
+      TaskInfo taskInfo = allTasks.get(
+          TypeConverter.fromYarn(task.getID()));
+      Assert.assertNotNull("TaskInfo not found", taskInfo);
+      for (TaskAttempt taskAttempt : task.getAttempts().values()) {
+        TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
+            TypeConverter.fromYarn((taskAttempt.getID())));
+        Assert.assertNotNull("TaskAttemptInfo not found", taskAttemptInfo);
+        Assert.assertEquals("Incorrect shuffle port for task attempt",
+            taskAttempt.getShufflePort(), taskAttemptInfo.getShufflePort());
+
+        // Verify rack-name
+        Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
+            .getRackname(), "MyRackName");
+      }
+    }
+
     String summaryFileName = JobHistoryUtils
         .getIntermediateSummaryFileName(jobId);
     Path summaryFile = new Path(jobhistoryDir, summaryFileName);
     String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
+    Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
+    Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
     Assert.assertNotNull(jobSummaryString);
 
     Map<String, String> jobSummaryElements = new HashMap<String, String>();
@@ -139,7 +206,7 @@ public class TestJobHistoryParsing {
         Integer.parseInt(jobSummaryElements.get("numMaps")));
     Assert.assertEquals("Mismatch in num reduce slots", 1,
         Integer.parseInt(jobSummaryElements.get("numReduces")));
-    Assert.assertEquals("User does not match", "mapred",
+    Assert.assertEquals("User does not match", System.getProperty("user.name"),
         jobSummaryElements.get("user"));
     Assert.assertEquals("Queue does not match", "default",
         jobSummaryElements.get("queue"));

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHSWebApp.java Wed Nov  2 05:34:31 2011
@@ -22,8 +22,14 @@ import static org.apache.hadoop.mapreduc
 import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.ATTEMPT_STATE;
 import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID;
 import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.NM_NODENAME;
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.ENTITY_STRING;
+import static org.apache.hadoop.yarn.server.nodemanager.webapp.NMWebParams.APP_OWNER;
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -38,9 +44,12 @@ import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.webapp.AggregatedLogsPage;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.webapp.test.WebAppTests;
 import org.junit.Test;
 
+import static org.mockito.Mockito.verify;
 import com.google.inject.Injector;
 
 public class TestHSWebApp {
@@ -175,9 +184,53 @@ public class TestHSWebApp {
                          new TestAppContext());
   }
   
+  @Test public void testJobCounterView() {
+    LOG.info("JobCounterView");
+    AppContext appContext = new TestAppContext();
+    Map<String, String> params = TestAMWebApp.getJobParams(appContext);
+    WebAppTests.testPage(HsCountersPage.class, AppContext.class,
+                         appContext, params);
+  }
+  
   @Test public void testSingleCounterView() {
     LOG.info("HsSingleCounterPage");
     WebAppTests.testPage(HsSingleCounterPage.class, AppContext.class,
                          new TestAppContext());
   }
+  
+  @Test
+  public void testLogsView1() throws IOException {
+    LOG.info("HsLogsPage");
+    Injector injector =
+        WebAppTests.testPage(AggregatedLogsPage.class, AppContext.class,
+            new TestAppContext());
+    PrintWriter spyPw = WebAppTests.getPrintWriter(injector);
+    verify(spyPw).write("Cannot get container logs without a ContainerId");
+    verify(spyPw).write("Cannot get container logs without a NodeId");
+    verify(spyPw).write("Cannot get container logs without an app owner");
+  }
+
+  @Test
+  public void testLogsView2() throws IOException {
+    LOG.info("HsLogsPage with data");
+    TestAppContext ctx = new TestAppContext();
+    Map<String, String> params = new HashMap<String, String>();
+
+    params.put(CONTAINER_ID, BuilderUtils.newContainerId(1, 1, 333, 1)
+        .toString());
+    params.put(NM_NODENAME, 
+        BuilderUtils.newNodeId(MockJobs.NM_HOST, MockJobs.NM_PORT).toString());
+    params.put(ENTITY_STRING, "container_10_0001_01_000001");
+    params.put(APP_OWNER, "owner");
+
+    Injector injector =
+        WebAppTests.testPage(AggregatedLogsPage.class, AppContext.class, ctx,
+            params);
+    PrintWriter spyPw = WebAppTests.getPrintWriter(injector);
+    verify(spyPw).write(
+        "Aggregation is not enabled. Try the nodemanager at "
+            + MockJobs.NM_HOST + ":" + MockJobs.NM_PORT);
+  }
 }
+  
+ 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml Wed Nov  2 05:34:31 2011
@@ -16,17 +16,18 @@
   <parent>
     <artifactId>hadoop-mapreduce-client</artifactId>
     <groupId>org.apache.hadoop</groupId>
-    <version>${hadoop-mapreduce.version}</version>
+    <version>0.24.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+  <version>0.24.0-SNAPSHOT</version>
   <name>hadoop-mapreduce-client-jobclient</name>
 
   <properties>
-    <install.file>${project.artifact.file}</install.file>
     <fork.mode>always</fork.mode>
-    <mr.basedir>${project.parent.parent.basedir}</mr.basedir>
+    <!-- Needed for generating FindBugs warnings using parent pom -->
+    <mr.basedir>${project.parent.basedir}/../</mr.basedir>
   </properties>
 
   <dependencies>

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Wed Nov  2 05:34:31 2011
@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTarge
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 
@@ -37,6 +38,7 @@ import org.apache.hadoop.mapreduce.JobSt
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@@ -47,30 +49,32 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
-import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
 public class ClientServiceDelegate {
   private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
@@ -84,7 +88,6 @@ public class ClientServiceDelegate {
   private final ApplicationId appId;
   private final ResourceMgrDelegate rm;
   private final MRClientProtocol historyServerProxy;
-  private boolean forceRefresh;
   private MRClientProtocol realProxy = null;
   private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private static String UNKNOWN_USER = "Unknown User";
@@ -125,10 +128,10 @@ public class ClientServiceDelegate {
   }
 
   private MRClientProtocol getProxy() throws YarnRemoteException {
-    if (!forceRefresh && realProxy != null) {
+    if (realProxy != null) {
       return realProxy;
     }
-      //TODO RM NPEs for unknown jobs. History may still be aware.
+    
     // Possibly allow nulls through the PB tunnel, otherwise deal with an exception
     // and redirect to the history server.
     ApplicationReport application = rm.getApplicationReport(appId);
@@ -136,7 +139,9 @@ public class ClientServiceDelegate {
       trackingUrl = application.getTrackingUrl();
     }
     String serviceAddr = null;
-    while (application == null || YarnApplicationState.RUNNING.equals(application.getYarnApplicationState())) {
+    while (application == null
+        || YarnApplicationState.RUNNING == application
+            .getYarnApplicationState()) {
       if (application == null) {
         LOG.info("Could not get Job info from RM for job " + jobId
             + ". Redirecting to job history server.");
@@ -166,7 +171,7 @@ public class ClientServiceDelegate {
         }
         LOG.info("Tracking Url of JOB is " + application.getTrackingUrl());
         LOG.info("Connecting to " + serviceAddr);
-        instantiateAMProxy(serviceAddr);
+        realProxy = instantiateAMProxy(serviceAddr);
         return realProxy;
       } catch (IOException e) {
         //possibly the AM has crashed
@@ -196,7 +201,6 @@ public class ClientServiceDelegate {
      * block on it. This is to be able to return job status
      * on an allocating Application.
      */
-
     String user = application.getUser();
     if (user == null) {
       throw RPCUtil.getRemoteException("User is not set in the application report");
@@ -237,10 +241,12 @@ public class ClientServiceDelegate {
     return historyServerProxy;
   }
 
-  private void instantiateAMProxy(final String serviceAddr) throws IOException {
+  MRClientProtocol instantiateAMProxy(final String serviceAddr)
+      throws IOException {
     UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
     LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
-    realProxy = currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
+    MRClientProtocol proxy = currentUser
+        .doAs(new PrivilegedAction<MRClientProtocol>() {
       @Override
       public MRClientProtocol run() {
         YarnRPC rpc = YarnRPC.create(conf);
@@ -249,6 +255,7 @@ public class ClientServiceDelegate {
       }
     });
     LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
+    return proxy;
   }
 
   private synchronized Object invoke(String method, Class argClass,
@@ -269,18 +276,23 @@ public class ClientServiceDelegate {
         throw yre;
       } catch (InvocationTargetException e) {
         if (e.getTargetException() instanceof YarnRemoteException) {
-          LOG.warn("Exception thrown by remote end.", e
-              .getTargetException());
+          LOG.warn("Error from remote end: " + e
+              .getTargetException().getLocalizedMessage());
+          LOG.debug("Tracing remote error ", e.getTargetException());
           throw (YarnRemoteException) e.getTargetException();
         }
-        LOG.info("Failed to contact AM/History for job " + jobId
-            + "  Will retry..", e.getTargetException());
-        forceRefresh = true;
+        LOG.info("Failed to contact AM/History for job " + jobId + 
+            " retrying..");
+        LOG.debug("Failed exception on AM/History contact", 
+            e.getTargetException());
+        // Force reconnection by setting the proxy to null.
+        realProxy = null;
       } catch (Exception e) {
         LOG.info("Failed to contact AM/History for job " + jobId
-            + "  Will retry..", e);
+            + "  Will retry..");
         LOG.debug("Failing to contact application master", e);
-        forceRefresh = true;
+        // Force reconnection by setting the proxy to null.
+        realProxy = null;
       }
     }
   }
@@ -393,5 +405,52 @@ public class ClientServiceDelegate {
     return true;
   }
 
+  public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
+      throws YarnRemoteException, IOException {
+    org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
+        TypeConverter.toYarn(oldJobID);
+    GetJobReportRequest request =
+        recordFactory.newRecordInstance(GetJobReportRequest.class);
+    request.setJobId(jobId);
 
-}
+    JobReport report =
+        ((GetJobReportResponse) invoke("getJobReport",
+            GetJobReportRequest.class, request)).getJobReport();
+    if (EnumSet.of(JobState.SUCCEEDED, JobState.FAILED, JobState.KILLED,
+        JobState.ERROR).contains(report.getJobState())) {
+      if (oldTaskAttemptID != null) {
+        GetTaskAttemptReportRequest taRequest =
+            recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
+        taRequest.setTaskAttemptId(TypeConverter.toYarn(oldTaskAttemptID));
+        TaskAttemptReport taReport =
+            ((GetTaskAttemptReportResponse) invoke("getTaskAttemptReport",
+                GetTaskAttemptReportRequest.class, taRequest))
+                .getTaskAttemptReport();
+        if (taReport.getContainerId() == null
+            || taReport.getNodeManagerHost() == null) {
+          throw new IOException("Unable to get log information for task: "
+              + oldTaskAttemptID);
+        }
+        return new LogParams(
+            taReport.getContainerId().toString(),
+            taReport.getContainerId().getApplicationAttemptId()
+                .getApplicationId().toString(),
+            BuilderUtils.newNodeId(taReport.getNodeManagerHost(),
+                taReport.getNodeManagerPort()).toString(), report.getUser());
+      } else {
+        if (report.getAMInfos() == null || report.getAMInfos().size() == 0) {
+          throw new IOException("Unable to get log information for job: "
+              + oldJobID);
+        }
+        AMInfo amInfo = report.getAMInfos().get(report.getAMInfos().size() - 1);
+        return new LogParams(
+            amInfo.getContainerId().toString(),
+            amInfo.getAppAttemptId().getApplicationId().toString(),
+            BuilderUtils.newNodeId(amInfo.getNodeManagerHost(),
+                amInfo.getNodeManagerPort()).toString(), report.getUser());
+      }
+    } else {
+      throw new IOException("Cannot get log path for a in-progress job");
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java Wed Nov  2 05:34:31 2011
@@ -77,7 +77,8 @@ public class NotRunningJob implements MR
     // Setting AppState to NEW and finalStatus to UNDEFINED as they are never used 
     // for a non running job
     return BuilderUtils.newApplicationReport(unknownAppId, "N/A", "N/A", "N/A", "N/A", 0, "", 
-        YarnApplicationState.NEW, "N/A", "N/A", 0, 0, FinalApplicationStatus.UNDEFINED);    
+        YarnApplicationState.NEW, "N/A", "N/A", 0, 0, 
+        FinalApplicationStatus.UNDEFINED, null, "N/A");    
   }
 
   NotRunningJob(ApplicationReport applicationReport, JobState jobState) {

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Wed Nov  2 05:34:31 2011
@@ -89,7 +89,9 @@ public class ResourceMgrDelegate {
     InetSocketAddress rmAddress =
         NetUtils.createSocketAddr(this.conf.get(
             YarnConfiguration.RM_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_ADDRESS));
+            YarnConfiguration.DEFAULT_RM_ADDRESS),
+            YarnConfiguration.DEFAULT_RM_PORT,
+            YarnConfiguration.RM_ADDRESS);
     LOG.info("Connecting to ResourceManager at " + rmAddress);
     applicationsManager =
         (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Wed Nov  2 05:34:31 2011
@@ -20,7 +20,9 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Vector;
 
@@ -51,6 +53,7 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.Credentials;
@@ -60,6 +63,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -73,6 +77,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 
@@ -320,14 +325,14 @@ public class YARNRunner implements Clien
     }
 
     // Setup the command to run the AM
-    Vector<CharSequence> vargs = new Vector<CharSequence>(8);
+    List<String> vargs = new ArrayList<String>(8);
     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
 
+    // TODO: why do we use 'conf' some places and 'jobConf' others?
     long logSize = TaskLog.getTaskLogLength(new JobConf(conf));
-    vargs.add("-Dlog4j.configuration=container-log4j.properties");
-    vargs.add("-D" + MRJobConfig.TASK_LOG_DIR + "="
-        + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
-    vargs.add("-D" + MRJobConfig.TASK_LOG_SIZE + "=" + logSize);
+    String logLevel = jobConf.get(
+        MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
+    MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
 
     vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
         MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
@@ -358,14 +363,19 @@ public class YARNRunner implements Clien
     // Parse distributed cache
     MRApps.setupDistributedCache(jobConf, localResources);
 
+    Map<ApplicationAccessType, String> acls
+        = new HashMap<ApplicationAccessType, String>(2);
+    acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
+        MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
+    acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
+        MRJobConfig.JOB_ACL_MODIFY_JOB,
+        MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
+
     // Setup ContainerLaunchContext for AM container
-    ContainerLaunchContext amContainer =
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
-    amContainer.setResource(capability);             // Resource (mem) required
-    amContainer.setLocalResources(localResources);   // Local resources
-    amContainer.setEnvironment(environment);         // Environment
-    amContainer.setCommands(vargsFinal);             // Command for AM
-    amContainer.setContainerTokens(securityTokens);  // Security tokens
+    ContainerLaunchContext amContainer = BuilderUtils
+        .newContainerLaunchContext(null, UserGroupInformation
+            .getCurrentUser().getShortUserName(), capability, localResources,
+            environment, vargsFinal, null, securityTokens, acls);
 
     // Set up the ApplicationSubmissionContext
     ApplicationSubmissionContext appContext =
@@ -495,4 +505,10 @@ public class YARNRunner implements Clien
     return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion,
         clientMethodsHash);
   }
+
+  @Override
+  public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
+      throws IOException {
+    return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailingMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailingMapper.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailingMapper.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/FailingMapper.java Wed Nov  2 05:34:31 2011
@@ -30,6 +30,22 @@ import org.apache.hadoop.mapreduce.Mappe
 public class FailingMapper extends Mapper<Text, Text, Text, Text> {
   public void map(Text key, Text value,
       Context context) throws IOException,InterruptedException {
+
+    // Just create a non-daemon thread which hangs forever. MR AM should not be
+    // hung by this.
+    new Thread() {
+      @Override
+      public void run() {
+        synchronized (this) {
+          try {
+            wait();
+          } catch (InterruptedException e) {
+            //
+          }
+        }
+      }
+    }.start();
+
     if (context.getTaskAttemptID().getId() == 0) {
       System.out.println("Attempt:" + context.getTaskAttemptID() + 
         " Failing mapper throwing exception");

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Wed Nov  2 05:34:31 2011
@@ -25,7 +25,7 @@ import java.util.Iterator;
 
 import junit.framework.Assert;
 
-import org.apache.avro.ipc.Server;
+import org.apache.hadoop.ipc.Server;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -68,8 +68,6 @@ import org.apache.hadoop.metrics2.lib.De
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -84,6 +82,8 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -123,20 +123,24 @@ public class TestClientRedirect {
     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
     conf.set(YarnConfiguration.RM_ADDRESS, RMADDRESS);
     conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, HSHOSTADDRESS);
+
+    // Start the RM.
     RMService rmService = new RMService("test");
     rmService.init(conf);
     rmService.start();
 
+    // Start the AM.
     AMService amService = new AMService();
     amService.init(conf);
     amService.start(conf);
-    amRunning = true;
 
+    // Start the HS.
     HistoryService historyService = new HistoryService();
     historyService.init(conf);
     historyService.start(conf);
 
     LOG.info("services started");
+
     Cluster cluster = new Cluster(conf);
     org.apache.hadoop.mapreduce.JobID jobID =
       new org.apache.hadoop.mapred.JobID("201103121733", 1);
@@ -151,13 +155,13 @@ public class TestClientRedirect {
 
     //bring down the AM service
     amService.stop();
-    amRunning = false;
 
     LOG.info("Sleeping for 5 seconds after stop for" +
     		" the server to exit cleanly..");
     Thread.sleep(5000);
 
     amRestarting = true;
+
     // Same client
     //results are returned from fake (not started job)
     counters = cluster.getJob(jobID).getCounters();
@@ -181,14 +185,15 @@ public class TestClientRedirect {
     amService = new AMService();
     amService.init(conf);
     amService.start(conf);
-    amRunning = true;
     amContact = false; //reset
 
     counters = cluster.getJob(jobID).getCounters();
     validateCounters(counters);
     Assert.assertTrue(amContact);
 
-    amRunning = false;
+    // Stop the AM. It is not even restarting. So it should be treated as
+    // completed.
+    amService.stop();
 
     // Same client
     counters = cluster.getJob(jobID).getCounters();
@@ -347,6 +352,7 @@ public class TestClientRedirect {
     private InetSocketAddress bindAddress;
     private Server server;
     private final String hostAddress;
+
     public AMService() {
       this(AMHOSTADDRESS);
     }
@@ -376,11 +382,13 @@ public class TestClientRedirect {
         NetUtils.createSocketAddr(hostNameResolved.getHostAddress()
             + ":" + server.getPort());
        super.start();
+       amRunning = true;
     }
 
     public void stop() {
-      server.close();
+      server.stop();
       super.stop();
+      amRunning = false;
     }
 
     @Override

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Wed Nov  2 05:34:31 2011
@@ -1,208 +1,272 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import junit.framework.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
-import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.util.Records;
-import org.junit.Test;
-
-/**
- * Tests for ClientServiceDelegate.java
- */
-
-public class TestClientServiceDelegate {
-  private JobID oldJobId = JobID.forName("job_1315895242400_2");
-  private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter
-      .toYarn(oldJobId);
-
-  @Test
-  public void testUnknownAppInRM() throws Exception {
-    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
-    when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(
-        getJobReportResponse());
-    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
-        historyServerProxy, getRMDelegate());
-
-    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
-    Assert.assertNotNull(jobStatus);
-  }
-
-  @Test
-  public void testRemoteExceptionFromHistoryServer() throws Exception {
-
-    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
-    when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
-        RPCUtil.getRemoteException("Job ID doesnot Exist"));
-
-    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
-    when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
-        .thenReturn(null);
-
-    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
-        historyServerProxy, rm);
-
-    try {
-      clientServiceDelegate.getJobStatus(oldJobId);
-      Assert.fail("Invoke should throw exception after retries.");
-    } catch (YarnRemoteException e) {
-      Assert.assertEquals("Job ID doesnot Exist", e.getMessage());
-    }
-  }
-
-  @Test
-  public void testRetriesOnConnectionFailure() throws Exception {
-
-    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
-    when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
-        new RuntimeException("1")).thenThrow(new RuntimeException("2"))
-        .thenThrow(new RuntimeException("3"))
-        .thenReturn(getJobReportResponse());
-
-    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
-    when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
-        .thenReturn(null);
-
-    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
-        historyServerProxy, rm);
-
-    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
-    Assert.assertNotNull(jobStatus);
-  }
-
-  @Test
-  public void testHistoryServerNotConfigured() throws Exception {
-    //RM doesn't have app report and job History Server is not configured
-    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
-        null, getRMDelegate());
-    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
-    Assert.assertEquals("N/A", jobStatus.getUsername());
-    Assert.assertEquals(JobStatus.State.PREP, jobStatus.getState());
-
-    //RM has app report and job History Server is not configured
-    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
-    ApplicationReport applicationReport = getApplicationReport();
-    when(rm.getApplicationReport(jobId.getAppId())).thenReturn(
-        applicationReport);
-
-    clientServiceDelegate = getClientServiceDelegate(null, rm);
-    jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
-    Assert.assertEquals(applicationReport.getUser(), jobStatus.getUsername());
-    Assert.assertEquals(JobStatus.State.SUCCEEDED, jobStatus.getState());
-  }
-
-  
-  @Test
-  public void testJobReportFromHistoryServer() throws Exception {                                 
-    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);                           
-    when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(                      
-        getJobReportResponseFromHistoryServer());                                                 
-    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);                                     
-    when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))                      
-    .thenReturn(null);                                                                        
-    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(                       
-        historyServerProxy, rm);
-
-    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);                           
-    Assert.assertNotNull(jobStatus);
-    Assert.assertEquals("TestJobFilePath", jobStatus.getJobFile());                               
-    Assert.assertEquals("http://TestTrackingUrl", jobStatus.getTrackingUrl());                    
-    Assert.assertEquals(1.0f, jobStatus.getMapProgress());                                        
-    Assert.assertEquals(1.0f, jobStatus.getReduceProgress());                                     
-  }
-
-  private GetJobReportRequest getJobReportRequest() {
-    GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
-    request.setJobId(jobId);
-    return request;
-  }
-
-  private GetJobReportResponse getJobReportResponse() {
-    GetJobReportResponse jobReportResponse = Records
-        .newRecord(GetJobReportResponse.class);
-    JobReport jobReport = Records.newRecord(JobReport.class);
-    jobReport.setJobId(jobId);
-    jobReport.setJobState(JobState.SUCCEEDED);
-    jobReportResponse.setJobReport(jobReport);
-    return jobReportResponse;
-  }
-
-  private ApplicationReport getApplicationReport() {
-    ApplicationReport applicationReport = Records
-        .newRecord(ApplicationReport.class);
-    applicationReport.setYarnApplicationState(YarnApplicationState.FINISHED);
-    applicationReport.setUser("root");
-    applicationReport.setHost("N/A");
-    applicationReport.setName("N/A");
-    applicationReport.setQueue("N/A");
-    applicationReport.setStartTime(0);
-    applicationReport.setFinishTime(0);
-    applicationReport.setTrackingUrl("N/A");
-    applicationReport.setDiagnostics("N/A");
-    applicationReport.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
-    return applicationReport;
-  }
-
-  private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException {
-    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
-    when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null);
-    return rm;
-  }
-
-  private ClientServiceDelegate getClientServiceDelegate(
-      MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) {
-    Configuration conf = new YarnConfiguration();
-    conf.set(MRConfig.FRAMEWORK_NAME, "yarn");
-    ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
-        conf, rm, oldJobId, historyServerProxy);
-    return clientServiceDelegate;
-  }
-
-  private GetJobReportResponse getJobReportResponseFromHistoryServer() {
-    GetJobReportResponse jobReportResponse = Records                                              
-        .newRecord(GetJobReportResponse.class);                                                   
-    JobReport jobReport = Records.newRecord(JobReport.class);                                     
-    jobReport.setJobId(jobId);                                                                    
-    jobReport.setJobState(JobState.SUCCEEDED);                                                    
-    jobReport.setMapProgress(1.0f);
-    jobReport.setReduceProgress(1.0f);
-    jobReport.setJobFile("TestJobFilePath");
-    jobReport.setTrackingUrl("TestTrackingUrl");
-    jobReportResponse.setJobReport(jobReport);
-    return jobReportResponse;
-  }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Test;
+
+/**
+ * Tests for ClientServiceDelegate.java
+ */
+
+public class TestClientServiceDelegate {
+  private JobID oldJobId = JobID.forName("job_1315895242400_2");
+  private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter
+      .toYarn(oldJobId);
+
+  @Test
+  public void testUnknownAppInRM() throws Exception {
+    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+    when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(
+        getJobReportResponse());
+    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
+        historyServerProxy, getRMDelegate());
+
+    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertNotNull(jobStatus);
+  }
+
+  @Test
+  public void testRemoteExceptionFromHistoryServer() throws Exception {
+
+    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+    when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
+        RPCUtil.getRemoteException("Job ID doesnot Exist"));
+
+    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+    when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
+        .thenReturn(null);
+
+    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
+        historyServerProxy, rm);
+
+    try {
+      clientServiceDelegate.getJobStatus(oldJobId);
+      Assert.fail("Invoke should throw exception after retries.");
+    } catch (YarnRemoteException e) {
+      Assert.assertEquals("Job ID doesnot Exist", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testRetriesOnConnectionFailure() throws Exception {
+
+    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+    when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
+        new RuntimeException("1")).thenThrow(new RuntimeException("2"))
+        .thenThrow(new RuntimeException("3"))
+        .thenReturn(getJobReportResponse());
+
+    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+    when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
+        .thenReturn(null);
+
+    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
+        historyServerProxy, rm);
+
+    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertNotNull(jobStatus);
+    verify(historyServerProxy, times(4)).getJobReport(
+        any(GetJobReportRequest.class));
+  }
+
+  @Test
+  public void testHistoryServerNotConfigured() throws Exception {
+    //RM doesn't have app report and job History Server is not configured
+    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
+        null, getRMDelegate());
+    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertEquals("N/A", jobStatus.getUsername());
+    Assert.assertEquals(JobStatus.State.PREP, jobStatus.getState());
+
+    //RM has app report and job History Server is not configured
+    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+    ApplicationReport applicationReport = getFinishedApplicationReport();
+    when(rm.getApplicationReport(jobId.getAppId())).thenReturn(
+        applicationReport);
+
+    clientServiceDelegate = getClientServiceDelegate(null, rm);
+    jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertEquals(applicationReport.getUser(), jobStatus.getUsername());
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, jobStatus.getState());
+  }
+  
+  @Test
+  public void testJobReportFromHistoryServer() throws Exception {                                 
+    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);                           
+    when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(                      
+        getJobReportResponseFromHistoryServer());                                                 
+    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);                                     
+    when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))                      
+    .thenReturn(null);                                                                        
+    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(                       
+        historyServerProxy, rm);
+
+    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);                           
+    Assert.assertNotNull(jobStatus);
+    Assert.assertEquals("TestJobFilePath", jobStatus.getJobFile());                               
+    Assert.assertEquals("http://TestTrackingUrl", jobStatus.getTrackingUrl());                    
+    Assert.assertEquals(1.0f, jobStatus.getMapProgress());                                        
+    Assert.assertEquals(1.0f, jobStatus.getReduceProgress());                                     
+  }
+
+  @Test
+  public void testReconnectOnAMRestart() throws IOException {
+
+    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+
+    // RM returns AM1 url, null, null and AM2 url on invocations.
+    // Nulls simulate the time when AM2 is in the process of restarting.
+    ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+    when(rmDelegate.getApplicationReport(jobId.getAppId())).thenReturn(
+        getRunningApplicationReport("am1", 78)).thenReturn(
+        getRunningApplicationReport(null, 0)).thenReturn(
+        getRunningApplicationReport(null, 0)).thenReturn(
+        getRunningApplicationReport("am2", 90));
+
+    GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class);
+    when(jobReportResponse1.getJobReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user",
+            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null));
+
+    // First AM returns a report with jobName firstGen and simulates AM shutdown
+    // on second invocation.
+    MRClientProtocol firstGenAMProxy = mock(MRClientProtocol.class);
+    when(firstGenAMProxy.getJobReport(any(GetJobReportRequest.class)))
+        .thenReturn(jobReportResponse1).thenThrow(
+            new RuntimeException("AM is down!"));
+
+    GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class);
+    when(jobReportResponse2.getJobReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user",
+            JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "anything", null));
+
+    // Second AM generation returns a report with jobName secondGen
+    MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);
+    when(secondGenAMProxy.getJobReport(any(GetJobReportRequest.class)))
+        .thenReturn(jobReportResponse2);
+
+    ClientServiceDelegate clientServiceDelegate = spy(getClientServiceDelegate(
+        historyServerProxy, rmDelegate));
+    // First time, connection should be to AM1, then to AM2. Further requests
+    // should use the same proxy to AM2 and so instantiateProxy shouldn't be
+    // called.
+    doReturn(firstGenAMProxy).doReturn(secondGenAMProxy).when(
+        clientServiceDelegate).instantiateAMProxy(any(String.class));
+
+    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertNotNull(jobStatus);
+    Assert.assertEquals("jobName-firstGen", jobStatus.getJobName());
+
+    jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertNotNull(jobStatus);
+    Assert.assertEquals("jobName-secondGen", jobStatus.getJobName());
+
+    jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertNotNull(jobStatus);
+    Assert.assertEquals("jobName-secondGen", jobStatus.getJobName());
+
+    verify(clientServiceDelegate, times(2)).instantiateAMProxy(
+        any(String.class));
+  }
+
+  private GetJobReportRequest getJobReportRequest() {
+    GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
+    request.setJobId(jobId);
+    return request;
+  }
+
+  private GetJobReportResponse getJobReportResponse() {
+    GetJobReportResponse jobReportResponse = Records
+        .newRecord(GetJobReportResponse.class);
+    JobReport jobReport = Records.newRecord(JobReport.class);
+    jobReport.setJobId(jobId);
+    jobReport.setJobState(JobState.SUCCEEDED);
+    jobReportResponse.setJobReport(jobReport);
+    return jobReportResponse;
+  }
+
+  private ApplicationReport getFinishedApplicationReport() {
+    return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
+        1234, 5), "user", "queue", "appname", "host", 124, null,
+        YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
+        FinalApplicationStatus.SUCCEEDED, null, "N/A");
+  }
+
+  private ApplicationReport getRunningApplicationReport(String host, int port) {
+    return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
+        1234, 5), "user", "queue", "appname", host, port, null,
+        YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
+        FinalApplicationStatus.UNDEFINED, null, "N/A");
+  }
+
+  private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException {
+    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+    when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null);
+    return rm;
+  }
+
+  private ClientServiceDelegate getClientServiceDelegate(
+      MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) {
+    Configuration conf = new YarnConfiguration();
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
+        conf, rm, oldJobId, historyServerProxy);
+    return clientServiceDelegate;
+  }
+
+  private GetJobReportResponse getJobReportResponseFromHistoryServer() {
+    GetJobReportResponse jobReportResponse = Records                                              
+        .newRecord(GetJobReportResponse.class);                                                   
+    JobReport jobReport = Records.newRecord(JobReport.class);                                     
+    jobReport.setJobId(jobId);                                                                    
+    jobReport.setJobState(JobState.SUCCEEDED);                                                    
+    jobReport.setMapProgress(1.0f);
+    jobReport.setReduceProgress(1.0f);
+    jobReport.setJobFile("TestJobFilePath");
+    jobReport.setTrackingUrl("TestTrackingUrl");
+    jobReportResponse.setJobReport(jobReport);
+    return jobReportResponse;
+  }
+}

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Wed Nov  2 05:34:31 2011
@@ -58,7 +58,11 @@ public class MiniMRYarnCluster extends M
   private JobHistoryServerWrapper historyServerWrapper;
 
   public MiniMRYarnCluster(String testName) {
-    super(testName);
+    this(testName, 1);
+  }
+  
+  public MiniMRYarnCluster(String testName, int noOfNMs) {
+    super(testName, noOfNMs);
     //TODO: add the history server
     historyServerWrapper = new JobHistoryServerWrapper();
     addService(historyServerWrapper);
@@ -80,7 +84,7 @@ public class MiniMRYarnCluster extends M
         Service.class);
 
     // Non-standard shuffle port
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 8083);
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
 
     conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
         DefaultContainerExecutor.class, ContainerExecutor.class);
@@ -115,6 +119,7 @@ public class MiniMRYarnCluster extends M
           LOG.info("Waiting for HistoryServer to start...");
           Thread.sleep(1500);
         }
+        //TODO Add a timeout. State.STOPPED check ?
         if (historyServer.getServiceState() != STATE.STARTED) {
           throw new IOException("HistoryServer failed to start");
         }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Wed Nov  2 05:34:31 2011
@@ -27,8 +27,6 @@ import java.security.PrivilegedException
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
-import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.FailingMapper;
@@ -70,6 +68,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -103,7 +102,7 @@ public class TestMRJobs {
     }
 
     if (mrCluster == null) {
-      mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName());
+      mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), 3);
       Configuration conf = new Configuration();
       mrCluster.init(conf);
       mrCluster.start();
@@ -135,7 +134,7 @@ public class TestMRJobs {
     }
 
     Configuration sleepConf = new Configuration(mrCluster.getConfig());
-    // set master address to local to test that local mode applied iff framework == classic and master_address == local
+    // set master address to local to test that local mode applied iff framework == local
     sleepConf.set(MRConfig.MASTER_ADDRESS, "local");	
     
     SleepJob sleepJob = new SleepJob();
@@ -300,7 +299,6 @@ public class TestMRJobs {
   throws IOException, InterruptedException, ClassNotFoundException {
     Configuration myConf = new Configuration(mrCluster.getConfig());
     myConf.setInt(MRJobConfig.NUM_MAPS, 1);
-    myConf.setInt("mapreduce.task.timeout", 10*1000);//reduce the timeout
     myConf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2); //reduce the number of attempts
 
     Job job = new Job(myConf);
@@ -324,7 +322,7 @@ public class TestMRJobs {
     return job;
   }
 
-//@Test
+  //@Test
   public void testSleepJobWithSecurityOn() throws IOException,
       InterruptedException, ClassNotFoundException {
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java Wed Nov  2 05:34:31 2011
@@ -20,13 +20,13 @@ package org.apache.hadoop.mapreduce.v2;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
 import junit.framework.Assert;
 
 import org.apache.avro.AvroRemoteException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.FailingMapper;
 import org.apache.hadoop.SleepJob;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,8 +35,20 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.junit.Before;
 import org.junit.After;
 import org.junit.Test;
@@ -105,6 +117,8 @@ public class TestMRJobsWithHistoryServic
       return;
     }
 
+
+    
     SleepJob sleepJob = new SleepJob();
     sleepJob.setConf(mrCluster.getConfig());
     // Job with 3 maps and 2 reduces
@@ -113,7 +127,8 @@ public class TestMRJobsWithHistoryServic
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
     job.waitForCompletion(true);
     Counters counterMR = job.getCounters();
-    ApplicationId appID = TypeConverter.toYarn(job.getJobID()).getAppId();
+    JobId jobId = TypeConverter.toYarn(job.getJobID());
+    ApplicationId appID = jobId.getAppId();
     while (true) {
       Thread.sleep(1000);
       if (mrCluster.getResourceManager().getRMContext().getRMApps()
@@ -126,6 +141,36 @@ public class TestMRJobsWithHistoryServic
     LOG.info("CounterHS " + counterHS);
     LOG.info("CounterMR " + counterMR);
     Assert.assertEquals(counterHS, counterMR);
+    
+    MRClientProtocol historyClient = instantiateHistoryProxy();
+    GetJobReportRequest gjReq = Records.newRecord(GetJobReportRequest.class);
+    gjReq.setJobId(jobId);
+    JobReport jobReport = historyClient.getJobReport(gjReq).getJobReport();
+    verifyJobReport(jobReport, jobId);
   }
 
+  private void verifyJobReport(JobReport jobReport, JobId jobId) {
+    List<AMInfo> amInfos = jobReport.getAMInfos();
+    Assert.assertEquals(1, amInfos.size());
+    AMInfo amInfo = amInfos.get(0);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(jobId.getAppId(), 1);
+    ContainerId amContainerId = BuilderUtils.newContainerId(appAttemptId, 1);  
+    Assert.assertEquals(appAttemptId, amInfo.getAppAttemptId());
+    Assert.assertEquals(amContainerId, amInfo.getContainerId());
+    Assert.assertTrue(jobReport.getSubmitTime() > 0);
+    Assert.assertTrue(jobReport.getStartTime() > 0
+        && jobReport.getStartTime() >= jobReport.getSubmitTime());
+    Assert.assertTrue(jobReport.getFinishTime() > 0
+        && jobReport.getFinishTime() >= jobReport.getStartTime());
+  }
+  
+  private MRClientProtocol instantiateHistoryProxy() {
+    final String serviceAddr =
+        mrCluster.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS);
+    final YarnRPC rpc = YarnRPC.create(conf);
+    MRClientProtocol historyClient =
+        (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
+            NetUtils.createSocketAddr(serviceAddr), mrCluster.getConfig());
+    return historyClient;
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java Wed Nov  2 05:34:31 2011
@@ -21,8 +21,6 @@ package org.apache.hadoop.mapreduce.v2;
 import java.io.File;
 import java.io.IOException;
 
-import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.Counters;
@@ -34,11 +32,10 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
-@Ignore
 public class TestUberAM extends TestMRJobs {
 
   private static final Log LOG = LogFactory.getLog(TestUberAM.class);
@@ -138,8 +135,8 @@ public class TestUberAM extends TestMRJo
 
     TaskCompletionEvent[] events = job.getTaskCompletionEvents(0, 2);
     Assert.assertEquals(1, events.length);
-    Assert.assertEquals(TaskCompletionEvent.Status.FAILED,
-        events[0].getStatus().FAILED);
+    Assert.assertEquals(TaskCompletionEvent.Status.TIPFAILED,
+        events[0].getStatus());
     Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
     
     //Disabling till UberAM honors MRJobConfig.MAP_MAX_ATTEMPTS

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml Wed Nov  2 05:34:31 2011
@@ -16,16 +16,17 @@
   <parent>
     <artifactId>hadoop-mapreduce-client</artifactId>
     <groupId>org.apache.hadoop</groupId>
-    <version>${hadoop-mapreduce.version}</version>
+    <version>0.24.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+  <version>0.24.0-SNAPSHOT</version>
   <name>hadoop-mapreduce-client-shuffle</name>
 
   <properties>
-    <install.file>${project.artifact.file}</install.file>
-    <mr.basedir>${project.parent.parent.basedir}</mr.basedir>
+    <!-- Needed for generating FindBugs warnings using parent pom -->
+    <mr.basedir>${project.parent.basedir}/../</mr.basedir>
   </properties>
 
   <dependencies>

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Wed Nov  2 05:34:31 2011
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import javax.crypto.SecretKey;
@@ -103,6 +104,8 @@ import org.jboss.netty.handler.codec.htt
 import org.jboss.netty.handler.stream.ChunkedWriteHandler;
 import org.jboss.netty.util.CharsetUtil;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 public class ShuffleHandler extends AbstractService 
     implements AuxServices.AuxiliaryService {
 
@@ -223,12 +226,21 @@ public class ShuffleHandler extends Abst
   public void stopApp(ApplicationId appId) {
     JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
     secretManager.removeTokenForJob(jobId.toString());
+    userRsrc.remove(jobId.toString());
   }
 
   @Override
   public synchronized void init(Configuration conf) {
+    ThreadFactory bossFactory = new ThreadFactoryBuilder()
+      .setNameFormat("ShuffleHandler Netty Boss #%d")
+      .build();
+    ThreadFactory workerFactory = new ThreadFactoryBuilder()
+      .setNameFormat("ShuffleHandler Netty Worker #%d")
+      .build();
+    
     selector = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+        Executors.newCachedThreadPool(bossFactory),
+        Executors.newCachedThreadPool(workerFactory));
     super.init(new Configuration(conf));
   }
 
@@ -237,9 +249,14 @@ public class ShuffleHandler extends Abst
   public synchronized void start() {
     Configuration conf = getConfig();
     ServerBootstrap bootstrap = new ServerBootstrap(selector);
-    bootstrap.setPipelineFactory(new HttpPipelineFactory(conf));
+    HttpPipelineFactory pipelineFact = new HttpPipelineFactory(conf);
+    bootstrap.setPipelineFactory(pipelineFact);
     port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
-    accepted.add(bootstrap.bind(new InetSocketAddress(port)));
+    Channel ch = bootstrap.bind(new InetSocketAddress(port));
+    accepted.add(ch);
+    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+    conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
+    pipelineFact.SHUFFLE.setPort(port);
     LOG.info(getName() + " listening on port " + port);
     super.start();
   }
@@ -292,13 +309,17 @@ public class ShuffleHandler extends Abst
     private final IndexCache indexCache;
     private final LocalDirAllocator lDirAlloc =
       new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
-    private final int port;
+    private int port;
 
     public Shuffle(Configuration conf) {
       this.conf = conf;
       indexCache = new IndexCache(new JobConf(conf));
       this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
     }
+    
+    public void setPort(int port) {
+      this.port = port;
+    }
 
     private List<String> splitMaps(List<String> mapq) {
       if (null == mapq) {



Mime
View raw message