Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 438D2D523 for ; Tue, 24 Jul 2012 17:34:04 +0000 (UTC) Received: (qmail 3500 invoked by uid 500); 24 Jul 2012 17:34:04 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 3434 invoked by uid 500); 24 Jul 2012 17:34:03 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 3422 invoked by uid 99); 24 Jul 2012 17:34:03 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jul 2012 17:34:03 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jul 2012 17:34:00 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 505B3238890B; Tue, 24 Jul 2012 17:33:17 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1365185 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-applications/ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distribut... 
Date: Tue, 24 Jul 2012 17:33:16 -0000 To: mapreduce-commits@hadoop.apache.org From: acmurthy@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120724173317.505B3238890B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: acmurthy Date: Tue Jul 24 17:33:15 2012 New Revision: 1365185 URL: http://svn.apache.org/viewvc?rev=1365185&view=rev Log: MAPREDUCE-4438. Add a simple, generic client to run 'easy' AMs in YARN. Contributed by Bikas Saha. Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/yarn-site.xml Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1365185&r1=1365184&r2=1365185&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Jul 24 17:33:15 2012 @@ -161,6 +161,9 @@ Release 2.1.0-alpha - Unreleased MAPREDUCE-3451. Port Fair Scheduler to MR2 (pwendell via tucu) + MAPREDUCE-4438. Add a simple, generic client to run 'easy' AMs in YARN. + (Bikas Saha via acmurthy) + IMPROVEMENTS MAPREDUCE-4440. 
Changed SchedulerApp and SchedulerNode to be a minimal Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1365185&r1=1365184&r2=1365185&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Tue Jul 24 17:33:15 2012 @@ -290,7 +290,10 @@ public class ApplicationMaster { Map envs = System.getenv(); appAttemptID = Records.newRecord(ApplicationAttemptId.class); - if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) { + if (envs.containsKey(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV)) { + appAttemptID = ConverterUtils.toApplicationAttemptId(envs + .get(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV)); + } else if (!envs.containsKey(ApplicationConstants.AM_CONTAINER_ID_ENV)) { if (cliParser.hasOption("app_attempt_id")) { String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml?rev=1365185&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/pom.xml Tue Jul 24 17:33:15 2012 @@ -0,0 +1,110 @@ + + + + + hadoop-yarn-applications + org.apache.hadoop + 3.0.0-SNAPSHOT + + 4.0.0 + org.apache.hadoop + hadoop-yarn-applications-unmanaged-am-launcher + 3.0.0-SNAPSHOT + hadoop-yarn-applications-unmanaged-am-launcher + + + + ${project.parent.parent.basedir} + + + + + org.apache.hadoop + hadoop-yarn-api + + + org.apache.hadoop + hadoop-yarn-common + + + org.apache.hadoop + hadoop-yarn-server-nodemanager + test + + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + test + + + org.apache.hadoop + hadoop-yarn-server-common + test + + + org.apache.hadoop + hadoop-mapreduce-client-core + test + + + org.apache.hadoop + hadoop-yarn-applications-distributedshell + 3.0.0-SNAPSHOT + test + + + org.apache.hadoop + hadoop-yarn-server-tests + test-jar + test + + + + + + + maven-dependency-plugin + + + build-classpath + generate-sources + + build-classpath + + + + target/classes/yarn-apps-am-generated-classpath + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + ${java.home} + + + + + + + + Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java?rev=1365185&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java Tue Jul 24 17:33:15 2012 @@ -0,0 +1,405 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.applications.unmanagedamlauncher; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ClientRMProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.Records; + +/** + * The UnmanagedAMLauncher is a simple client that launches an unmanaged AM. 
An + * unmanagedAM is an AM that is not launched and managed by the RM. The client + * creates a new application on the RM and negotiates a new attempt id. Then it + * waits for the RM app state to reach YarnApplicationState.ACCEPTED after + * which it spawns the AM in another process and passes it the attempt id via + * env variable ApplicationConstants.AM_APP_ATTEMPT_ID_ENV. The AM can be in any + * language. The AM can register with the RM using the attempt id and proceed as + * normal. The client redirects app stdout and stderr to its own stdout and + * stderr and waits for the AM process to exit. Then it waits for the RM to + * report app completion. + */ +public class UnmanagedAMLauncher { + private static final Log LOG = LogFactory.getLog(UnmanagedAMLauncher.class); + + private Configuration conf; + + // RPC to communicate to RM + private YarnRPC rpc; + + // Handle to talk to the Resource Manager/Applications Manager + private ClientRMProtocol rmClient; + + // Application master specific info to register a new Application with RM/ASM + private String appName = ""; + // App master priority + private int amPriority = 0; + // Queue for App master + private String amQueue = ""; + // cmd to start AM + private String amCmd = null; + // set the classpath explicitly + private String classpath = null; + + /** + * @param args + * Command line arguments + */ + public static void main(String[] args) { + try { + UnmanagedAMLauncher client = new UnmanagedAMLauncher(); + LOG.info("Initializing Client"); + boolean doRun = client.init(args); + if (!doRun) { + System.exit(0); + } + client.run(); + } catch (Throwable t) { + LOG.fatal("Error running Client", t); + System.exit(1); + } + } + + /** + */ + public UnmanagedAMLauncher(Configuration conf) throws Exception { + // Set up RPC + this.conf = conf; + rpc = YarnRPC.create(conf); + } + + public UnmanagedAMLauncher() throws Exception { + this(new Configuration()); + } + + private void printUsage(Options opts) { + new 
HelpFormatter().printHelp("Client", opts); + } + + public boolean init(String[] args) throws ParseException { + + Options opts = new Options(); + opts.addOption("appname", true, + "Application Name. Default value - UnmanagedAM"); + opts.addOption("priority", true, "Application Priority. Default 0"); + opts.addOption("queue", true, + "RM Queue in which this application is to be submitted"); + opts.addOption("master_memory", true, + "Amount of memory in MB to be requested to run the application master"); + opts.addOption("cmd", true, "command to start unmanaged AM (required)"); + opts.addOption("classpath", true, "additional classpath"); + opts.addOption("help", false, "Print usage"); + CommandLine cliParser = new GnuParser().parse(opts, args); + + if (args.length == 0) { + printUsage(opts); + throw new IllegalArgumentException( + "No args specified for client to initialize"); + } + + if (cliParser.hasOption("help")) { + printUsage(opts); + return false; + } + + appName = cliParser.getOptionValue("appname", "UnmanagedAM"); + amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); + amQueue = cliParser.getOptionValue("queue", ""); + classpath = cliParser.getOptionValue("classpath", null); + + amCmd = cliParser.getOptionValue("cmd"); + if (amCmd == null) { + printUsage(opts); + throw new IllegalArgumentException( + "No cmd specified for application master"); + } + + return true; + } + + private void connectToRM() throws IOException { + YarnConfiguration yarnConf = new YarnConfiguration(conf); + InetSocketAddress rmAddress = yarnConf.getSocketAddr( + YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); + LOG.info("Connecting to ResourceManager at " + rmAddress); + rmClient = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, + rmAddress, conf)); + } + + private GetNewApplicationResponse getApplication() throws YarnRemoteException { + GetNewApplicationRequest request = Records + 
.newRecord(GetNewApplicationRequest.class); + GetNewApplicationResponse response = rmClient.getNewApplication(request); + LOG.info("Got new application id=" + response.getApplicationId()); + return response; + } + + public void launchAM(ApplicationAttemptId attemptId) throws IOException { + Map env = System.getenv(); + ArrayList envAMList = new ArrayList(); + boolean setClasspath = false; + for (Map.Entry entry : env.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if(key.equals("CLASSPATH")) { + setClasspath = true; + if(classpath != null) { + value = value + File.pathSeparator + classpath; + } + } + envAMList.add(key + "=" + value); + } + + if(!setClasspath && classpath!=null) { + envAMList.add("CLASSPATH="+classpath); + } + + envAMList.add(ApplicationConstants.AM_APP_ATTEMPT_ID_ENV + "=" + attemptId); + + String[] envAM = new String[envAMList.size()]; + Process amProc = Runtime.getRuntime().exec(amCmd, envAMList.toArray(envAM)); + + final BufferedReader errReader = + new BufferedReader(new InputStreamReader(amProc + .getErrorStream())); + final BufferedReader inReader = + new BufferedReader(new InputStreamReader(amProc + .getInputStream())); + + // read error and input streams as this would free up the buffers + // free the error stream buffer + Thread errThread = new Thread() { + @Override + public void run() { + try { + String line = errReader.readLine(); + while((line != null) && !isInterrupted()) { + System.err.println(line); + line = errReader.readLine(); + } + } catch(IOException ioe) { + LOG.warn("Error reading the error stream", ioe); + } + } + }; + Thread outThread = new Thread() { + @Override + public void run() { + try { + String line = inReader.readLine(); + while((line != null) && !isInterrupted()) { + System.out.println(line); + line = inReader.readLine(); + } + } catch(IOException ioe) { + LOG.warn("Error reading the out stream", ioe); + } + } + }; + try { + errThread.start(); + outThread.start(); + } catch 
(IllegalStateException ise) { } + + // wait for the process to finish and check the exit code + try { + int exitCode = amProc.waitFor(); + LOG.info("AM process exited with value: " + exitCode); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + try { + // make sure that the error thread exits + // on Windows these threads sometimes get stuck and hang the execution + // timeout and join later after destroying the process. + errThread.join(); + outThread.join(); + errReader.close(); + inReader.close(); + } catch (InterruptedException ie) { + LOG.info("ShellExecutor: Interrupted while reading the error/out stream", + ie); + } catch (IOException ioe) { + LOG.warn("Error while closing the error/out stream", ioe); + } + amProc.destroy(); + } + + public boolean run() throws IOException { + LOG.info("Starting Client"); + + // Connect to ResourceManager + connectToRM(); + assert (rmClient != null); + + // Get a new application id + GetNewApplicationResponse newApp = getApplication(); + ApplicationId appId = newApp.getApplicationId(); + + // Create launch context for app master + LOG.info("Setting up application submission context for ASM"); + ApplicationSubmissionContext appContext = Records + .newRecord(ApplicationSubmissionContext.class); + + // set the application id + appContext.setApplicationId(appId); + // set the application name + appContext.setApplicationName(appName); + + // Set the priority for the application master + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(amPriority); + appContext.setPriority(pri); + + // Set the queue to which this application is to be submitted in the RM + appContext.setQueue(amQueue); + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = Records + .newRecord(ContainerLaunchContext.class); + appContext.setAMContainerSpec(amContainer); + + // unmanaged AM + appContext.setUnmanagedAM(true); + LOG.info("Setting unmanaged AM"); + + // Create the 
request to send to the applications manager + SubmitApplicationRequest appRequest = Records + .newRecord(SubmitApplicationRequest.class); + appRequest.setApplicationSubmissionContext(appContext); + + // Submit the application to the applications manager + LOG.info("Submitting application to ASM"); + rmClient.submitApplication(appRequest); + + // Monitor the application to wait for launch state + ApplicationReport appReport = monitorApplication(appId, + EnumSet.of(YarnApplicationState.ACCEPTED)); + ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId(); + LOG.info("Launching application with id: " + attemptId); + + // launch AM + launchAM(attemptId); + + // Monitor the application for end state + appReport = monitorApplication(appId, EnumSet.of( + YarnApplicationState.KILLED, YarnApplicationState.FAILED, + YarnApplicationState.FINISHED)); + YarnApplicationState appState = appReport.getYarnApplicationState(); + FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus(); + + LOG.info("App ended with state: " + appReport.getYarnApplicationState() + + " and status: " + appStatus); + if (YarnApplicationState.FINISHED == appState + && FinalApplicationStatus.SUCCEEDED == appStatus) { + LOG.info("Application has completed successfully."); + return true; + } else { + LOG.info("Application did finished unsuccessfully." + " YarnState=" + + appState.toString() + ", FinalStatus=" + appStatus.toString()); + return false; + } + } + + /** + * Monitor the submitted application until it reaches one of the given + * states. + * + * @param appId + * Application Id of application to be monitored + * @return the application report observed once a requested state is reached + * @throws YarnRemoteException + */ + private ApplicationReport monitorApplication(ApplicationId appId, + Set finalState) throws YarnRemoteException { + + while (true) { + + // Check app status every 1 second. 
+ try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Thread sleep in monitoring loop interrupted"); + } + + // Get application report for the appId we are interested in + GetApplicationReportRequest reportRequest = Records + .newRecord(GetApplicationReportRequest.class); + reportRequest.setApplicationId(appId); + GetApplicationReportResponse reportResponse = rmClient + .getApplicationReport(reportRequest); + ApplicationReport report = reportResponse.getApplicationReport(); + + LOG.info("Got application report from ASM for" + ", appId=" + + appId.getId() + ", appAttemptId=" + + report.getCurrentApplicationAttemptId() + ", clientToken=" + + report.getClientToken() + ", appDiagnostics=" + + report.getDiagnostics() + ", appMasterHost=" + report.getHost() + + ", appQueue=" + report.getQueue() + ", appMasterRpcPort=" + + report.getRpcPort() + ", appStartTime=" + report.getStartTime() + + ", yarnAppState=" + report.getYarnApplicationState().toString() + + ", distributedFinalState=" + + report.getFinalApplicationStatus().toString() + ", appTrackingUrl=" + + report.getTrackingUrl() + ", appUser=" + report.getUser()); + + YarnApplicationState state = report.getYarnApplicationState(); + if (finalState.contains(state)) { + return report; + } + + } + + } + +} Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java?rev=1365185&view=auto ============================================================================== --- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java Tue Jul 24 17:33:15 2012 @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.applications.unmanagedamlauncher; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.URL; + +import junit.framework.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestUnmanagedAMLauncher { + + private static final Log LOG = LogFactory + .getLog(TestUnmanagedAMLauncher.class); + + protected static MiniYARNCluster yarnCluster = null; + protected static Configuration conf = new Configuration(); + + @BeforeClass + public static void setup() throws InterruptedException, IOException { + LOG.info("Starting up YARN cluster"); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128); + if (yarnCluster == null) { + yarnCluster = new MiniYARNCluster( + TestUnmanagedAMLauncher.class.getName(), 1, 1, 1); + yarnCluster.init(conf); + yarnCluster.start(); + URL url = Thread.currentThread().getContextClassLoader() + .getResource("yarn-site.xml"); + if (url == null) { + throw new RuntimeException( + "Could not find 'yarn-site.xml' dummy file in classpath"); + } + OutputStream os = new FileOutputStream(new File(url.getPath())); + yarnCluster.getConfig().writeXml(os); + os.close(); + } + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + LOG.info("setup thread sleep interrupted. 
message=" + e.getMessage()); + } + } + + @AfterClass + public static void tearDown() throws IOException { + if (yarnCluster != null) { + yarnCluster.stop(); + yarnCluster = null; + } + } + + private static String getTestRuntimeClasspath() { + + InputStream classpathFileStream = null; + BufferedReader reader = null; + String envClassPath = ""; + + LOG.info("Trying to generate classpath for app master from current thread's classpath"); + try { + + // Create classpath from generated classpath + // Check maven pom.xml for generated classpath info + // Works if compile time env is same as runtime. Mainly tests. + ClassLoader thisClassLoader = Thread.currentThread() + .getContextClassLoader(); + String generatedClasspathFile = "yarn-apps-am-generated-classpath"; + classpathFileStream = thisClassLoader + .getResourceAsStream(generatedClasspathFile); + if (classpathFileStream == null) { + LOG.info("Could not classpath resource from class loader"); + return envClassPath; + } + LOG.info("Readable bytes from stream=" + classpathFileStream.available()); + reader = new BufferedReader(new InputStreamReader(classpathFileStream)); + String cp = reader.readLine(); + if (cp != null) { + envClassPath += cp.trim() + File.pathSeparator; + } + // yarn-site.xml at this location contains proper config for mini cluster + URL url = thisClassLoader.getResource("yarn-site.xml"); + envClassPath += new File(url.getFile()).getParent(); + } catch (IOException e) { + LOG.info("Could not find the necessary resource to generate class path for tests. Error=" + + e.getMessage()); + } + + try { + if (classpathFileStream != null) { + classpathFileStream.close(); + } + if (reader != null) { + reader.close(); + } + } catch (IOException e) { + LOG.info("Failed to close class path file stream or reader. 
Error=" + + e.getMessage()); + } + return envClassPath; + } + + @Test + public void testDSShell() throws Exception { + String classpath = getTestRuntimeClasspath(); + String javaHome = System.getenv("JAVA_HOME"); + if (javaHome == null) { + LOG.fatal("JAVA_HOME not defined. Test not running."); + return; + } + // start dist-shell with 0 containers because container launch will fail if + // there are no dist cache resources. + String[] args = { + "--classpath", + classpath, + "--cmd", + javaHome + + "/bin/java -Xmx512m " + + "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster " + + "--container_memory 128 --num_containers 0 --priority 0 --shell_command ls" }; + + LOG.info("Initializing Launcher"); + UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration( + yarnCluster.getConfig())); + boolean initSuccess = launcher.init(args); + Assert.assertTrue(initSuccess); + LOG.info("Running Launcher"); + boolean result = launcher.run(); + + LOG.info("Launcher run completed. 
Result=" + result); + Assert.assertTrue(result); + + } + +} Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/yarn-site.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/yarn-site.xml?rev=1365185&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/yarn-site.xml (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/resources/yarn-site.xml Tue Jul 24 17:33:15 2012 @@ -0,0 +1,21 @@ + + + + + + + + + Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml?rev=1365185&r1=1365184&r2=1365185&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml Tue Jul 24 17:33:15 2012 @@ -30,6 +30,7 @@ hadoop-yarn-applications-distributedshell + hadoop-yarn-applications-unmanaged-am-launcher