helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kisho...@apache.org
Subject git commit: Adding command line support to launch the cluster
Date Mon, 13 Jan 2014 05:01:14 GMT
Updated Branches:
  refs/heads/helix-provisioning c9ddde3e7 -> 9386a4cbc


Adding command line support to launch the cluster


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9386a4cb
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9386a4cb
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9386a4cb

Branch: refs/heads/helix-provisioning
Commit: 9386a4cbc3d8b5cd8e26ba3970fd77e259924351
Parents: c9ddde3
Author: Kishore Gopalakrishna <g.kishore@gmail.com>
Authored: Sun Jan 12 21:01:09 2014 -0800
Committer: Kishore Gopalakrishna <g.kishore@gmail.com>
Committed: Sun Jan 12 21:01:09 2014 -0800

----------------------------------------------------------------------
 helix-provisioning/pom.xml                      |  8 +++---
 .../apache/helix/provisioning/yarn/Client.java  | 22 ++++++---------
 .../yarn/HelixYarnApplicationMasterMain.java    | 29 ++++++++++++++++----
 .../provisioning/yarn/YarnProvisioner.java      |  9 +++---
 .../yarn/YarnProvisionerConfig.java             |  9 ++++++
 5 files changed, 51 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/9386a4cb/helix-provisioning/pom.xml
----------------------------------------------------------------------
diff --git a/helix-provisioning/pom.xml b/helix-provisioning/pom.xml
index 254d420..e04b3ef 100644
--- a/helix-provisioning/pom.xml
+++ b/helix-provisioning/pom.xml
@@ -48,13 +48,13 @@ under the License.
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <version>${hadoop.version}</version>
-      <scope>provided</scope>
+      <!-- <scope>provided</scope> -->
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-api</artifactId>
       <version>${hadoop.version}</version>
-      <scope>provided</scope>
+    <!--    <scope>provided</scope>  -->
     </dependency>
     <dependency>
       <groupId>org.testng</groupId>
@@ -82,8 +82,8 @@ under the License.
         <configuration>
           <programs>
             <program>
-              <mainClass>org.apache.helix.provisioning.HelixAgentMain</mainClass>
-              <name>start-helix-provisioning</name>
+              <mainClass>org.apache.helix.provisioning.yarn.Client</mainClass>
+              <name>yarn-job-launcher</name>
             </program>
           </programs>
         </configuration>

http://git-wip-us.apache.org/repos/asf/helix/blob/9386a4cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
index a7a119f..15b43a6 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
@@ -138,9 +138,7 @@ public class Client {
   // Main class to invoke application master
   private final String appMasterMainClass;
 
-  // Amt of memory to request for container in which shell script will be executed
-  private int containerMemory = 10;
-  // No. of containers in which the shell script needs to be executed
+  // No. of containers in which helix participants will be started
   private int numContainers = 1;
 
   // log4j.properties file
@@ -204,14 +202,11 @@ public class Client {
     opts.addOption("appName", true, "Application Name.");
     opts.addOption("priority", true, "Application Priority. Default 0");
     opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
-    opts.addOption("timeout", true, "Application timeout in milliseconds");
     opts.addOption("master_memory", true,
         "Amount of memory in MB to be requested to run the application master");
-    opts.addOption("archive", true, "Jar file containing the application master");
-    opts.addOption("container_memory", true,
-        "Amount of memory in MB to be requested to run the shell command");
+    opts.addOption("archive", true, "Archive file containing the app code");
     opts.addOption("num_containers", true,
-        "No. of containers on which the shell command needs to be executed");
+        "No. of containers on which Helix Participants will be launched");
     opts.addOption("log_properties", true, "log4j.properties file");
     opts.addOption("debug", false, "Dump out debug information");
     opts.addOption("help", false, "Print usage");
@@ -271,6 +266,8 @@ public class Client {
 
     appMasterArchive = cliParser.getOptionValue("archive");
 
+    numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "4"));
+    
     log4jPropFile = cliParser.getOptionValue("log_properties", "");
 
     return true;
