Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A01F47755 for ; Wed, 26 Oct 2011 04:36:06 +0000 (UTC) Received: (qmail 55595 invoked by uid 500); 26 Oct 2011 04:36:06 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 55553 invoked by uid 500); 26 Oct 2011 04:36:05 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 55545 invoked by uid 99); 26 Oct 2011 04:36:04 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Oct 2011 04:36:04 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Oct 2011 04:36:01 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 367BA23888E4; Wed, 26 Oct 2011 04:35:41 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1189024 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/t... Date: Wed, 26 Oct 2011 04:35:40 -0000 To: mapreduce-commits@hadoop.apache.org From: mahadev@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111026043541.367BA23888E4@eris.apache.org> Author: mahadev Date: Wed Oct 26 04:35:40 2011 New Revision: 1189024 URL: http://svn.apache.org/viewvc?rev=1189024&view=rev Log: MAPREDUCE-3250. When AM restarts, client keeps reconnecting to the new AM and prints a lots of logs. (vinodkv via mahadev) - Merging r1189023 from trunk. Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1189024&r1=1189023&r2=1189024&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Oct 26 04:35:40 2011 @@ -1715,6 +1715,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-3269. Fixed log4j properties to correctly set logging options for JobHistoryServer vis-a-vis JobSummary logs. (mahadev via acmurthy) + MAPREDUCE-3250. When AM restarts, client keeps reconnecting to the new AM + and prints a lots of logs. (vinodkv via mahadev) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1189024&r1=1189023&r2=1189024&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Wed Oct 26 04:35:40 2011 @@ -81,7 +81,6 @@ public class ClientServiceDelegate { private final ApplicationId appId; private final ResourceMgrDelegate rm; private final MRClientProtocol historyServerProxy; - private boolean forceRefresh; private MRClientProtocol realProxy = null; private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private static String UNKNOWN_USER = "Unknown User"; @@ -122,7 +121,7 @@ public class ClientServiceDelegate { } private MRClientProtocol getProxy() throws YarnRemoteException { - if (!forceRefresh && realProxy != null) { + if (realProxy != null) { return realProxy; } @@ -133,7 +132,9 @@ public class ClientServiceDelegate { trackingUrl = application.getTrackingUrl(); } String serviceAddr = null; - while (application == null || YarnApplicationState.RUNNING == application.getYarnApplicationState()) { + while (application == null + || YarnApplicationState.RUNNING == application + .getYarnApplicationState()) { if (application == null) { LOG.info("Could not get Job info from RM for job " + jobId + ". Redirecting to job history server."); @@ -163,7 +164,7 @@ public class ClientServiceDelegate { } LOG.info("Tracking Url of JOB is " + application.getTrackingUrl()); LOG.info("Connecting to " + serviceAddr); - instantiateAMProxy(serviceAddr); + realProxy = instantiateAMProxy(serviceAddr); return realProxy; } catch (IOException e) { //possibly the AM has crashed @@ -233,10 +234,12 @@ public class ClientServiceDelegate { return historyServerProxy; } - private void instantiateAMProxy(final String serviceAddr) throws IOException { + MRClientProtocol instantiateAMProxy(final String serviceAddr) + throws IOException { UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr); - realProxy = currentUser.doAs(new PrivilegedAction() { + MRClientProtocol proxy = currentUser + .doAs(new PrivilegedAction() { @Override public MRClientProtocol run() { YarnRPC rpc = YarnRPC.create(conf); @@ -245,6 +248,7 @@ public class ClientServiceDelegate { } }); LOG.trace("Connected to ApplicationMaster at: " + serviceAddr); + return proxy; } private synchronized Object invoke(String method, Class argClass, @@ -274,12 +278,14 @@ public class ClientServiceDelegate { " retrying.."); LOG.debug("Failed exception on AM/History contact", e.getTargetException()); - forceRefresh = true; + // Force reconnection by setting the proxy to null. + realProxy = null; } catch (Exception e) { LOG.info("Failed to contact AM/History for job " + jobId + " Will retry.."); LOG.debug("Failing to contact application master", e); - forceRefresh = true; + // Force reconnection by setting the proxy to null. + realProxy = null; } } } Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1189024&r1=1189023&r2=1189024&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Wed Oct 26 04:35:40 2011 @@ -68,8 +68,6 @@ import org.apache.hadoop.metrics2.lib.De import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -84,6 +82,8 @@ import org.apache.hadoop.yarn.api.protoc import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -123,20 +123,24 @@ public class TestClientRedirect { conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); conf.set(YarnConfiguration.RM_ADDRESS, RMADDRESS); conf.set(JHAdminConfig.MR_HISTORY_ADDRESS, HSHOSTADDRESS); + + // Start the RM. RMService rmService = new RMService("test"); rmService.init(conf); rmService.start(); + // Start the AM. AMService amService = new AMService(); amService.init(conf); amService.start(conf); - amRunning = true; + // Start the HS. HistoryService historyService = new HistoryService(); historyService.init(conf); historyService.start(conf); LOG.info("services started"); + Cluster cluster = new Cluster(conf); org.apache.hadoop.mapreduce.JobID jobID = new org.apache.hadoop.mapred.JobID("201103121733", 1); @@ -151,13 +155,13 @@ public class TestClientRedirect { //bring down the AM service amService.stop(); - amRunning = false; LOG.info("Sleeping for 5 seconds after stop for" + " the server to exit cleanly.."); Thread.sleep(5000); amRestarting = true; + // Same client //results are returned from fake (not started job) counters = cluster.getJob(jobID).getCounters(); @@ -181,14 +185,15 @@ public class TestClientRedirect { amService = new AMService(); amService.init(conf); amService.start(conf); - amRunning = true; amContact = false; //reset counters = cluster.getJob(jobID).getCounters(); validateCounters(counters); Assert.assertTrue(amContact); - amRunning = false; + // Stop the AM. It is not even restarting. So it should be treated as + // completed. + amService.stop(); // Same client counters = cluster.getJob(jobID).getCounters(); @@ -347,6 +352,7 @@ public class TestClientRedirect { private InetSocketAddress bindAddress; private Server server; private final String hostAddress; + public AMService() { this(AMHOSTADDRESS); } @@ -376,11 +382,13 @@ public class TestClientRedirect { NetUtils.createSocketAddr(hostNameResolved.getHostAddress() + ":" + server.getPort()); super.start(); + amRunning = true; } public void stop() { server.stop(); super.stop(); + amRunning = false; } @Override Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java?rev=1189024&r1=1189023&r2=1189024&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java Wed Oct 26 04:35:40 2011 @@ -1,208 +1,272 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import junit.framework.Assert; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.JobStatus; -import org.apache.hadoop.mapreduce.MRConfig; -import org.apache.hadoop.mapreduce.TypeConverter; -import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; -import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest; -import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse; -import org.apache.hadoop.mapreduce.v2.api.records.JobReport; -import org.apache.hadoop.mapreduce.v2.api.records.JobState; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; -import org.apache.hadoop.yarn.ipc.RPCUtil; -import org.apache.hadoop.yarn.util.Records; -import org.junit.Test; - -/** - * Tests for ClientServiceDelegate.java - */ - -public class TestClientServiceDelegate { - private JobID oldJobId = JobID.forName("job_1315895242400_2"); - private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter - .toYarn(oldJobId); - - @Test - public void testUnknownAppInRM() throws Exception { - MRClientProtocol historyServerProxy = mock(MRClientProtocol.class); - when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn( - getJobReportResponse()); - ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( - historyServerProxy, getRMDelegate()); - - JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); - Assert.assertNotNull(jobStatus); - } - - @Test - public void testRemoteExceptionFromHistoryServer() throws Exception { - - MRClientProtocol historyServerProxy = mock(MRClientProtocol.class); - when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow( - RPCUtil.getRemoteException("Job ID doesnot Exist")); - - ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); - when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId())) - .thenReturn(null); - - ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( - historyServerProxy, rm); - - try { - clientServiceDelegate.getJobStatus(oldJobId); - Assert.fail("Invoke should throw exception after retries."); - } catch (YarnRemoteException e) { - Assert.assertEquals("Job ID doesnot Exist", e.getMessage()); - } - } - - @Test - public void testRetriesOnConnectionFailure() throws Exception { - - MRClientProtocol historyServerProxy = mock(MRClientProtocol.class); - when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow( - new RuntimeException("1")).thenThrow(new RuntimeException("2")) - .thenThrow(new RuntimeException("3")) - .thenReturn(getJobReportResponse()); - - ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); - when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId())) - .thenReturn(null); - - ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( - historyServerProxy, rm); - - JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); - Assert.assertNotNull(jobStatus); - } - - @Test - public void testHistoryServerNotConfigured() throws Exception { - //RM doesn't have app report and job History Server is not configured - ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( - null, getRMDelegate()); - JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); - Assert.assertEquals("N/A", jobStatus.getUsername()); - Assert.assertEquals(JobStatus.State.PREP, jobStatus.getState()); - - //RM has app report and job History Server is not configured - ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); - ApplicationReport applicationReport = getApplicationReport(); - when(rm.getApplicationReport(jobId.getAppId())).thenReturn( - applicationReport); - - clientServiceDelegate = getClientServiceDelegate(null, rm); - jobStatus = clientServiceDelegate.getJobStatus(oldJobId); - Assert.assertEquals(applicationReport.getUser(), jobStatus.getUsername()); - Assert.assertEquals(JobStatus.State.SUCCEEDED, jobStatus.getState()); - } - - - @Test - public void testJobReportFromHistoryServer() throws Exception { - MRClientProtocol historyServerProxy = mock(MRClientProtocol.class); - when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn( - getJobReportResponseFromHistoryServer()); - ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); - when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId())) - .thenReturn(null); - ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( - historyServerProxy, rm); - - JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); - Assert.assertNotNull(jobStatus); - Assert.assertEquals("TestJobFilePath", jobStatus.getJobFile()); - Assert.assertEquals("http://TestTrackingUrl", jobStatus.getTrackingUrl()); - Assert.assertEquals(1.0f, jobStatus.getMapProgress()); - Assert.assertEquals(1.0f, jobStatus.getReduceProgress()); - } - - private GetJobReportRequest getJobReportRequest() { - GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class); - request.setJobId(jobId); - return request; - } - - private GetJobReportResponse getJobReportResponse() { - GetJobReportResponse jobReportResponse = Records - .newRecord(GetJobReportResponse.class); - JobReport jobReport = Records.newRecord(JobReport.class); - jobReport.setJobId(jobId); - jobReport.setJobState(JobState.SUCCEEDED); - jobReportResponse.setJobReport(jobReport); - return jobReportResponse; - } - - private ApplicationReport getApplicationReport() { - ApplicationReport applicationReport = Records - .newRecord(ApplicationReport.class); - applicationReport.setYarnApplicationState(YarnApplicationState.FINISHED); - applicationReport.setUser("root"); - applicationReport.setHost("N/A"); - applicationReport.setName("N/A"); - applicationReport.setQueue("N/A"); - applicationReport.setStartTime(0); - applicationReport.setFinishTime(0); - applicationReport.setTrackingUrl("N/A"); - applicationReport.setDiagnostics("N/A"); - applicationReport.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); - return applicationReport; - } - - private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException { - ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); - when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null); - return rm; - } - - private ClientServiceDelegate getClientServiceDelegate( - MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) { - Configuration conf = new YarnConfiguration(); - conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); - ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate( - conf, rm, oldJobId, historyServerProxy); - return clientServiceDelegate; - } - - private GetJobReportResponse getJobReportResponseFromHistoryServer() { - GetJobReportResponse jobReportResponse = Records - .newRecord(GetJobReportResponse.class); - JobReport jobReport = Records.newRecord(JobReport.class); - jobReport.setJobId(jobId); - jobReport.setJobState(JobState.SUCCEEDED); - jobReport.setMapProgress(1.0f); - jobReport.setReduceProgress(1.0f); - jobReport.setJobFile("TestJobFilePath"); - jobReport.setTrackingUrl("TestTrackingUrl"); - jobReportResponse.setJobReport(jobReport); - return jobReportResponse; - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +import java.io.IOException; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse; +import org.apache.hadoop.mapreduce.v2.api.records.JobReport; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Test; + +/** + * Tests for ClientServiceDelegate.java + */ + +public class TestClientServiceDelegate { + private JobID oldJobId = JobID.forName("job_1315895242400_2"); + private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter + .toYarn(oldJobId); + + @Test + public void testUnknownAppInRM() throws Exception { + MRClientProtocol historyServerProxy = mock(MRClientProtocol.class); + when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn( + getJobReportResponse()); + ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( + historyServerProxy, getRMDelegate()); + + JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); + Assert.assertNotNull(jobStatus); + } + + @Test + public void testRemoteExceptionFromHistoryServer() throws Exception { + + MRClientProtocol historyServerProxy = mock(MRClientProtocol.class); + when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow( + RPCUtil.getRemoteException("Job ID doesnot Exist")); + + ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); + when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId())) + .thenReturn(null); + + ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( + historyServerProxy, rm); + + try { + clientServiceDelegate.getJobStatus(oldJobId); + Assert.fail("Invoke should throw exception after retries."); + } catch (YarnRemoteException e) { + Assert.assertEquals("Job ID doesnot Exist", e.getMessage()); + } + } + + @Test + public void testRetriesOnConnectionFailure() throws Exception { + + MRClientProtocol historyServerProxy = mock(MRClientProtocol.class); + when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow( + new RuntimeException("1")).thenThrow(new RuntimeException("2")) + .thenThrow(new RuntimeException("3")) + .thenReturn(getJobReportResponse()); + + ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); + when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId())) + .thenReturn(null); + + ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( + historyServerProxy, rm); + + JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); + Assert.assertNotNull(jobStatus); + verify(historyServerProxy, times(4)).getJobReport( + any(GetJobReportRequest.class)); + } + + @Test + public void testHistoryServerNotConfigured() throws Exception { + //RM doesn't have app report and job History Server is not configured + ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( + null, getRMDelegate()); + JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); + Assert.assertEquals("N/A", jobStatus.getUsername()); + Assert.assertEquals(JobStatus.State.PREP, jobStatus.getState()); + + //RM has app report and job History Server is not configured + ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); + ApplicationReport applicationReport = getFinishedApplicationReport(); + when(rm.getApplicationReport(jobId.getAppId())).thenReturn( + applicationReport); + + clientServiceDelegate = getClientServiceDelegate(null, rm); + jobStatus = clientServiceDelegate.getJobStatus(oldJobId); + Assert.assertEquals(applicationReport.getUser(), jobStatus.getUsername()); + Assert.assertEquals(JobStatus.State.SUCCEEDED, jobStatus.getState()); + } + + @Test + public void testJobReportFromHistoryServer() throws Exception { + MRClientProtocol historyServerProxy = mock(MRClientProtocol.class); + when(historyServerProxy.getJobReport(getJobReportRequest())).thenReturn( + getJobReportResponseFromHistoryServer()); + ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); + when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId())) + .thenReturn(null); + ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( + historyServerProxy, rm); + + JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); + Assert.assertNotNull(jobStatus); + Assert.assertEquals("TestJobFilePath", jobStatus.getJobFile()); + Assert.assertEquals("http://TestTrackingUrl", jobStatus.getTrackingUrl()); + Assert.assertEquals(1.0f, jobStatus.getMapProgress()); + Assert.assertEquals(1.0f, jobStatus.getReduceProgress()); + } + + @Test + public void testReconnectOnAMRestart() throws IOException { + + MRClientProtocol historyServerProxy = mock(MRClientProtocol.class); + + // RM returns AM1 url, null, null and AM2 url on invocations. + // Nulls simulate the time when AM2 is in the process of restarting. + ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class); + when(rmDelegate.getApplicationReport(jobId.getAppId())).thenReturn( + getRunningApplicationReport("am1", 78)).thenReturn( + getRunningApplicationReport(null, 0)).thenReturn( + getRunningApplicationReport(null, 0)).thenReturn( + getRunningApplicationReport("am2", 90)); + + GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class); + when(jobReportResponse1.getJobReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "jobName-firstGen", "user", + JobState.RUNNING, 0, 0, 0, 0, 0, 0, "anything")); + + // First AM returns a report with jobName firstGen and simulates AM shutdown + // on second invocation. + MRClientProtocol firstGenAMProxy = mock(MRClientProtocol.class); + when(firstGenAMProxy.getJobReport(any(GetJobReportRequest.class))) + .thenReturn(jobReportResponse1).thenThrow( + new RuntimeException("AM is down!")); + + GetJobReportResponse jobReportResponse2 = mock(GetJobReportResponse.class); + when(jobReportResponse2.getJobReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "jobName-secondGen", "user", + JobState.RUNNING, 0, 0, 0, 0, 0, 0, "anything")); + + // Second AM generation returns a report with jobName secondGen + MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class); + when(secondGenAMProxy.getJobReport(any(GetJobReportRequest.class))) + .thenReturn(jobReportResponse2); + + ClientServiceDelegate clientServiceDelegate = spy(getClientServiceDelegate( + historyServerProxy, rmDelegate)); + // First time, connection should be to AM1, then to AM2. Further requests + // should use the same proxy to AM2 and so instantiateProxy shouldn't be + // called. + doReturn(firstGenAMProxy).doReturn(secondGenAMProxy).when( + clientServiceDelegate).instantiateAMProxy(any(String.class)); + + JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId); + Assert.assertNotNull(jobStatus); + Assert.assertEquals("jobName-firstGen", jobStatus.getJobName()); + + jobStatus = clientServiceDelegate.getJobStatus(oldJobId); + Assert.assertNotNull(jobStatus); + Assert.assertEquals("jobName-secondGen", jobStatus.getJobName()); + + jobStatus = clientServiceDelegate.getJobStatus(oldJobId); + Assert.assertNotNull(jobStatus); + Assert.assertEquals("jobName-secondGen", jobStatus.getJobName()); + + verify(clientServiceDelegate, times(2)).instantiateAMProxy( + any(String.class)); + } + + private GetJobReportRequest getJobReportRequest() { + GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class); + request.setJobId(jobId); + return request; + } + + private GetJobReportResponse getJobReportResponse() { + GetJobReportResponse jobReportResponse = Records + .newRecord(GetJobReportResponse.class); + JobReport jobReport = Records.newRecord(JobReport.class); + jobReport.setJobId(jobId); + jobReport.setJobState(JobState.SUCCEEDED); + jobReportResponse.setJobReport(jobReport); + return jobReportResponse; + } + + private ApplicationReport getFinishedApplicationReport() { + return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId( + 1234, 5), "user", "queue", "appname", "host", 124, null, + YarnApplicationState.FINISHED, "diagnostics", "url", 0, 0, + FinalApplicationStatus.SUCCEEDED, null); + } + + private ApplicationReport getRunningApplicationReport(String host, int port) { + return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId( + 1234, 5), "user", "queue", "appname", host, port, null, + YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, + FinalApplicationStatus.UNDEFINED, null); + } + + private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException { + ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); + when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null); + return rm; + } + + private ClientServiceDelegate getClientServiceDelegate( + MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) { + Configuration conf = new YarnConfiguration(); + conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); + ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate( + conf, rm, oldJobId, historyServerProxy); + return clientServiceDelegate; + } + + private GetJobReportResponse getJobReportResponseFromHistoryServer() { + GetJobReportResponse jobReportResponse = Records + .newRecord(GetJobReportResponse.class); + JobReport jobReport = Records.newRecord(JobReport.class); + jobReport.setJobId(jobId); + jobReport.setJobState(JobState.SUCCEEDED); + jobReport.setMapProgress(1.0f); + jobReport.setReduceProgress(1.0f); + jobReport.setJobFile("TestJobFilePath"); + jobReport.setTrackingUrl("TestTrackingUrl"); + jobReportResponse.setJobReport(jobReport); + return jobReportResponse; + } +}