Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 389EB18575 for ; Thu, 16 Jul 2015 09:28:28 +0000 (UTC) Received: (qmail 3312 invoked by uid 500); 16 Jul 2015 09:28:28 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 3282 invoked by uid 500); 16 Jul 2015 09:28:28 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 3273 invoked by uid 99); 16 Jul 2015 09:28:28 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Jul 2015 09:28:28 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 999AA1827F2 for ; Thu, 16 Jul 2015 09:28:27 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id d0eVexbS9U5w for ; Thu, 16 Jul 2015 09:28:25 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id C3F4725073 for ; Thu, 16 Jul 2015 09:28:24 +0000 (UTC) Received: (qmail 3137 invoked by uid 99); 16 Jul 2015 09:28:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Jul 2015 09:28:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9891ADFF08; Thu, 16 Jul 2015 09:28:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 16 Jul 2015 09:28:24 -0000 Message-Id: <677bb3be7da44ff191c0583d0646d3ed@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/50] [abbrv] incubator-ignite git commit: #IGNITE-YARN Repository: incubator-ignite Updated Branches: refs/heads/ignite-gg-9615 43d7c8cc1 -> ebed8ee99 #IGNITE-YARN Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b5691911 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b5691911 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b5691911 Branch: refs/heads/ignite-gg-9615 Commit: b5691911dc545db3264afcf23153e2f9ec914724 Parents: 50cfa27 Author: Tikhonov Nikolay Authored: Tue Jun 2 21:17:35 2015 +0300 Committer: Tikhonov Nikolay Committed: Tue Jun 2 21:17:35 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/yarn/ApplicationMaster.java | 120 ++++++++++--------- .../apache/ignite/yarn/IgniteYarnClient.java | 116 +++++++++++++++++- .../ignite/yarn/IgniteSchedulerSelfTest.java | 2 + 3 files changed, 179 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5691911/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java index f52a1de..9ab70d4 100644 --- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java +++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java @@ -17,6 +17,7 @@ package org.apache.ignite.yarn; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.*; import org.apache.hadoop.yarn.api.*; import org.apache.hadoop.yarn.api.protocolrecords.*; @@ -32,56 +33,87 @@ import java.util.*; * TODO */ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { - /** {@inheritDoc} */ - @Override public void onContainersCompleted(List statuses) { - + Configuration configuration; + NMClient nmClient; + int numContainersToWaitFor = 5; + + public ApplicationMaster() { + configuration = new YarnConfiguration(); + nmClient = NMClient.createNMClient(); + nmClient.init(configuration); + nmClient.start(); } - /** {@inheritDoc} */ - @Override public void onContainersAllocated(List containers) { + public void onContainersAllocated(List containers) { + for (Container container : containers) { + try { + // Launch container by create ContainerLaunchContext + // bin/hadoop fs -rm /user/ntikhonov/*.jar && bin/hadoop fs -copyFromLocal ./ignite-yarn.jar /user/ntikhonov + ContainerLaunchContext ctx = + Records.newRecord(ContainerLaunchContext.class); + ctx.setCommands( + Lists.newArrayList( + "ls " + + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" + )); + System.out.println("[AM] Launching container " + container.getId()); + nmClient.startContainer(container, ctx); + } catch (Exception ex) { + System.err.println("[AM] Error launching container " + container.getId() + " " + ex); + } + } + } + public void onContainersCompleted(List statuses) { + for (ContainerStatus status : statuses) { + System.out.println("[AM] Completed container " + status.getContainerId()); + synchronized (this) { + numContainersToWaitFor--; + } + } } - /** {@inheritDoc} */ - @Override public void onShutdownRequest() { + public void onNodesUpdated(List updated) { + } + public void onReboot() { } - /** {@inheritDoc} */ - @Override public void onNodesUpdated(List updatedNodes) { + public void onShutdownRequest() { + } + public void onError(Throwable t) { } - /** {@inheritDoc} */ - @Override public float getProgress() { + public float getProgress() { return 0; } - /** {@inheritDoc} */ - @Override public void onError(Throwable e) { + public boolean doneWithContainers() { + return numContainersToWaitFor == 0; + } + public Configuration getConfiguration() { + return configuration; } - /** - * @param args Arguments. - */ public static void main(String[] args) throws Exception { - final String command = args[0]; - final int n = Integer.valueOf(args[1]); + ApplicationMaster master = new ApplicationMaster(); + master.runMainLoop(); - // Initialize clients to ResourceManager and NodeManagers - Configuration conf = new YarnConfiguration(); + } - AMRMClient rmClient = AMRMClient.createAMRMClient(); - rmClient.init(conf); - rmClient.start(); + public void runMainLoop() throws Exception { - NMClient nmClient = NMClient.createNMClient(); - nmClient.init(conf); - nmClient.start(); + AMRMClientAsync rmClient = AMRMClientAsync.createAMRMClientAsync(100, this); + rmClient.init(getConfiguration()); + rmClient.start(); // Register with ResourceManager + System.out.println("[AM] registerApplicationMaster 0"); rmClient.registerApplicationMaster("", 0, ""); + System.out.println("[AM] registerApplicationMaster 1"); // Priority for worker containers - priorities are intra-application Priority priority = Records.newRecord(Priority.class); @@ -93,41 +125,21 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { capability.setVirtualCores(1); // Make container requests to ResourceManager - for (int i = 0; i < n; ++i) { - AMRMClient.ContainerRequest containerAsk = - new AMRMClient.ContainerRequest(capability, null, null, priority); - + for (int i = 0; i < numContainersToWaitFor; ++i) { + AMRMClient.ContainerRequest containerAsk = new AMRMClient.ContainerRequest(capability, null, null, priority); + System.out.println("[AM] Making res-req " + i); rmClient.addContainerRequest(containerAsk); } - // Obtain allocated containers, launch and check for responses - int responseId = 0; - int completedContainers = 0; - while (completedContainers < n) { - AllocateResponse response = rmClient.allocate(responseId++); - for (Container container : response.getAllocatedContainers()) { - // Launch container by create ContainerLaunchContext - ContainerLaunchContext ctx = - Records.newRecord(ContainerLaunchContext.class); - - ctx.setCommands( - Collections.singletonList( - command + - " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + - " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" - )); - - nmClient.startContainer(container, ctx); - } - for (ContainerStatus status : response.getCompletedContainersStatuses()) { - ++completedContainers; - System.out.println("Completed container " + status.getContainerId()); - } + System.out.println("[AM] waiting for containers to finish"); + while (!doneWithContainers()) { Thread.sleep(100); } + System.out.println("[AM] unregisterApplicationMaster 0"); // Un-register with ResourceManager rmClient.unregisterApplicationMaster( - FinalApplicationStatus.SUCCEEDED, "", ""); + FinalApplicationStatus.SUCCEEDED, "", ""); + System.out.println("[AM] unregisterApplicationMaster 1"); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5691911/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java index 7cef50d..e020ef4 100644 --- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java +++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java @@ -17,11 +17,24 @@ package org.apache.ignite.yarn; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.api.*; import org.apache.hadoop.yarn.conf.*; +import org.apache.hadoop.yarn.util.Apps; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.logging.*; +import static org.apache.hadoop.yarn.api.ApplicationConstants.*; + /** * Ignite yarn client. */ @@ -35,16 +48,109 @@ public class IgniteYarnClient { * @param args Args. */ public static void main(String[] args) throws Exception { - ClusterProperties clusterProps = ClusterProperties.from(args.length >= 1 ? args[0] : null); - - // Create yarnClient YarnConfiguration conf = new YarnConfiguration(); - YarnClient yarnClient = YarnClient.createYarnClient(); - yarnClient.init(conf); yarnClient.start(); + // Create application via yarnClient YarnClientApplication app = yarnClient.createApplication(); + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); + + amContainer.setCommands( + Collections.singletonList( + " $JAVA_HOME/bin/java -Xmx256M org.apache.ignite.yarn.ApplicationMaster" + + " 1>" + LOG_DIR_EXPANSION_VAR + "/stdout" + + " 2>" + LOG_DIR_EXPANSION_VAR + "/stderr" + ) + ); + + // Setup jar for ApplicationMaster + final LocalResource appMasterJar = Records.newRecord(LocalResource.class); + setupAppMasterJar(new Path("/user/ntikhonov/ignite-yarn.jar"), appMasterJar, conf); + + final LocalResource igniteZip = Records.newRecord(LocalResource.class); + setupAppMasterJar(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6.zip"), igniteZip, conf); + + FileSystem fileSystem = FileSystem.get(conf); + + Path path = fileSystem.makeQualified(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6/bin/ignite.sh")); + + System.out.println("Path: " + path); + System.out.println("Path URI: " + path.toUri().toString()); + + amContainer.setLocalResources(new HashMap(){{ + put("ignite-yarn.jar", appMasterJar); + put("ignite", igniteZip); + }}); + + // Setup CLASSPATH for ApplicationMaster + Map appMasterEnv = new HashMap(); + setupAppMasterEnv(appMasterEnv, conf); + amContainer.setEnvironment(appMasterEnv); + + // Set up resource type requirements for ApplicationMaster + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(256); + capability.setVirtualCores(1); + + // Finally, set-up ApplicationSubmissionContext for the application + ApplicationSubmissionContext appContext = + app.getApplicationSubmissionContext(); + appContext.setApplicationName("simple-yarn-app"); // application name + appContext.setAMContainerSpec(amContainer); + appContext.setResource(capability); + appContext.setQueue("default"); // queue + + // Submit application + ApplicationId appId = appContext.getApplicationId(); + System.out.println("Submitting application " + appId); + yarnClient.submitApplication(appContext); + + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + YarnApplicationState appState = appReport.getYarnApplicationState(); + while (appState != YarnApplicationState.FINISHED && + appState != YarnApplicationState.KILLED && + appState != YarnApplicationState.FAILED) { + Thread.sleep(100); + appReport = yarnClient.getApplicationReport(appId); + appState = appReport.getYarnApplicationState(); + } + + System.out.println( + "Application " + appId + " finished with" + + " state " + appState + + " at " + appReport.getFinishTime()); + } + + private static void setupAppMasterJar(Path jarPath, LocalResource appMasterJar, YarnConfiguration conf) + throws Exception { + FileSystem fileSystem = FileSystem.get(conf); + jarPath = fileSystem.makeQualified(jarPath); + + FileStatus jarStat = fileSystem.getFileStatus(jarPath); + + appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); + appMasterJar.setSize(jarStat.getLen()); + appMasterJar.setTimestamp(jarStat.getModificationTime()); + appMasterJar.setType(LocalResourceType.ARCHIVE); + appMasterJar.setVisibility(LocalResourceVisibility.APPLICATION); + + System.out.println("Path :" + jarPath); + } + + private static void setupAppMasterEnv(Map appMasterEnv, YarnConfiguration conf) { + for (String c : conf.getStrings( + YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) + Apps.addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), + c.trim(), File.pathSeparator); + + Apps.addToEnvironment(appMasterEnv, + Environment.CLASSPATH.name(), + Environment.PWD.$() + File.separator + "*", + File.pathSeparator); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5691911/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java index 1a03743..04d3492 100644 --- a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java +++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java @@ -23,5 +23,7 @@ import junit.framework.*; * Scheduler tests. */ public class IgniteSchedulerSelfTest extends TestCase { + public void testName() throws Exception { + } }