From: sjlee@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Date: Tue, 13 Oct 2015 17:52:49 -0000
Subject: [01/50] [abbrv] hadoop git commit: MAPREDUCE-6327. Made MR AM use timeline service v2 API to write history events and counters. Contributed by Junping Du.

Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 5a3db963b -> bd5af9c5f (forced update)
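As background for the diff below: the new test exercises timeline service v2 purely through configuration. The sketch below gathers those settings in one place; every constant is taken from the patch itself, while the wrapper class and method names (TimelineV2TestConfSketch, newTimelineV2Conf) are illustrative only and not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;

public class TimelineV2TestConfSketch {
  // Aux service name the patch uses for the per-node timeline collector.
  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";

  public static Configuration newTimelineV2Conf() {
    Configuration conf = new YarnConfiguration();
    // Turn the timeline service on and let MR jobs emit timeline data.
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
    // Opt the MR AM into the new (v2) timeline service API.
    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, true);
    // Register the per-node collector as an NM auxiliary service.
    conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
    conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME + ".class",
        PerNodeTimelineCollectorsAuxService.class.getName());
    return conf;
  }
}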
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ffb56f6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index eab9026..b3ea26e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -18,22 +18,45 @@ package org.apache.hadoop.mapred;
 
+import java.io.File;
+import java.io.IOException;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.apache.hadoop.mapreduce.jobhistory.TestJobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
+import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestMRTimelineEventHandling {
+
+  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
+  private static final Log LOG =
+      LogFactory.getLog(TestMRTimelineEventHandling.class);
+
   @Test
   public void testTimelineServiceStartInMiniCluster() throws Exception {
     Configuration conf = new YarnConfiguration();
@@ -47,7 +70,7 @@ public class TestMRTimelineEventHandling {
     MiniMRYarnCluster cluster = null;
     try {
       cluster = new MiniMRYarnCluster(
-          TestJobHistoryEventHandler.class.getSimpleName(), 1);
+          TestMRTimelineEventHandling.class.getSimpleName(), 1);
       cluster.init(conf);
       cluster.start();
 
@@ -88,7 +111,7 @@ public class TestMRTimelineEventHandling {
     MiniMRYarnCluster cluster = null;
     try {
       cluster = new MiniMRYarnCluster(
-          TestJobHistoryEventHandler.class.getSimpleName(), 1);
+          TestMRTimelineEventHandling.class.getSimpleName(), 1);
       cluster.init(conf);
       cluster.start();
       TimelineStore ts = cluster.getApplicationHistoryServer()
@@ -132,6 +155,140 @@ public class TestMRTimelineEventHandling {
       }
     }
   }
+
+  @Test
+  public void testMRNewTimelineServiceEventHandling() throws Exception {
+    LOG.info("testMRNewTimelineServiceEventHandling start.");
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
+
+    // enable the new timeline service on the MR side
+    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_NEW_TIMELINE_SERVICE_ENABLED, true);
+
+    // enable aux-service based timeline collectors
+    conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
+    conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
+        + ".class", PerNodeTimelineCollectorsAuxService.class.getName());
+
+    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
+
+    MiniMRYarnCluster cluster = null;
+    try {
+      cluster = new MiniMRYarnCluster(
+          TestMRTimelineEventHandling.class.getSimpleName(), 1, true);
+      cluster.init(conf);
+      cluster.start();
+      LOG.info("MiniMRYarnCluster started.");
+
+      Path inDir = new Path("input");
+      Path outDir = new Path("output");
+      LOG.info("Run 1st job which should succeed.");
+      RunningJob job =
+          UtilsForTests.runJobSucceed(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.SUCCEEDED,
+          job.getJobStatus().getState().getValue());
+
+      YarnClient yarnClient = YarnClient.createYarnClient();
+      yarnClient.init(new Configuration(cluster.getConfig()));
+      yarnClient.start();
+      EnumSet<YarnApplicationState> appStates =
+          EnumSet.allOf(YarnApplicationState.class);
+
+      ApplicationId firstAppId = null;
+      List<ApplicationReport> apps = yarnClient.getApplications(appStates);
+      Assert.assertEquals(apps.size(), 1);
+      ApplicationReport appReport = apps.get(0);
+      firstAppId = appReport.getApplicationId();
+
+      checkNewTimelineEvent(firstAppId);
+
+      LOG.info("Run 2nd job which should fail.");
+      job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir);
+      Assert.assertEquals(JobStatus.FAILED,
+          job.getJobStatus().getState().getValue());
+
+      apps = yarnClient.getApplications(appStates);
+      Assert.assertEquals(apps.size(), 2);
+
+      ApplicationId secAppId = null;
+      secAppId = apps.get(0).getApplicationId() == firstAppId ?
+          apps.get(1).getApplicationId() : apps.get(0).getApplicationId();
+      checkNewTimelineEvent(firstAppId);
+
+    } finally {
+      if (cluster != null) {
+        cluster.stop();
+      }
+      // Cleanup test file
+      String testRoot =
+          FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+      File testRootFolder = new File(testRoot);
+      if (testRootFolder.isDirectory()) {
+        FileUtils.deleteDirectory(testRootFolder);
+      }
+
+    }
+  }
+
+  private void checkNewTimelineEvent(ApplicationId appId) throws IOException {
+    String tmpRoot =
+        FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+        + "/entities/";
+
+    File tmpRootFolder = new File(tmpRoot);
+
+    Assert.assertTrue(tmpRootFolder.isDirectory());
+    String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
+        UserGroupInformation.getCurrentUser().getShortUserName() +
+        "/" + TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
+        "/1/1/" + appId.toString();
+    // for this test, we expect MAPREDUCE_JOB and MAPREDUCE_TASK dirs
+    String outputDirJob = basePath + "/MAPREDUCE_JOB/";
+
+    File entityFolder = new File(outputDirJob);
+    Assert.assertTrue("Job output directory: " + outputDirJob + " does not exist.",
+        entityFolder.isDirectory());
+
+    // check for job event file
+    String jobEventFileName = appId.toString().replaceAll("application", "job")
+        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+
+    String jobEventFilePath = outputDirJob + jobEventFileName;
+    File jobEventFile = new File(jobEventFilePath);
+    Assert.assertTrue("jobEventFilePath: " + jobEventFilePath + " does not exist.",
+        jobEventFile.exists());
+
+    // check for task event file
+    String outputDirTask = basePath + "/MAPREDUCE_TASK/";
+    File taskFolder = new File(outputDirTask);
+    Assert.assertTrue("Task output directory: " + outputDirTask + " does not exist.",
+        taskFolder.isDirectory());
+
+    String taskEventFileName = appId.toString().replaceAll("application", "task")
+        + "_m_000000" +
+        FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+
+    String taskEventFilePath = outputDirTask + taskEventFileName;
+    File taskEventFile = new File(taskEventFilePath);
+    Assert.assertTrue("taskEventFileName: " + taskEventFilePath + " does not exist.",
+        taskEventFile.exists());
+
+    // check for task attempt event file
+    String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/";
+    File taskAttemptFolder = new File(outputDirTaskAttempt);
+    Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt +
+        " does not exist.", taskAttemptFolder.isDirectory());
+
+    String taskAttemptEventFileName = appId.toString().replaceAll(
+        "application", "attempt") + "_m_000000_0" +
+        FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+
+    String taskAttemptEventFilePath = outputDirTaskAttempt +
+        taskAttemptEventFileName;
+    File taskAttemptEventFile = new File(taskAttemptEventFilePath);
+    Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath +
+        " does not exist.", taskAttemptEventFile.exists());
+  }
 
   @Test
   public void testMapreduceJobTimelineServiceEnabled()
@@ -142,7 +299,7 @@ public class TestMRTimelineEventHandling {
     MiniMRYarnCluster cluster = null;
     try {
       cluster = new MiniMRYarnCluster(
-          TestJobHistoryEventHandler.class.getSimpleName(), 1);
+          TestMRTimelineEventHandling.class.getSimpleName(), 1);
       cluster.init(conf);
       cluster.start();
       TimelineStore ts = cluster.getApplicationHistoryServer()
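The assertions in checkNewTimelineEvent above all resolve against one base directory written by FileSystemTimelineWriterImpl: <storage root>/entities/<cluster id>/<user>/<flow id>/1/1/<app id>/<entity type>/. The following is a minimal sketch of that expectation with the path pieces lifted from the test; the wrapper class and method names (TimelineEntityPathSketch, jobEntityDir) are illustrative only, not part of the patch.

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;

public class TimelineEntityPathSketch {
  // Directory expected to hold the MAPREDUCE_JOB entity file for an application:
  // <storage root>/entities/<cluster id>/<user>/<flow id>/1/1/<app id>/MAPREDUCE_JOB/
  public static File jobEntityDir(ApplicationId appId) throws IOException {
    String basePath =
        FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
            + "/entities/"
            + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/"
            + UserGroupInformation.getCurrentUser().getShortUserName() + "/"
            + TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId)
            + "/1/1/" + appId.toString();
    return new File(basePath + "/MAPREDUCE_JOB/");
  }
}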
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0ffb56f6/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
index 47b38a1..90732eb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
@@ -66,6 +66,7 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
   private static final Log LOG = LogFactory.getLog(MiniMRYarnCluster.class);
   private JobHistoryServer historyServer;
   private JobHistoryServerWrapper historyServerWrapper;
+  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
 
   public MiniMRYarnCluster(String testName) {
     this(testName, 1);
@@ -167,8 +168,24 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
       conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
                                                  // which shuffle doesn't happen
       //configure the shuffle service in NM
-      conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
-          new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
+      String[] nmAuxServices = conf.getStrings(YarnConfiguration.NM_AUX_SERVICES);
+      // check whether the timeline collector aux service needs to be enabled
+      boolean enableTimelineAuxService = false;
+      if (nmAuxServices != null) {
+        for (String nmAuxService : nmAuxServices) {
+          if (TIMELINE_AUX_SERVICE_NAME.equals(nmAuxService)) {
+            enableTimelineAuxService = true;
+            break;
+          }
+        }
+      }
+      if (enableTimelineAuxService) {
+        conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+            new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, TIMELINE_AUX_SERVICE_NAME });
+      } else {
+        conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+            new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
+      }
       conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
           ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
           Service.class);
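Note that the MiniMRYarnCluster change above only appends the timeline collector to the NodeManager aux-services list when the caller's configuration already names it; otherwise only the shuffle handler is registered, as before. A minimal sketch of that membership check using the same constant and configuration key follows; the wrapper class and method names (TimelineAuxServiceCheckSketch, timelineCollectorRequested) are illustrative only, not part of the patch.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class TimelineAuxServiceCheckSketch {
  // Aux service name used by the patch for the per-node timeline collector.
  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";

  // True if the incoming configuration already lists the timeline collector
  // among the NodeManager auxiliary services.
  public static boolean timelineCollectorRequested(Configuration conf) {
    String[] nmAuxServices = conf.getStrings(YarnConfiguration.NM_AUX_SERVICES);
    return nmAuxServices != null
        && Arrays.asList(nmAuxServices).contains(TIMELINE_AUX_SERVICE_NAME);
  }
}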