hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r1189023 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/ap...
Date Wed, 26 Oct 2011 04:31:28 GMT
Author: mahadev
Date: Wed Oct 26 04:31:28 2011
New Revision: 1189023

URL: http://svn.apache.org/viewvc?rev=1189023&view=rev
Log:
MAPREDUCE-3250. When AM restarts, client keeps reconnecting to the new AM and prints a lots
of logs. (vinodkv via mahadev)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1189023&r1=1189022&r2=1189023&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Oct 26 04:31:28 2011
@@ -1776,6 +1776,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2977. Fix ResourceManager to renew HDFS delegation tokens for
     applications. (acmurthy) 
 
+    MAPREDUCE-3250. When AM restarts, client keeps reconnecting to the new AM 
+    and prints a lots of logs. (vinodkv via mahadev)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1189023&r1=1189022&r2=1189023&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
Wed Oct 26 04:31:28 2011
@@ -81,7 +81,6 @@ public class ClientServiceDelegate {
   private final ApplicationId appId;
   private final ResourceMgrDelegate rm;
   private final MRClientProtocol historyServerProxy;
-  private boolean forceRefresh;
   private MRClientProtocol realProxy = null;
   private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private static String UNKNOWN_USER = "Unknown User";
@@ -122,7 +121,7 @@ public class ClientServiceDelegate {
   }
 
   private MRClientProtocol getProxy() throws YarnRemoteException {
-    if (!forceRefresh && realProxy != null) {
+    if (realProxy != null) {
       return realProxy;
     }
     
@@ -133,7 +132,9 @@ public class ClientServiceDelegate {
       trackingUrl = application.getTrackingUrl();
     }
     String serviceAddr = null;
-    while (application == null || YarnApplicationState.RUNNING == application.getYarnApplicationState())
{
+    while (application == null
+        || YarnApplicationState.RUNNING == application
+            .getYarnApplicationState()) {
       if (application == null) {
         LOG.info("Could not get Job info from RM for job " + jobId
             + ". Redirecting to job history server.");
@@ -163,7 +164,7 @@ public class ClientServiceDelegate {
         }
         LOG.info("Tracking Url of JOB is " + application.getTrackingUrl());
         LOG.info("Connecting to " + serviceAddr);
-        instantiateAMProxy(serviceAddr);
+        realProxy = instantiateAMProxy(serviceAddr);
         return realProxy;
       } catch (IOException e) {
         //possibly the AM has crashed
@@ -233,10 +234,12 @@ public class ClientServiceDelegate {
     return historyServerProxy;
   }
 
-  private void instantiateAMProxy(final String serviceAddr) throws IOException {
+  MRClientProtocol instantiateAMProxy(final String serviceAddr)
+      throws IOException {
     UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
     LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
-    realProxy = currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
+    MRClientProtocol proxy = currentUser
+        .doAs(new PrivilegedAction<MRClientProtocol>() {
       @Override
       public MRClientProtocol run() {
         YarnRPC rpc = YarnRPC.create(conf);
@@ -245,6 +248,7 @@ public class ClientServiceDelegate {
       }
     });
     LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
+    return proxy;
   }
 
   private synchronized Object invoke(String method, Class argClass,
@@ -274,12 +278,14 @@ public class ClientServiceDelegate {
             " retrying..");
         LOG.debug("Failed exception on AM/History contact", 
             e.getTargetException());
-        forceRefresh = true;
+        // Force reconnection by setting the proxy to null.
+        realProxy = null;
       } catch (Exception e) {
         LOG.info("Failed to contact AM/History for job " + jobId
             + "  Will retry..");
         LOG.debug("Failing to contact application master", e);
-        forceRefresh = true;
+        // Force reconnection by setting the proxy to null.
+        realProxy = null;
       }
     }
   }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1189023&r1=1189022&r2=1189023&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
Wed Oct 26 04:31:28 2011
@@ -68,8 +68,6 @@ import org.apache.hadoop.metrics2.lib.De
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -84,6 +82,8 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -123,20 +123,24 @@ public class TestClientRedirect {
     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
     conf.set(YarnConfiguration.RM_ADDRESS, RMADDRESS);
     conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, HSHOSTADDRESS);
+
+    // Start the RM.
     RMService rmService = new RMService("test");
     rmService.init(conf);
     rmService.start();
 
+    // Start the AM.
     AMService amService = new AMService();
     amService.init(conf);
     amService.start(conf);
-    amRunning = true;
 
+    // Start the HS.
     HistoryService historyService = new HistoryService();
     historyService.init(conf);
     historyService.start(conf);
 
     LOG.info("services started");
+
     Cluster cluster = new Cluster(conf);
     org.apache.hadoop.mapreduce.JobID jobID =
       new org.apache.hadoop.mapred.JobID("201103121733", 1);
@@ -151,13 +155,13 @@ public class TestClientRedirect {
 
     //bring down the AM service
     amService.stop();
-    amRunning = false;
 
     LOG.info("Sleeping for 5 seconds after stop for" +
     		" the server to exit cleanly..");
     Thread.sleep(5000);
 
     amRestarting = true;
+
     // Same client
     //results are returned from fake (not started job)
     counters = cluster.getJob(jobID).getCounters();
@@ -181,14 +185,15 @@ public class TestClientRedirect {
     amService = new AMService();
     amService.init(conf);
     amService.start(conf);
-    amRunning = true;
     amContact = false; //reset
 
     counters = cluster.getJob(jobID).getCounters();
     validateCounters(counters);
     Assert.assertTrue(amContact);
 
-    amRunning = false;
+    // Stop the AM. It is not even restarting. So it should be treated as
+    // completed.
+    amService.stop();
 
     // Same client
     counters = cluster.getJob(jobID).getCounters();
@@ -347,6 +352,7 @@ public class TestClientRedirect {
     private InetSocketAddress bindAddress;
     private Server server;
     private final String hostAddress;
+
     public AMService() {
       this(AMHOSTADDRESS);
     }
@@ -376,11 +382,13 @@ public class TestClientRedirect {
         NetUtils.createSocketAddr(hostNameResolved.getHostAddress()
             + ":" + server.getPort());
        super.start();
+       amRunning = true;
     }
 
     public void stop() {
       server.stop();
       super.stop();
+      amRunning = false;
     }
 
     @Override

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1189023&r1=1189022&r2=1189023&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java
Wed Oct 26 04:31:28 2011
@@ -1,208 +1,272 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import junit.framework.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
-import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.util.Records;
-import org.junit.Test;
-
-/**
- * Tests for ClientServiceDelegate.java
- */
-
-public class TestClientServiceDelegate {
-  private JobID oldJobId = JobID.forName("job_1315895242400_2");
-  private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter
-      .toYarn(oldJobId);
-
-  @Test
-  public void testUnknownAppInRM() throws Exception {
-    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
-    when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(
-        getJobReportResponse());
-    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
-        historyServerProxy, getRMDelegate());
-
-    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
-    Assert.assertNotNull(jobStatus);
-  }
-
-  @Test
-  public void testRemoteExceptionFromHistoryServer() throws Exception {
-
-    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
-    when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
-        RPCUtil.getRemoteException("Job ID doesnot Exist"));
-
-    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
-    when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
-        .thenReturn(null);
-
-    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
-        historyServerProxy, rm);
-
-    try {
-      clientServiceDelegate.getJobStatus(oldJobId);
-      Assert.fail("Invoke should throw exception after retries.");
-    } catch (YarnRemoteException e) {
-      Assert.assertEquals("Job ID doesnot Exist", e.getMessage());
-    }
-  }
-
-  @Test
-  public void testRetriesOnConnectionFailure() throws Exception {
-
-    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
-    when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
-        new RuntimeException("1")).thenThrow(new RuntimeException("2"))
-        .thenThrow(new RuntimeException("3"))
-        .thenReturn(getJobReportResponse());
-
-    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
-    when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
-        .thenReturn(null);
-
-    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
-        historyServerProxy, rm);
-
-    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
-    Assert.assertNotNull(jobStatus);
-  }
-
-  @Test
-  public void testHistoryServerNotConfigured() throws Exception {
-    //RM doesn't have app report and job History Server is not configured
-    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
-        null, getRMDelegate());
-    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
-    Assert.assertEquals("N/A", jobStatus.getUsername());
-    Assert.assertEquals(JobStatus.State.PREP, jobStatus.getState());
-
-    //RM has app report and job History Server is not configured
-    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
-    ApplicationReport applicationReport = getApplicationReport();
-    when(rm.getApplicationReport(jobId.getAppId())).thenReturn(
-        applicationReport);
-
-    clientServiceDelegate = getClientServiceDelegate(null, rm);
-    jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
-    Assert.assertEquals(applicationReport.getUser(), jobStatus.getUsername());
-    Assert.assertEquals(JobStatus.State.SUCCEEDED, jobStatus.getState());
-  }
-
-  
-  @Test
-  public void testJobReportFromHistoryServer() throws Exception {                       
         
-    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);                 
         
-    when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(            
         
-        getJobReportResponseFromHistoryServer());                                       
         
-    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);                           
         
-    when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))            
         
-    .thenReturn(null);                                                                  
     
-    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(             
         
-        historyServerProxy, rm);
-
-    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);                 
         