@@ -456,7 +453,6 @@ public class Client {
     // Set class name
     vargs.add(appMasterMainClass);
     // Set params for Application Master
-    vargs.add("--container_memory " + String.valueOf(containerMemory));
     vargs.add("--num_containers " + String.valueOf(numContainers));
 
     if (debugFlag) {
@@ -548,9 +544,9 @@ public class Client {
 
     while (true) {
 
-      // Check app status every 1 second.
+      // Check app status every 10 second.
       try {
-        Thread.sleep(1000);
+        Thread.sleep(10000);
       } catch (InterruptedException e) {
         LOG.debug("Thread sleep in monitoring loop interrupted");
       }
@@ -583,11 +579,11 @@ public class Client {
         return false;
       }
 
-      if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
+      /*if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
         LOG.info("Reached client specified timeout for application. Killing application");
         forceKillApplication(appId);
         return false;
-      }
+      }*/
     }
 
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/9386a4cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
index 8be4754..2356e91 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
@@ -1,11 +1,15 @@
 package org.apache.helix.provisioning.yarn;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.Map;
 
 import org.I0Itec.zkclient.IDefaultNameSpace;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -24,6 +28,7 @@ import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.manager.zk.ZkHelixConnection;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.log4j.Logger;
 
 /**
  * This will <br/>
@@ -35,7 +40,21 @@ import org.apache.helix.tools.StateModelConfigGenerator;
  * </ul>
  */
 public class HelixYarnApplicationMasterMain {
+  public static Logger LOG = Logger.getLogger(HelixYarnApplicationMasterMain.class);
+
   public static void main(String[] args) throws Exception {
+    int numContainers = 1;
+
+    Options opts;
+    opts = new Options();
+    opts.addOption("num_containers", true, "Number of containers");
+    try {
+      CommandLine cliParser = new GnuParser().parse(opts, args);
+      numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers"));
+    } catch (Exception e) {
+      LOG.error("Error parsing input arguments" + Arrays.toString(args), e);
+    }
+
     // START ZOOKEEPER
     String dataDir = "dataDir";
     String logDir = "logDir";
@@ -43,18 +62,17 @@ public class HelixYarnApplicationMasterMain {
 
       @Override
       public void createDefaultNameSpace(ZkClient zkClient) {
-        
+
       }
     };
     FileUtils.deleteDirectory(new File(dataDir));
     FileUtils.deleteDirectory(new File(logDir));
-    
+
     final ZkServer server = new ZkServer(dataDir, logDir, defaultNameSpace);
     server.start();
 
     // start
     Map<String, String> envs = System.getenv();
-
     ContainerId containerId =
         ConverterUtils.toContainerId(envs.get(Environment.CONTAINER_ID.name()));
     ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
@@ -86,7 +104,8 @@ public class HelixYarnApplicationMasterMain {
 
     // add the resource with the local provisioner
     ResourceId resourceId = ResourceId.from(resourceName);
-    ProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId);
+    YarnProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId);
+    provisionerConfig.setNumContainers(numContainers);
     RebalancerConfig rebalancerConfig =
         new FullAutoRebalancerConfig.Builder(resourceId).addPartitions(NUM_PARTITIONS)
             .replicaCount(NUM_REPLICAS).stateModelDefId(masterSlave.getStateModelDefId()).build();
@@ -96,7 +115,7 @@ public class HelixYarnApplicationMasterMain {
     // start controller
     ControllerId controllerId = ControllerId.from("controller1");
     HelixController controller = connection.createController(clusterId, controllerId);
-    controller.startAsync(); 
+    controller.startAsync();
 
     Thread shutdownhook = new Thread(new Runnable() {
       @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/9386a4cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index 1a903d4..1d0c078 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -49,6 +49,7 @@ import org.apache.helix.controller.provisioner.ContainerId;
 import org.apache.helix.controller.provisioner.ContainerSpec;
 import org.apache.helix.controller.provisioner.ContainerState;
 import org.apache.helix.controller.provisioner.Provisioner;
+import org.apache.helix.controller.provisioner.ProvisionerConfig;
 import org.apache.helix.controller.provisioner.TargetProviderResponse;
 
 import com.google.common.collect.Lists;
@@ -65,7 +66,6 @@ public class YarnProvisioner implements Provisioner {
   static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors
       .newCachedThreadPool());
   Map<ContainerId, Container> allocatedContainersMap = new HashMap<ContainerId,
Container>();
-  int DEFAULT_CONTAINER = 1;
   private HelixManager _helixManager;
 
   @Override
@@ -258,10 +258,11 @@ public class YarnProvisioner implements Provisioner {
     List<Participant> containersToStart = Lists.newArrayList();
     List<Participant> containersToRelease = Lists.newArrayList();
     List<Participant> containersToStop = Lists.newArrayList();
-
-    for (int i = 0; i < DEFAULT_CONTAINER - participants.size(); i++) {
+    YarnProvisionerConfig  provisionerConfig = (YarnProvisionerConfig) cluster.getConfig().getResourceMap().get(resourceId).getProvisionerConfig();
+    int targetNumContainers = provisionerConfig.getNumContainers();
+    for (int i = 0; i < targetNumContainers - participants.size(); i++) {
       containersToAcquire.add(new ContainerSpec(ContainerId.from("container"
-          + (DEFAULT_CONTAINER - i))));
+          + (targetNumContainers - i))));
     }
     response.setContainersToAcquire(containersToAcquire);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9386a4cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
index 0c1dbda..67dd679 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
@@ -12,6 +12,7 @@ public class YarnProvisionerConfig implements ProvisionerConfig {
   private ResourceId _resourceId;
   private Class<? extends StringSerializer> _serializerClass;
   private ProvisionerRef _provisionerRef;
+  private Integer _numContainers;
 
   public YarnProvisionerConfig(@JsonProperty("resourceId") ResourceId resourceId) {
     _resourceId = resourceId;
@@ -19,6 +20,14 @@ public class YarnProvisionerConfig implements ProvisionerConfig {
     _provisionerRef = ProvisionerRef.from(YarnProvisioner.class.getName());
   }
 
+  public void setNumContainers(int numContainers) {
+    _numContainers = numContainers;
+  }
+
+  public Integer getNumContainers() {
+    return _numContainers;
+  }
+
   @Override
   public ResourceId getResourceId() {
     return _resourceId;


Mime
View raw message