Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B06DC100C8 for ; Wed, 20 Nov 2013 21:12:47 +0000 (UTC) Received: (qmail 76266 invoked by uid 500); 20 Nov 2013 21:12:47 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 76232 invoked by uid 500); 20 Nov 2013 21:12:47 -0000 Mailing-List: contact commits-help@helix.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.incubator.apache.org Delivered-To: mailing list commits@helix.incubator.apache.org Received: (qmail 76183 invoked by uid 99); 20 Nov 2013 21:12:47 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Nov 2013 21:12:47 +0000 X-ASF-Spam-Status: No, hits=-2000.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 20 Nov 2013 21:12:45 +0000 Received: (qmail 74498 invoked by uid 99); 20 Nov 2013 21:12:18 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Nov 2013 21:12:18 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 275EF885B55; Wed, 20 Nov 2013 21:12:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kishoreg@apache.org To: commits@helix.incubator.apache.org Date: Wed, 20 Nov 2013 21:13:04 -0000 Message-Id: <5479457d359e40acb65cdf6b5865c87c@git.apache.org> In-Reply-To: <4b53611394d042768c47696b1202ee8c@git.apache.org> References: <4b53611394d042768c47696b1202ee8c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [49/52] [abbrv] git commit: In the middle of changes, need to pull in latest helix X-Virus-Checked: Checked by ClamAV on apache.org In the middle of changes, need to pull in latest helix Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/342d0e7f Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/342d0e7f Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/342d0e7f Branch: refs/heads/helix-yarn Commit: 342d0e7f62859c46b980b3309862767d0490f8e3 Parents: 0b2da84 Author: Kishore Gopalakrishna Authored: Tue Nov 19 11:26:16 2013 -0800 Committer: Kishore Gopalakrishna Committed: Tue Nov 19 11:26:16 2013 -0800 ---------------------------------------------------------------------- pom.xml | 5 + recipes/auto-scale/pom.xml | 2 +- .../impl/yarn/YarnContainerProviderProcess.java | 277 ++++++++++--------- .../autoscale/provider/ProviderRebalancer.java | 3 +- 4 files changed, 159 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342d0e7f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 6840410..b21bee4 100644 --- a/pom.xml +++ b/pom.xml @@ -169,6 +169,11 @@ under the License. SnakeYAML repository http://oss.sonatype.org/content/groups/public/ + + jboss-fs-public + JBoss FuseSource repository + http://repository.jboss.org/nexus/content/groups/fs-public/ + http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342d0e7f/recipes/auto-scale/pom.xml ---------------------------------------------------------------------- diff --git a/recipes/auto-scale/pom.xml b/recipes/auto-scale/pom.xml index 95331f4..0482d3d 100644 --- a/recipes/auto-scale/pom.xml +++ b/recipes/auto-scale/pom.xml @@ -13,7 +13,7 @@ Apache Helix :: Recipes :: Auto-Scale - 0.23.9 + 2.2.0 unit local, shell http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342d0e7f/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java ---------------------------------------------------------------------- diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java index 20a8b92..37f942d 100644 --- a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java +++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java @@ -1,7 +1,6 @@ package org.apache.helix.autoscale.impl.yarn; import java.io.File; -import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -10,22 +9,17 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; -import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.util.Records; import org.apache.helix.autoscale.Service; import org.apache.log4j.Logger; @@ -38,121 +32,154 @@ import com.google.common.base.Preconditions; */ public class YarnContainerProviderProcess implements Service { - static final Logger log = Logger.getLogger(YarnContainerProviderProcess.class); - - static String YARN_MASTER_COMMAND = "/bin/sh %s 1>%s/stdout 2>%s/stderr"; - - Configuration conf; - YarnRPC rpc; - ClientRMProtocol rmClient; - ApplicationId appId; - File propertiesFile; - - YarnContainerProviderProperties properties; - - @Override - public void configure(Properties properties) throws Exception { - configure(YarnUtils.createContainerProviderProperties(properties)); - } - - private void configure(YarnContainerProviderProperties properties) { - this.conf = new YarnConfiguration(); - this.conf.set(YarnConfiguration.RM_ADDRESS, properties.getResourceManager()); - this.conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, properties.getScheduler()); - this.conf.set(FileSystem.FS_DEFAULT_NAME_KEY, properties.getHdfs()); - - this.rpc = YarnRPC.create(conf); - - this.properties = properties; - } - - @Override - public void start() throws Exception { - Preconditions.checkNotNull(properties); - Preconditions.checkState(properties.isValid()); - - connect(); - - String command = String.format(YARN_MASTER_COMMAND, YarnUtils.YARN_MASTER_PATH, ApplicationConstants.LOG_DIR_EXPANSION_VAR, - ApplicationConstants.LOG_DIR_EXPANSION_VAR); - - log.info(String.format("Starting application '%s' provider '%s' (masterCommand='%s')", properties.getYarnData(), properties.getName(), command)); - - log.debug(String.format("Running master command \"%s\"", command)); - - // app id - GetNewApplicationRequest appRequest = Records.newRecord(GetNewApplicationRequest.class); - GetNewApplicationResponse appResponse = rmClient.getNewApplication(appRequest); - - this.appId = appResponse.getApplicationId(); - - log.info(String.format("Acquired app id '%s' for '%s'", appId.toString(), properties.getName())); - - // command - ContainerLaunchContext launchContext = Records.newRecord(ContainerLaunchContext.class); - launchContext.setCommands(Collections.singletonList(command)); - - // resource limit - Resource resource = Records.newRecord(Resource.class); - resource.setMemory(256); // TODO make dynamic - launchContext.setResource(resource); - - // environment - Map env = new HashMap(); - launchContext.setEnvironment(env); - - // configuration - propertiesFile = YarnUtils.writePropertiesToTemp(properties); - - // HDFS - final String namespace = appId.toString(); - final Path masterArchive = YarnUtils.copyToHdfs(YarnUtils.YARN_MASTER_ARCHIVE_PATH, YarnUtils.YARN_MASTER_STAGING, namespace, conf); - final Path masterProperties = YarnUtils.copyToHdfs(propertiesFile.getCanonicalPath(), YarnUtils.YARN_MASTER_PROPERTIES, namespace, conf); - final Path containerArchive = YarnUtils.copyToHdfs(YarnUtils.YARN_CONTAINER_ARCHIVE_PATH, YarnUtils.YARN_CONTAINER_STAGING, namespace, conf); - - // local resources - Map localResources = new HashMap(); - localResources.put(YarnUtils.YARN_MASTER_DESTINATION, YarnUtils.createHdfsResource(masterArchive, LocalResourceType.ARCHIVE, conf)); - localResources.put(YarnUtils.YARN_MASTER_PROPERTIES, YarnUtils.createHdfsResource(masterProperties, LocalResourceType.FILE, conf)); - localResources.put(YarnUtils.YARN_CONTAINER_STAGING, YarnUtils.createHdfsResource(containerArchive, LocalResourceType.FILE, conf)); - - launchContext.setLocalResources(localResources); - - // user - launchContext.setUser(properties.getUser()); - - // app submission - ApplicationSubmissionContext subContext = Records.newRecord(ApplicationSubmissionContext.class); - subContext.setApplicationId(appId); - subContext.setApplicationName(properties.getName()); - subContext.setAMContainerSpec(launchContext); - - SubmitApplicationRequest subRequest = Records.newRecord(SubmitApplicationRequest.class); - subRequest.setApplicationSubmissionContext(subContext); - - log.info(String.format("Starting app id '%s'", appId.toString())); - - rmClient.submitApplication(subRequest); - - } - - @Override - public void stop() throws YarnRemoteException { - log.info(String.format("Stopping app id '%s'", appId.toString())); - KillApplicationRequest killRequest = Records.newRecord(KillApplicationRequest.class); - killRequest.setApplicationId(appId); - - rmClient.forceKillApplication(killRequest); - - try { YarnUtils.destroyHdfsNamespace(appId.toString(), conf); } catch(Exception ignore) {} - - propertiesFile.delete(); - } - - void connect() { - YarnConfiguration yarnConf = new YarnConfiguration(conf); - InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)); - log.info("Connecting to ResourceManager at: " + rmAddress); - this.rmClient = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, conf)); - } + static final Logger log = Logger + .getLogger(YarnContainerProviderProcess.class); + + static String YARN_MASTER_COMMAND = "/bin/sh %s 1>%s/stdout 2>%s/stderr"; + + Configuration conf; + YarnClient yarnClient; + ApplicationId appId; + File propertiesFile; + + YarnContainerProviderProperties properties; + + @Override + public void configure(Properties properties) throws Exception { + configure(YarnUtils.createContainerProviderProperties(properties)); + } + + private void configure(YarnContainerProviderProperties properties) { + this.conf = new YarnConfiguration(); + this.conf.set(YarnConfiguration.RM_ADDRESS, + properties.getResourceManager()); + this.conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, + properties.getScheduler()); + this.conf.set(FileSystem.FS_DEFAULT_NAME_KEY, properties.getHdfs()); + this.yarnClient = YarnClient.createYarnClient(); + + this.properties = properties; + } + + @Override + public void start() throws Exception { + Preconditions.checkNotNull(properties); + Preconditions.checkState(properties.isValid()); + + connect(); + + String command = String.format(YARN_MASTER_COMMAND, + YarnUtils.YARN_MASTER_PATH, + ApplicationConstants.LOG_DIR_EXPANSION_VAR, + ApplicationConstants.LOG_DIR_EXPANSION_VAR); + + log.info(String.format( + "Starting application '%s' provider '%s' (masterCommand='%s')", + properties.getYarnData(), properties.getName(), command)); + + log.debug(String.format("Setting app master command \"%s\"", command)); + + YarnClientApplication application; + application = yarnClient.createApplication(); + ApplicationSubmissionContext appContext = application + .getApplicationSubmissionContext(); + // LOG.info("Got new ApplicationId=" + // + application.getNewApplicationResponse().getApplicationId()); + // // app id + // GetNewApplicationRequest appRequest = + // Records.newRecord(GetNewApplicationRequest.class); + // GetNewApplicationResponse appResponse = + // rmClient.getNewApplication(appRequest); + + this.appId = application.getApplicationSubmissionContext() + .getApplicationId(); + + log.info(String.format("Acquired app id '%s' for '%s'", + appId.toString(), properties.getName())); + + // command + ContainerLaunchContext launchContext = Records + .newRecord(ContainerLaunchContext.class); + launchContext.setCommands(Collections.singletonList(command)); + + // environment + Map env = new HashMap(); + launchContext.setEnvironment(env); + + // configuration + propertiesFile = YarnUtils.writePropertiesToTemp(properties); + + // HDFS + final String namespace = appId.toString(); + final Path masterArchive = YarnUtils.copyToHdfs( + YarnUtils.YARN_MASTER_ARCHIVE_PATH, + YarnUtils.YARN_MASTER_STAGING, namespace, conf); + final Path masterProperties = YarnUtils.copyToHdfs( + propertiesFile.getCanonicalPath(), + YarnUtils.YARN_MASTER_PROPERTIES, namespace, conf); + final Path containerArchive = YarnUtils.copyToHdfs( + YarnUtils.YARN_CONTAINER_ARCHIVE_PATH, + YarnUtils.YARN_CONTAINER_STAGING, namespace, conf); + + // local resources + Map localResources = new HashMap(); + localResources.put(YarnUtils.YARN_MASTER_DESTINATION, YarnUtils + .createHdfsResource(masterArchive, LocalResourceType.ARCHIVE, + conf)); + localResources.put(YarnUtils.YARN_MASTER_PROPERTIES, YarnUtils + .createHdfsResource(masterProperties, LocalResourceType.FILE, + conf)); + localResources.put(YarnUtils.YARN_CONTAINER_STAGING, YarnUtils + .createHdfsResource(containerArchive, LocalResourceType.FILE, + conf)); + + launchContext.setLocalResources(localResources); + + // user + // appContext.setUser(properties.getUser()); + + // app submission + + // resource limit, cannot set resource constraint for starting App + // Master + Resource resource = Records.newRecord(Resource.class); + resource.setMemory(256); // TODO make dynamic + appContext.setResource(resource); + + appContext.setApplicationId(appId); + appContext.setApplicationName(properties.getName()); + appContext.setAMContainerSpec(launchContext); + + // SubmitApplicationRequest subRequest = + // Records.newRecord(SubmitApplicationRequest.class); + // subRequest.setApplicationSubmissionContext(subContext); + + log.info(String.format("Starting app id '%s'", appId.toString())); + + yarnClient.submitApplication(appContext); + + } + + @Override + public void stop() { + log.info(String.format("Stopping app id '%s'", appId.toString())); + KillApplicationRequest killRequest = Records + .newRecord(KillApplicationRequest.class); + killRequest.setApplicationId(appId); + try { + yarnClient.killApplication(appId); + YarnUtils.destroyHdfsNamespace(appId.toString(), conf); + } catch (Exception e) { + log.error("Exception while stoppilg app " + appId.toString(), e); + } + + propertiesFile.delete(); + } + + void connect() { + log.info("Connecting to ResourceManager at: " + + conf.get(YarnConfiguration.RM_ADDRESS)); + yarnClient.init(conf); + yarnClient.start(); + } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342d0e7f/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java ---------------------------------------------------------------------- diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java index 2b6e428..44e0762 100644 --- a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java +++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java @@ -61,10 +61,9 @@ public class ProviderRebalancer implements Rebalancer { } @Override - public ResourceAssignment computeResourceMapping(Resource resource, IdealState idealState, CurrentStateOutput currentStateOutput, + public ResourceAssignment computeResourceMapping(String resourceName, IdealState idealState, CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) { - final String resourceName = resource.getResourceName(); final String containerType = resourceName; final SortedSet allContainers = Sets.newTreeSet(new IndexedNameComparator());