-    Assert.assertNotNull(jobStatus);
-    Assert.assertEquals("TestJobFilePath", jobStatus.getJobFile());                     
         
-    Assert.assertEquals("http://TestTrackingUrl", jobStatus.getTrackingUrl());          
         
-    Assert.assertEquals(1.0f, jobStatus.getMapProgress());                              
         
-    Assert.assertEquals(1.0f, jobStatus.getReduceProgress());                           
         
-  }
-
-  private GetJobReportRequest getJobReportRequest() {
-    GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
-    request.setJobId(jobId);
-    return request;
-  }
-
-  private GetJobReportResponse getJobReportResponse() {
-    GetJobReportResponse jobReportResponse = Records
-        .newRecord(GetJobReportResponse.class);
-    JobReport jobReport = Records.newRecord(JobReport.class);
-    jobReport.setJobId(jobId);
-    jobReport.setJobState(JobState.SUCCEEDED);
-    jobReportResponse.setJobReport(jobReport);
-    return jobReportResponse;
-  }
-
-  private ApplicationReport getApplicationReport() {
-    ApplicationReport applicationReport = Records
-        .newRecord(ApplicationReport.class);
-    applicationReport.setYarnApplicationState(YarnApplicationState.FINISHED);
-    applicationReport.setUser("root");
-    applicationReport.setHost("N/A");
-    applicationReport.setName("N/A");
-    applicationReport.setQueue("N/A");
-    applicationReport.setStartTime(0);
-    applicationReport.setFinishTime(0);
-    applicationReport.setTrackingUrl("N/A");
-    applicationReport.setDiagnostics("N/A");
-    applicationReport.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
-    return applicationReport;
-  }
-
-  private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException {
-    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
-    when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null);
-    return rm;
-  }
-
-  private ClientServiceDelegate getClientServiceDelegate(
-      MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) {
-    Configuration conf = new YarnConfiguration();
-    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
-    ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
-        conf, rm, oldJobId, historyServerProxy);
-    return clientServiceDelegate;
-  }
-
-  private GetJobReportResponse getJobReportResponseFromHistoryServer() {
-    GetJobReportResponse jobReportResponse = Records                                    
         
-        .newRecord(GetJobReportResponse.class);                                         
         
-    JobReport jobReport = Records.newRecord(JobReport.class);                           
         
-    jobReport.setJobId(jobId);                                                          
         
-    jobReport.setJobState(JobState.SUCCEEDED);                                          
         
-    jobReport.setMapProgress(1.0f);
-    jobReport.setReduceProgress(1.0f);
-    jobReport.setJobFile("TestJobFilePath");
-    jobReport.setTrackingUrl("TestTrackingUrl");
-    jobReportResponse.setJobReport(jobReport);
-    return jobReportResponse;
-  }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Test;
+
+/**
+ * Tests for ClientServiceDelegate.java
+ */
+
+public class TestClientServiceDelegate {
+  private JobID oldJobId = JobID.forName("job_1315895242400_2");
+  private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter
+      .toYarn(oldJobId);
+
+  @Test
+  public void testUnknownAppInRM() throws Exception {
+    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+    when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(
+        getJobReportResponse());
+    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
+        historyServerProxy, getRMDelegate());
+
+    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertNotNull(jobStatus);
+  }
+
+  @Test
+  public void testRemoteExceptionFromHistoryServer() throws Exception {
+
+    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+    when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
+        RPCUtil.getRemoteException("Job ID doesnot Exist"));
+
+    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+    when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
+        .thenReturn(null);
+
+    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
+        historyServerProxy, rm);
+
+    try {
+      clientServiceDelegate.getJobStatus(oldJobId);
+      Assert.fail("Invoke should throw exception after retries.");
+    } catch (YarnRemoteException e) {
+      Assert.assertEquals("Job ID doesnot Exist", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testRetriesOnConnectionFailure() throws Exception {
+
+    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+    when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
+        new RuntimeException("1")).thenThrow(new RuntimeException("2"))
+        .thenThrow(new RuntimeException("3"))
+        .thenReturn(getJobReportResponse());
+
+    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+    when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
+        .thenReturn(null);
+
+    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
+        historyServerProxy, rm);
+
+    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertNotNull(jobStatus);
+    verify(historyServerProxy, times(4)).getJobReport(
+        any(GetJobReportRequest.class));
+  }
+
+  @Test
+  public void testHistoryServerNotConfigured() throws Exception {
+    //RM doesn't have app report and job History Server is not configured
+    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
+        null, getRMDelegate());
+    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertEquals("N/A", jobStatus.getUsername());
+    Assert.assertEquals(JobStatus.State.PREP, jobStatus.getState());
+
+    //RM has app report and job History Server is not configured
+    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+    ApplicationReport applicationReport = getFinishedApplicationReport();
+    when(rm.getApplicationReport(jobId.getAppId())).thenReturn(
+        applicationReport);
+
+    clientServiceDelegate = getClientServiceDelegate(null, rm);
+    jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertEquals(applicationReport.getUser(), jobStatus.getUsername());
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, jobStatus.getState());
+  }
+  
+  @Test
+  public void testJobReportFromHistoryServer() throws Exception {                       
         
+    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);                 
         
