Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 00E8D182A9 for ; Fri, 14 Aug 2015 18:42:32 +0000 (UTC) Received: (qmail 90188 invoked by uid 500); 14 Aug 2015 18:42:14 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 89719 invoked by uid 500); 14 Aug 2015 18:42:13 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 88251 invoked by uid 99); 14 Aug 2015 18:42:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Aug 2015 18:42:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E9120E08EA; Fri, 14 Aug 2015 18:42:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vinodkv@apache.org To: common-commits@hadoop.apache.org Date: Fri, 14 Aug 2015 18:42:30 -0000 Message-Id: <25fbdb007c994e31ad264e73e9e84920@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [19/50] [abbrv] hadoop git commit: YARN-3040. Make putEntities operation be aware of the app's context. Contributed by Zhijie Shen YARN-3040. Make putEntities operation be aware of the app's context. 
Contributed by Zhijie Shen (cherry picked from commit db2f0238915d6e1a5b85c463426b5e072bd4698d) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/60203f25 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/60203f25 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/60203f25 Branch: refs/heads/YARN-2928 Commit: 60203f256117ea6e7d2f878cfb7f3a927ccd2723 Parents: ff57048 Author: Junping Du Authored: Thu Mar 26 09:59:32 2015 -0700 Committer: Vinod Kumar Vavilapalli Committed: Fri Aug 14 11:23:23 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 1 + .../applications/distributedshell/Client.java | 27 +++- .../distributedshell/TestDistributedShell.java | 125 +++++++++------- .../yarn/util/timeline/TimelineUtils.java | 16 +++ .../api/CollectorNodemanagerProtocol.java | 16 +++ ...ollectorNodemanagerProtocolPBClientImpl.java | 20 +++ ...llectorNodemanagerProtocolPBServiceImpl.java | 21 +++ .../GetTimelineCollectorContextRequest.java | 37 +++++ .../GetTimelineCollectorContextResponse.java | 46 ++++++ ...etTimelineCollectorContextRequestPBImpl.java | 127 +++++++++++++++++ ...tTimelineCollectorContextResponsePBImpl.java | 141 +++++++++++++++++++ .../proto/collectornodemanager_protocol.proto | 1 + .../yarn_server_common_service_protos.proto | 9 ++ .../java/org/apache/hadoop/yarn/TestRPC.java | 39 +++++ .../collectormanager/NMCollectorService.java | 18 ++- .../containermanager/ContainerManagerImpl.java | 14 +- .../application/Application.java | 4 + .../application/ApplicationImpl.java | 17 ++- .../application/TestApplication.java | 3 +- .../yarn/server/nodemanager/webapp/MockApp.java | 10 ++ .../nodemanager/webapp/TestNMWebServices.java | 2 +- .../resourcemanager/amlauncher/AMLauncher.java | 23 ++- .../timelineservice/RMTimelineCollector.java | 7 + 
.../TestTimelineServiceClientIntegration.java | 19 ++- .../collector/AppLevelTimelineCollector.java | 33 ++++- .../PerNodeTimelineCollectorsAuxService.java | 2 +- .../collector/TimelineCollector.java | 19 ++- .../collector/TimelineCollectorContext.java | 81 +++++++++++ .../collector/TimelineCollectorManager.java | 32 ++++- .../collector/TimelineCollectorWebService.java | 2 +- .../storage/FileSystemTimelineWriterImpl.java | 69 +++++---- .../timelineservice/storage/TimelineWriter.java | 9 +- ...TestPerNodeTimelineCollectorsAuxService.java | 43 ++++-- .../collector/TestTimelineCollectorManager.java | 41 +++++- .../TestFileSystemTimelineWriterImpl.java | 22 ++- 36 files changed, 956 insertions(+), 143 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 09e9ecb..4816b0d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -41,6 +41,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3034. Implement RM starting its timeline collector. (Naganarasimha G R via junping_du) + YARN-3040. Make putEntities operation be aware of the app's context. 
(Zhijie Shen + via junping_du) + IMPROVEMENTS OPTIMIZATIONS http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 613ffd9..81c8f4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -124,6 +124,7 @@ public class YarnConfiguration extends Configuration { public static final String RM_PREFIX = "yarn.resourcemanager."; public static final String RM_CLUSTER_ID = RM_PREFIX + "cluster-id"; + public static final String DEFAULT_RM_CLUSTER_ID = "yarn_cluster"; public static final String RM_HOSTNAME = RM_PREFIX + "hostname"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 163f8f0..db69490 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -22,8 +22,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Vector; import org.apache.commons.cli.CommandLine; @@ -183,6 +185,9 @@ public class Client { // Timeline domain writer access control private String modifyACLs = null; + private String flowId = null; + private String flowRunId = null; + // Command line options private Options opts; @@ -256,7 +261,8 @@ public class Client { opts.addOption("shell_args", true, "Command line args for the shell script." + "Multiple args can be separated by empty space."); opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES); - opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs"); + opts.addOption("shell_env", true, + "Environment for shell script. 
Specified as env_key=env_val pairs"); opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers"); opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); @@ -283,6 +289,10 @@ public class Client { + "modify the timeline entities in the given domain"); opts.addOption("create", false, "Flag to indicate whether to create the " + "domain specified with -domain."); + opts.addOption("flow", true, "ID of the flow which the distributed shell " + + "app belongs to"); + opts.addOption("flow_run", true, "ID of the flowrun which the distributed " + + "shell app belongs to"); opts.addOption("help", false, "Print usage"); opts.addOption("node_label_expression", true, "Node label expression to determine the nodes" @@ -442,6 +452,12 @@ public class Client { } } + if (cliParser.hasOption("flow")) { + flowId = cliParser.getOptionValue("flow"); + } + if (cliParser.hasOption("flow_run")) { + flowRunId = cliParser.getOptionValue("flow_run"); + } return true; } @@ -533,6 +549,15 @@ public class Client { .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); } + Set tags = new HashSet(); + if (flowId != null) { + tags.add(TimelineUtils.generateFlowIdTag(flowId)); + } + if (flowRunId != null) { + tags.add(TimelineUtils.generateFlowRunIdTag(flowRunId)); + } + appContext.setApplicationTags(tags); + // set local resources for the application master // local files or archives as needed // In this scenario, the jar file for the application master is part of the local resources http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 0af050c..1de3b68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -33,12 +33,14 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -52,6 +54,7 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -161,20 +164,26 
@@ public class TestDistributedShell { @Test(timeout=90000) public void testDSShellWithDomain() throws Exception { - testDSShell(true, "v1"); + testDSShell(true, "v1", true); } @Test(timeout=90000) public void testDSShellWithoutDomain() throws Exception { - testDSShell(false, "v1"); + testDSShell(false, "v1", true); } @Test(timeout=90000) - public void testDSShellWithoutDomainV2() throws Exception { - testDSShell(false, "v2"); + public void testDSShellWithoutDomainV2DefaultFlow() throws Exception { + testDSShell(false, "v2", true); } - public void testDSShell(boolean haveDomain, String timelineVersion) + @Test(timeout=90000) + public void testDSShellWithoutDomainV2CustomizedFlow() throws Exception { + testDSShell(false, "v2", false); + } + + public void testDSShell(boolean haveDomain, String timelineVersion, + boolean defaultFlow) throws Exception { String[] args = { "--jar", @@ -212,6 +221,15 @@ public class TestDistributedShell { }; isTestingTimelineV2 = true; args = mergeArgs(args, timelineArgs); + if (!defaultFlow) { + String[] flowArgs = { + "--flow", + "test_flow_id", + "--flow_run", + "12345678" + }; + args = mergeArgs(args, flowArgs); + } LOG.info("Setup: Using timeline v2!"); } @@ -271,7 +289,7 @@ public class TestDistributedShell { if (!isTestingTimelineV2) { checkTimelineV1(haveDomain); } else { - checkTimelineV2(haveDomain, appId); + checkTimelineV2(haveDomain, appId, defaultFlow); } } @@ -320,53 +338,58 @@ public class TestDistributedShell { } } - private void checkTimelineV2(boolean haveDomain, ApplicationId appId) { - // For PoC check in /tmp/ YARN-3264 - String tmpRoot = FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT; + private void checkTimelineV2( + boolean haveDomain, ApplicationId appId, boolean defaultFlow) + throws Exception { + // For PoC check in /tmp/timeline_service_data YARN-3264 + String tmpRoot = + FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT + + "/entities/"; File tmpRootFolder = new 
File(tmpRoot); - Assert.assertTrue(tmpRootFolder.isDirectory()); - - // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs - String outputDirApp = tmpRoot + "/DS_APP_ATTEMPT/"; - - File entityFolder = new File(outputDirApp); - Assert.assertTrue(entityFolder.isDirectory()); - - // there will be at least one attempt, look for that file - String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp() - + "_000" + appId.getId() + "_000001" - + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - String appAttemptFileName = outputDirApp + appTimestampFileName; - File appAttemptFile = new File(appAttemptFileName); - Assert.assertTrue(appAttemptFile.exists()); - - String outputDirContainer = tmpRoot + "/DS_CONTAINER/"; - File containerFolder = new File(outputDirContainer); - Assert.assertTrue(containerFolder.isDirectory()); - - String containerTimestampFileName = "container_" - + appId.getClusterTimestamp() + "_000" + appId.getId() - + "_01_000002.thist"; - String containerFileName = outputDirContainer + containerTimestampFileName; - File containerFile = new File(containerFileName); - Assert.assertTrue(containerFile.exists()); - String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId() - + "_"; - deleteAppFiles(new File(outputDirApp), appTimeStamp); - deleteAppFiles(new File(outputDirContainer), appTimeStamp); - tmpRootFolder.delete(); - } - - private void deleteAppFiles(File rootDir, String appTimeStamp) { - boolean deleted = false; - File[] listOfFiles = rootDir.listFiles(); - for (File f1 : listOfFiles) { - // list all attempts for this app and delete them - if (f1.getName().contains(appTimeStamp)){ - deleted = f1.delete(); - Assert.assertTrue(deleted); - } + try { + Assert.assertTrue(tmpRootFolder.isDirectory()); + + // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs + String outputDirApp = tmpRoot + + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" + + 
UserGroupInformation.getCurrentUser().getShortUserName() + + (defaultFlow ? "/" + + TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) + + "/0/" : "/test_flow_id/12345678/") + + appId.toString() + "/DS_APP_ATTEMPT/"; + + File entityFolder = new File(outputDirApp); + Assert.assertTrue(entityFolder.isDirectory()); + + // there will be at least one attempt, look for that file + String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp() + + "_000" + appId.getId() + "_000001" + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + String appAttemptFileName = outputDirApp + appTimestampFileName; + File appAttemptFile = new File(appAttemptFileName); + Assert.assertTrue(appAttemptFile.exists()); + + String outputDirContainer = tmpRoot + + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" + + UserGroupInformation.getCurrentUser().getShortUserName() + + (defaultFlow ? "/" + + TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) + + "/0/" : "/test_flow_id/12345678/") + + appId.toString() + "/DS_CONTAINER/"; + File containerFolder = new File(outputDirContainer); + Assert.assertTrue(containerFolder.isDirectory()); + + String containerTimestampFileName = "container_" + + appId.getClusterTimestamp() + "_000" + appId.getId() + + "_01_000002.thist"; + String containerFileName = outputDirContainer + containerTimestampFileName; + File containerFile = new File(containerFileName); + Assert.assertTrue(containerFile.exists()); + String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId() + + "_"; + } finally { + FileUtils.deleteDirectory(tmpRootFolder.getParentFile()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java index 4f838e6..ce14562 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.util.VersionInfo; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.YarnVersionInfo; @@ -43,6 +44,9 @@ import org.codehaus.jackson.map.ObjectMapper; @Evolving public class TimelineUtils { + public static final String FLOW_ID_TAG_PREFIX = "TIMELINE_FLOW_ID_TAG"; + public static final String FLOW_RUN_ID_TAG_PREFIX = "TIMELINE_FLOW_RUN_ID_TAG"; + private static ObjectMapper mapper; static { @@ -119,4 +123,16 @@ public class TimelineUtils { getTimelineTokenServiceAddress(conf); return SecurityUtil.buildTokenService(timelineServiceAddr); } + + public static String generateDefaultFlowIdBasedOnAppId(ApplicationId appId) { + return "flow_" + appId.getClusterTimestamp() + "_" + appId.getId(); + } + + public static String generateFlowIdTag(String flowId) { + return FLOW_ID_TAG_PREFIX + ":" + flowId; + } + + public static String generateFlowRunIdTag(String flowRunId) { + return FLOW_RUN_ID_TAG_PREFIX + ":" + flowRunId; + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java index 26c121a..d23c04a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java @@ -21,6 +21,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; @@ -54,4 +56,18 @@ public interface CollectorNodemanagerProtocol { ReportNewCollectorInfoRequest request) throws YarnException, IOException; + /** + *
<p>
+ * The collector needs to get the context information including user, flow + * and flow run ID to associate with every incoming put-entity requests. + *
</p>
+ * @param request the request of getting the aggregator context information of + * the given application + * @return + * @throws YarnException + * @throws IOException + */ + GetTimelineCollectorContextResponse getTimelineCollectorContext( + GetTimelineCollectorContextRequest request) + throws YarnException, IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java index 276a540..b9e17f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java @@ -30,11 +30,16 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; import 
org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl; @@ -85,6 +90,21 @@ public class CollectorNodemanagerProtocolPBClientImpl implements } @Override + public GetTimelineCollectorContextResponse getTimelineCollectorContext( + GetTimelineCollectorContextRequest request) + throws YarnException, IOException { + GetTimelineCollectorContextRequestProto requestProto = + ((GetTimelineCollectorContextRequestPBImpl) request).getProto(); + try { + return new GetTimelineCollectorContextResponsePBImpl( + proxy.getTimelineCollectorContext(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override public void close() { if (this.proxy != null) { RPC.stopProxy(this.proxy); http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java index 3f42732..21fb270 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java @@ -20,11 +20,16 @@ package org.apache.hadoop.yarn.server.api.impl.pb.service; import java.io.IOException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoResponseProto; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextResponsePBImpl; import 
org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl; @@ -56,4 +61,20 @@ public class CollectorNodemanagerProtocolPBServiceImpl implements } } + @Override + public GetTimelineCollectorContextResponseProto getTimelineCollectorContext( + RpcController controller, + GetTimelineCollectorContextRequestProto proto) throws ServiceException { + GetTimelineCollectorContextRequestPBImpl request = + new GetTimelineCollectorContextRequestPBImpl(proto); + try { + GetTimelineCollectorContextResponse response = + real.getTimelineCollectorContext(request); + return ((GetTimelineCollectorContextResponsePBImpl)response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java new file mode 100644 index 0000000..604a40b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.Records; + +public abstract class GetTimelineCollectorContextRequest { + + public static GetTimelineCollectorContextRequest newInstance( + ApplicationId appId) { + GetTimelineCollectorContextRequest request = + Records.newRecord(GetTimelineCollectorContextRequest.class); + request.setApplicationId(appId); + return request; + } + + public abstract ApplicationId getApplicationId(); + + public abstract void setApplicationId(ApplicationId appId); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java new file mode 100644 index 0000000..1558e2f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + + +import org.apache.hadoop.yarn.util.Records; + +public abstract class GetTimelineCollectorContextResponse { + + public static GetTimelineCollectorContextResponse newInstance( + String userId, String flowId, String flowRunId) { + GetTimelineCollectorContextResponse response = + Records.newRecord(GetTimelineCollectorContextResponse.class); + response.setUserId(userId); + response.setFlowId(flowId); + response.setFlowRunId(flowRunId); + return response; + } + + public abstract String getUserId(); + + public abstract void setUserId(String userId); + + public abstract String getFlowId(); + + public abstract void setFlowId(String flowId); + + public abstract String getFlowRunId(); + + public abstract void setFlowRunId(String flowRunId); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java new file mode 100644 index 0000000..b53b55b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; + +public class GetTimelineCollectorContextRequestPBImpl extends + GetTimelineCollectorContextRequest { + + GetTimelineCollectorContextRequestProto + proto = GetTimelineCollectorContextRequestProto.getDefaultInstance(); + GetTimelineCollectorContextRequestProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId appId = null; + + public GetTimelineCollectorContextRequestPBImpl() { + builder = GetTimelineCollectorContextRequestProto.newBuilder(); + } + + public GetTimelineCollectorContextRequestPBImpl( + GetTimelineCollectorContextRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public GetTimelineCollectorContextRequestProto 
getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (appId != null) { + builder.setAppId(convertToProtoFormat(this.appId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = GetTimelineCollectorContextRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public ApplicationId getApplicationId() { + if (this.appId != null) { + return this.appId; + } + + GetTimelineCollectorContextRequestProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasAppId()) { + return null; + } + + this.appId = convertFromProtoFormat(p.getAppId()); + return this.appId; + } + + @Override + public void setApplicationId(ApplicationId appId) { + maybeInitBuilder(); + if (appId == null) + builder.clearAppId(); + this.appId = appId; + } + + private ApplicationIdPBImpl convertFromProtoFormat(YarnProtos.ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private YarnProtos.ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl)t).getProto(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java new file mode 100644 index 0000000..6dc1f77 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; + +public class GetTimelineCollectorContextResponsePBImpl extends + GetTimelineCollectorContextResponse { + + GetTimelineCollectorContextResponseProto proto = + GetTimelineCollectorContextResponseProto.getDefaultInstance(); + GetTimelineCollectorContextResponseProto.Builder builder = null; + boolean viaProto = false; + + public GetTimelineCollectorContextResponsePBImpl() { + builder = GetTimelineCollectorContextResponseProto.newBuilder(); + } + + public GetTimelineCollectorContextResponsePBImpl( + GetTimelineCollectorContextResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public GetTimelineCollectorContextResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = GetTimelineCollectorContextResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public String getUserId() { + GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasUserId()) { + return null; + } + return p.getUserId(); + } + + @Override + public void setUserId(String userId) { + maybeInitBuilder(); + if (userId == null) { + builder.clearUserId(); + return; + } + builder.setUserId(userId); + } + + @Override + public String getFlowId() { + GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasFlowId()) { + return null; + } + return p.getFlowId(); + } + + @Override + public void setFlowId(String flowId) { + maybeInitBuilder(); + if (flowId == null) { + builder.clearFlowId(); + return; + } + builder.setFlowId(flowId); + } + + @Override + public String getFlowRunId() { + GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasFlowRunId()) { + return null; + } + return p.getFlowRunId(); + } + + @Override + public void setFlowRunId(String flowRunId) { + maybeInitBuilder(); + if (flowRunId == null) { + builder.clearFlowRunId(); + return; + } + builder.setFlowRunId(flowRunId); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto index 654a9f2..8665274 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto @@ -26,4 +26,5 @@ import "yarn_server_common_service_protos.proto"; service CollectorNodemanagerProtocolService { rpc reportNewCollectorInfo (ReportNewCollectorInfoRequestProto) returns (ReportNewCollectorInfoResponseProto); + rpc getTimelineCollectorContext (GetTimelineCollectorContextRequestProto) returns (GetTimelineCollectorContextResponseProto); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index d358b83..2234752 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -109,6 +109,15 @@ message ReportNewCollectorInfoRequestProto { message ReportNewCollectorInfoResponseProto { } +message GetTimelineCollectorContextRequestProto { + optional ApplicationIdProto appId = 1; +} + +message GetTimelineCollectorContextResponseProto { + optional string user_id = 1; + optional string flow_id = 2; + optional string flow_run_id = 3; +} message NMContainerStatusProto { optional ContainerIdProto container_id = 1; http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index cfc3dc6..3c9f57b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import 
org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; @@ -166,6 +168,31 @@ public class TestRPC { Assert.assertTrue(e.getMessage().contains(ILLEGAL_NUMBER_MESSAGE)); } + // Verify request with a valid app ID + try { + GetTimelineCollectorContextRequest request = + GetTimelineCollectorContextRequest.newInstance( + ApplicationId.newInstance(0, 1)); + GetTimelineCollectorContextResponse response = + proxy.getTimelineCollectorContext(request); + Assert.assertEquals("test_user_id", response.getUserId()); + Assert.assertEquals("test_flow_id", response.getFlowId()); + Assert.assertEquals("test_flow_run_id", response.getFlowRunId()); + } catch (YarnException | IOException e) { + Assert.fail("RPC call failured is not expected here."); + } + + // Verify request with an invalid app ID + try { + GetTimelineCollectorContextRequest request = + GetTimelineCollectorContextRequest.newInstance( + ApplicationId.newInstance(0, 2)); + proxy.getTimelineCollectorContext(request); + Assert.fail("RPC call failured is expected here."); + } catch (YarnException | IOException e) { + Assert.assertTrue(e instanceof YarnException); + Assert.assertTrue(e.getMessage().contains("The application is not found.")); + } server.stop(); } @@ -340,6 +367,18 @@ public class TestRPC { recordFactory.newRecordInstance(ReportNewCollectorInfoResponse.class); return response; } + + @Override + public GetTimelineCollectorContextResponse getTimelineCollectorContext( + GetTimelineCollectorContextRequest request) + throws YarnException, IOException { + if (request.getApplicationId().getId() == 1) { + return 
GetTimelineCollectorContextResponse.newInstance( + "test_user_id", "test_flow_id", "test_flow_run_id"); + } else { + throw new YarnException("The application is not found."); + } + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java index 009fa63..6ccea84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java @@ -30,13 +30,17 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import 
org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; public class NMCollectorService extends CompositeService implements CollectorNodemanagerProtocol { @@ -93,7 +97,7 @@ public class NMCollectorService extends CompositeService implements @Override public ReportNewCollectorInfoResponse reportNewCollectorInfo( - ReportNewCollectorInfoRequest request) throws IOException { + ReportNewCollectorInfoRequest request) throws YarnException, IOException { List newCollectorsList = request.getAppCollectorsList(); if (newCollectorsList != null && !newCollectorsList.isEmpty()) { Map newCollectorsMap = @@ -107,4 +111,16 @@ public class NMCollectorService extends CompositeService implements return ReportNewCollectorInfoResponse.newInstance(); } + @Override + public GetTimelineCollectorContextResponse getTimelineCollectorContext( + GetTimelineCollectorContextRequest request) + throws YarnException, IOException { + Application app = context.getApplications().get(request.getApplicationId()); + if (app == null) { + throw new YarnException("Application " + request.getApplicationId() + + " doesn't exist on NM."); + } + return GetTimelineCollectorContextResponse.newInstance( + app.getUser(), app.getFlowId(), app.getFlowRunId()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 494fa8f..8a60dcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -139,6 +139,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; public class ContainerManagerImpl extends CompositeService implements ServiceStateChangeListener, ContainerManagementProtocol, @@ -297,8 +298,9 @@ public class ContainerManagerImpl extends CompositeService implements } LOG.info("Recovering application " + appId); - ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId, - creds, context); + //TODO: Recover flow and flow run ID + ApplicationImpl app = new ApplicationImpl( + dispatcher, p.getUser(), null, null, appId, creds, context); context.getApplications().put(appId, app); app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); } @@ -868,8 +870,12 @@ public class ContainerManagerImpl extends CompositeService implements try { if (!serviceStopped) { // Create the application - Application application = - new ApplicationImpl(dispatcher, user, applicationID, credentials, context); + String flowId = launchContext.getEnvironment().get( + TimelineUtils.FLOW_ID_TAG_PREFIX); + String flowRunId = 
launchContext.getEnvironment().get( + TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); + Application application = new ApplicationImpl( + dispatcher, user, flowId, flowRunId, applicationID, credentials, context); if (null == context.getApplications().putIfAbsent(applicationID, application)) { LOG.info("Creating a new application reference for app " + applicationID); http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java index b1571e9..decd17d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java @@ -35,4 +35,8 @@ public interface Application extends EventHandler { ApplicationState getApplicationState(); + String getFlowId(); + + String getFlowRunId(); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index a8dbb57..81b75b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -65,6 +65,8 @@ public class ApplicationImpl implements Application { final Dispatcher dispatcher; final String user; + final String flowId; + final String flowRunId; final ApplicationId appId; final Credentials credentials; Map applicationACLs; @@ -80,10 +82,13 @@ public class ApplicationImpl implements Application { Map containers = new HashMap(); - public ApplicationImpl(Dispatcher dispatcher, String user, ApplicationId appId, - Credentials credentials, Context context) { + public ApplicationImpl(Dispatcher dispatcher, String user, String flowId, + String flowRunId, ApplicationId appId, Credentials credentials, + Context context) { this.dispatcher = dispatcher; this.user = user; + this.flowId = flowId; + this.flowRunId = flowRunId; this.appId = appId; this.credentials = credentials; this.aclsManager = context.getApplicationACLsManager(); @@ -488,4 +493,12 @@ public class ApplicationImpl implements Application { this.readLock.unlock(); } } + + public String getFlowId() { + return flowId; + } + + public String getFlowRunId() { + return flowRunId; + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 370a207..5303df5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -530,7 +530,8 @@ public class TestApplication { this.user = user; this.appId = BuilderUtils.newApplicationId(timestamp, id); - app = new ApplicationImpl(dispatcher, this.user, appId, null, context); + app = new ApplicationImpl( + dispatcher, this.user, null, null, appId, null, context); containers = new ArrayList(); for (int i = 0; i < numContainers; i++) { Container container = createMockedContainer(this.appId, i); http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java index 4e13010..35b95ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java @@ -39,6 +39,9 @@ public class MockApp implements Application { Map containers = new HashMap(); ApplicationState appState; Application app; + String flowId; + String flowRunId; + public MockApp(int uniqId) { this("mockUser", 1234, uniqId); @@ -77,4 +80,11 @@ public class MockApp implements Application { public void handle(ApplicationEvent event) {} + public String getFlowId() { + return flowId; + } + + public String getFlowRunId() { + return flowRunId; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index 1f5590c..2ac9c22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -327,7 +327,7 @@ public class TestNMWebServices extends JerseyTestBase { final String filename = "logfile1"; final String logMessage = "log message\n"; nmContext.getApplications().put(appId, new ApplicationImpl(null, "user", - appId, null, nmContext)); + null, null, appId, null, nmContext)); MockContainer container = new MockContainer(appAttemptId, new AsyncDispatcher(), new Configuration(), "user", appId, 1); http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java index 713e75f..5c0b02b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java @@ -35,8 +35,8 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import 
org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import com.google.common.annotations.VisibleForTesting; @@ -218,6 +219,26 @@ public class AMLauncher implements Runnable { .get(applicationId) .getSubmitTime())); + // Set flow context info + for (String tag : + rmContext.getRMApps().get(applicationId).getApplicationTags()) { + if (tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX + ":") || + tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX.toLowerCase() + ":")) { + String value = tag.substring( + TimelineUtils.FLOW_ID_TAG_PREFIX.length() + 1); + if (!value.isEmpty()) { + environment.put(TimelineUtils.FLOW_ID_TAG_PREFIX, value); + } + } + if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") || + tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) { + String value = tag.substring( + TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + 1); + if (!value.isEmpty()) { + environment.put(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, value); + } + } + } Credentials credentials = new Credentials(); DataInputByteBuffer dibb = new DataInputByteBuffer(); ByteBuffer tokens = container.getTokens(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java index 22743d6..4ea7a03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEventType; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; /** * This class is responsible for posting application and appattempt lifecycle @@ -87,6 +88,12 @@ public class RMTimelineCollector extends TimelineCollector { LOG.error("Unknown SystemMetricsEvent type: " + event.getType()); } } + + @Override + protected TimelineCollectorContext getTimelineEntityContext() { + // TODO address in YARN-3390. + return null; + } /** * EventHandler implementation which forward events to SystemMetricsPublisher. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java index fab131c..c8b9625 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java @@ -20,20 +20,27 @@ package org.apache.hadoop.yarn.server.timelineservice; import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import 
org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import java.io.IOException; + public class TestTimelineServiceClientIntegration { private static TimelineCollectorManager collectorManager; private static PerNodeTimelineCollectorsAuxService auxService; @@ -86,7 +93,17 @@ public class TestTimelineServiceClientIntegration { @Override protected CollectorNodemanagerProtocol getNMCollectorService() { - return mock(CollectorNodemanagerProtocol.class); + CollectorNodemanagerProtocol protocol = + mock(CollectorNodemanagerProtocol.class); + try { + GetTimelineCollectorContextResponse response = + GetTimelineCollectorContextResponse.newInstance(null, null, null); + when(protocol.getTimelineCollectorContext(any( + GetTimelineCollectorContextRequest.class))).thenReturn(response); + } catch (YarnException | IOException e) { + fail(); + } + return protocol; } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java index 7d59876..60ddde5 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java @@ -18,9 +18,14 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; /** * Service that handles writes to the timeline service and writes them to the @@ -31,16 +36,29 @@ import org.apache.hadoop.conf.Configuration; @Private @Unstable public class AppLevelTimelineCollector extends TimelineCollector { - private final String applicationId; - // TODO define key metadata such as flow metadata, user, and queue + private final ApplicationId appId; + private final TimelineCollectorContext context; - public AppLevelTimelineCollector(String applicationId) { - super(AppLevelTimelineCollector.class.getName() + " - " + applicationId); - this.applicationId = applicationId; + public AppLevelTimelineCollector(ApplicationId appId) { + super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString()); + Preconditions.checkNotNull(appId, "AppId shouldn't be null"); + this.appId = appId; + context = new TimelineCollectorContext(); } @Override protected void serviceInit(Configuration conf) throws Exception { + context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID, + YarnConfiguration.DEFAULT_RM_CLUSTER_ID)); + // Set the default 
values, which will be updated with an RPC call to get the + // context info from NM. + // Current user usually is not the app user, but keep this field non-null + context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName()); + // Use app ID to generate a default flow ID for orphan app + context.setFlowId(TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId)); + // Set the flow run ID to 0 if it's an orphan app + context.setFlowRunId("0"); + context.setAppId(appId.toString()); super.serviceInit(conf); } @@ -54,4 +72,9 @@ public class AppLevelTimelineCollector extends TimelineCollector { super.serviceStop(); } + @Override + protected TimelineCollectorContext getTimelineEntityContext() { + return context; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java index 59ecef1..2017d01 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java @@ -95,7 +95,7 @@ public class PerNodeTimelineCollectorsAuxService extends 
AuxiliaryService { */ public boolean addApplication(ApplicationId appId) { AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(appId.toString()); + new AppLevelTimelineCollector(appId); return (collectorManager.putIfAbsent(appId, collector) == collector); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 6e20e69..677feb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; + /** * Service that handles writes to the timeline service and writes them to the * backing storage. @@ -83,21 +84,24 @@ public abstract class TimelineCollector extends CompositeService { * * This method should be reserved for selected critical entities and events. 
* For normal voluminous writes one should use the async method - * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}. + * {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation)}. * * @param entities entities to post * @param callerUgi the caller UGI * @return the response that contains the result of the post. */ - public TimelineWriteResponse postEntities(TimelineEntities entities, + public TimelineWriteResponse putEntities(TimelineEntities entities, UserGroupInformation callerUgi) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE"); - LOG.debug("postEntities(entities=" + entities + ", callerUgi=" + LOG.debug("putEntities(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } - return writer.write(entities); + TimelineCollectorContext context = getTimelineEntityContext(); + return writer.write(context.getClusterId(), context.getUserId(), + context.getFlowId(), context.getFlowRunId(), context.getAppId(), + entities); } /** @@ -111,12 +115,15 @@ public abstract class TimelineCollector extends CompositeService { * @param entities entities to post * @param callerUgi the caller UGI */ - public void postEntitiesAsync(TimelineEntities entities, + public void putEntitiesAsync(TimelineEntities entities, UserGroupInformation callerUgi) { // TODO implement if (LOG.isDebugEnabled()) { - LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" + + LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } } + + protected abstract TimelineCollectorContext getTimelineEntityContext(); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java new file mode 100644 index 0000000..c1a10a6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.collector; + +public class TimelineCollectorContext { + + private String clusterId; + private String userId; + private String flowId; + private String flowRunId; + private String appId; + + public TimelineCollectorContext() { + this(null, null, null, null, null); + } + + public TimelineCollectorContext(String clusterId, String userId, + String flowId, String flowRunId, String appId) { + this.clusterId = clusterId; + this.userId = userId; + this.flowId = flowId; + this.flowRunId = flowRunId; + this.appId = appId; + } + + public String getClusterId() { + return clusterId; + } + + public void setClusterId(String clusterId) { + this.clusterId = clusterId; + } + + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } + + public String getFlowId() { + return flowId; + } + + public void setFlowId(String flowId) { + this.flowId = flowId; + } + + public String getFlowRunId() { + return flowRunId; + } + + public void setFlowRunId(String flowRunId) { + this.flowRunId = flowRunId; + } + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/60203f25/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 3a4515e..909027e 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; @@ -102,6 +104,7 @@ public class TimelineCollectorManager extends CompositeService { @Override protected void serviceStart() throws Exception { + nmCollectorService = getNMCollectorService(); startWebApp(); super.serviceStart(); } @@ -151,11 +154,11 @@ public class TimelineCollectorManager extends CompositeService { // Report to NM if a new collector is added. 
if (collectorIsNew) { try { + updateTimelineCollectorContext(appId, collector); reportNewCollectorToNM(appId); } catch (Exception e) { - // throw exception here as it cannot be used if failed report to NM - LOG.error("Failed to report a new collector for application: " + appId + - " to the NM Collector Service."); + // throw exception here as it cannot be used if failed communicate with NM + LOG.error("Failed to communicate with NM Collector Service for " + appId); throw new YarnRuntimeException(e); } } @@ -250,7 +253,6 @@ public class TimelineCollectorManager extends CompositeService { private void reportNewCollectorToNM(ApplicationId appId) throws YarnException, IOException { - this.nmCollectorService = getNMCollectorService(); ReportNewCollectorInfoRequest request = ReportNewCollectorInfoRequest.newInstance(appId, this.timelineRestServerBindAddress); @@ -259,6 +261,28 @@ public class TimelineCollectorManager extends CompositeService { nmCollectorService.reportNewCollectorInfo(request); } + private void updateTimelineCollectorContext( + ApplicationId appId, TimelineCollector collector) + throws YarnException, IOException { + GetTimelineCollectorContextRequest request = + GetTimelineCollectorContextRequest.newInstance(appId); + LOG.info("Get timeline collector context for " + appId); + GetTimelineCollectorContextResponse response = + nmCollectorService.getTimelineCollectorContext(request); + String userId = response.getUserId(); + if (userId != null && !userId.isEmpty()) { + collector.getTimelineEntityContext().setUserId(userId); + } + String flowId = response.getFlowId(); + if (flowId != null && !flowId.isEmpty()) { + collector.getTimelineEntityContext().setFlowId(flowId); + } + String flowRunId = response.getFlowRunId(); + if (flowRunId != null && !flowRunId.isEmpty()) { + collector.getTimelineEntityContext().setFlowRunId(flowRunId); + } + } + @VisibleForTesting protected CollectorNodemanagerProtocol getNMCollectorService() { Configuration conf = getConfig();