+    when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn(            
         
+        getJobReportResponseFromHistoryServer());                                       
         
+    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);                           
         
+    when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))            
         
+    .thenReturn(null);                                                                  
     
+    ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(             
         
+        historyServerProxy, rm);
+
+    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);                 
         
+    Assert.assertNotNull(jobStatus);
+    Assert.assertEquals("TestJobFilePath", jobStatus.getJobFile());                     
         
+    Assert.assertEquals("http://TestTrackingUrl", jobStatus.getTrackingUrl());          
         
+    Assert.assertEquals(1.0f, jobStatus.getMapProgress());                              
         
+    Assert.assertEquals(1.0f, jobStatus.getReduceProgress());                           
         
+  }
+
+  @Test
+  public void testReconnectOnAMRestart() throws IOException {
+
+    MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
+
+    // RM returns AM1 url, null, null and AM2 url on invocations.
+    // Nulls simulate the time when AM2 is in the process of restarting.
+    ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+    when(rmDelegate.getApplicationReport(jobId.getAppId())).thenReturn(
+        getRunningApplicationReport("am1", 78)).thenReturn(
+        getRunningApplicationReport(null, 0)).thenReturn(
+        getRunningApplicationReport(null, 0)).thenReturn(
+        getRunningApplicationReport("am2", 90));
+
+    GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class);
+    when(jobReportResponse1.getJobReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user",
+            JobState.RUNNING, 0, 0, 0, 0, 0, 0, "anything"));
+
+    // First AM returns a report with jobName firstGen and simulates AM shutdown
+    // on second invocation.
+    MRClientProtocol firstGenAMProxy = mock(MRClientProtocol.class);
+    when(firstGenAMProxy.getJobReport(any(GetJobReportRequest.class)))
+        .thenReturn(jobReportResponse1).thenThrow(
+            new RuntimeException("AM is down!"));
+
+    GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class);
+    when(jobReportResponse2.getJobReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user",
+            JobState.RUNNING, 0, 0, 0, 0, 0, 0, "anything"));
+
+    // Second AM generation returns a report with jobName secondGen
+    MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class);
+    when(secondGenAMProxy.getJobReport(any(GetJobReportRequest.class)))
+        .thenReturn(jobReportResponse2);
+
+    ClientServiceDelegate clientServiceDelegate = spy(getClientServiceDelegate(
+        historyServerProxy, rmDelegate));
+    // First time, connection should be to AM1, then to AM2. Further requests
+    // should use the same proxy to AM2 and so instantiateProxy shouldn't be
+    // called.
+    doReturn(firstGenAMProxy).doReturn(secondGenAMProxy).when(
+        clientServiceDelegate).instantiateAMProxy(any(String.class));
+
+    JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertNotNull(jobStatus);
+    Assert.assertEquals("jobName-firstGen", jobStatus.getJobName());
+
+    jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertNotNull(jobStatus);
+    Assert.assertEquals("jobName-secondGen", jobStatus.getJobName());
+
+    jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
+    Assert.assertNotNull(jobStatus);
+    Assert.assertEquals("jobName-secondGen", jobStatus.getJobName());
+
+    verify(clientServiceDelegate, times(2)).instantiateAMProxy(
+        any(String.class));
+  }
+
+  private GetJobReportRequest getJobReportRequest() {
+    GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class);
+    request.setJobId(jobId);
+    return request;
+  }
+
+  private GetJobReportResponse getJobReportResponse() {
+    GetJobReportResponse jobReportResponse = Records
+        .newRecord(GetJobReportResponse.class);
+    JobReport jobReport = Records.newRecord(JobReport.class);
+    jobReport.setJobId(jobId);
+    jobReport.setJobState(JobState.SUCCEEDED);
+    jobReportResponse.setJobReport(jobReport);
+    return jobReportResponse;
+  }
+
+  private ApplicationReport getFinishedApplicationReport() {
+    return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
+        1234, 5), "user", "queue", "appname", "host", 124, null,
+        YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0,
+        FinalApplicationStatus.SUCCEEDED, null);
+  }
+
+  private ApplicationReport getRunningApplicationReport(String host, int port) {
+    return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
+        1234, 5), "user", "queue", "appname", host, port, null,
+        YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0,
+        FinalApplicationStatus.UNDEFINED, null);
+  }
+
+  private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException {
+    ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
+    when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null);
+    return rm;
+  }
+
+  private ClientServiceDelegate getClientServiceDelegate(
+      MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) {
+    Configuration conf = new YarnConfiguration();
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
+        conf, rm, oldJobId, historyServerProxy);
+    return clientServiceDelegate;
+  }
+
+  private GetJobReportResponse getJobReportResponseFromHistoryServer() {
+    GetJobReportResponse jobReportResponse = Records                                    
         
+        .newRecord(GetJobReportResponse.class);                                         
         
+    JobReport jobReport = Records.newRecord(JobReport.class);                           
         
+    jobReport.setJobId(jobId);                                                          
         
+    jobReport.setJobState(JobState.SUCCEEDED);                                          
         
+    jobReport.setMapProgress(1.0f);
+    jobReport.setReduceProgress(1.0f);
+    jobReport.setJobFile("TestJobFilePath");
+    jobReport.setTrackingUrl("TestTrackingUrl");
+    jobReportResponse.setJobReport(jobReport);
+    return jobReportResponse;
+  }
+}



Mime
View raw message