incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1179201 - in /incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp: BSPApplicationMaster.java BSPJobImpl.java BSPPeerImpl.java BSPRunner.java BSPTaskLauncher.java
Date Wed, 05 Oct 2011 12:49:42 GMT
Author: tjungblut
Date: Wed Oct  5 12:49:41 2011
New Revision: 1179201

URL: http://svn.apache.org/viewvc?rev=1179201&view=rev
Log:
Refactored to local resource usage and added a bsp runner.

Added:
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPRunner.java   (with props)
Modified:
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1179201&r1=1179200&r2=1179201&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java Wed Oct  5 12:49:41 2011
@@ -72,6 +72,7 @@ public class BSPApplicationMaster {
   private long startTime;
 
   private BSPJob job;
+  private BSPJobID jobId;
 
   private SyncServerImpl syncServer;
   private Future<Long> syncServerFuture;
@@ -97,14 +98,15 @@ public class BSPApplicationMaster {
     clock = new SystemClock();
     startTime = clock.getTime();
 
+    jobId = new BSPJobID(appAttemptId.toString(), 0);
+
     // TODO this is not localhost, is it?
     hostname = InetAddress.getLocalHost().getCanonicalHostName();
     startSyncServer();
     clientPort = getFreePort();
 
     amrmRPC = getYarnRPCConnection(localConf);
-    registerApplicationMaster(amrmRPC,
-        appAttemptId, hostname, clientPort, null);
+    registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort, null);
   }
 
   /**
@@ -189,7 +191,7 @@ public class BSPApplicationMaster {
   private void start() throws Exception {
     JobState finalState = null;
     try {
-      job = new BSPJobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile);
+      job = new BSPJobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile, jobId);
       finalState = job.startJob();
     } finally {
       if (finalState != null) {

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java?rev=1179201&r1=1179200&r2=1179201&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java Wed Oct  5 12:49:41 2011
@@ -54,6 +54,7 @@ public class BSPJobImpl implements BSPJo
   private static final int DEFAULT_MEMORY_MB = 256;
 
   private Configuration conf;
+  private BSPJobID jobId;
   private int numBSPTasks;
   private int priority = 0;
   private String childOpts;
@@ -76,7 +77,7 @@ public class BSPJobImpl implements BSPJo
 
   public BSPJobImpl(ApplicationAttemptId appAttemptId,
       Configuration jobConfiguration, YarnRPC yarnRPC, AMRMProtocol amrmRPC,
-      String jobFile) {
+      String jobFile, BSPJobID jobId) {
     super();
     this.numBSPTasks = jobConfiguration.getInt("bsp.peers.num", 1);
     this.appAttemptId = appAttemptId;
@@ -84,6 +85,7 @@ public class BSPJobImpl implements BSPJo
     this.resourceManager = amrmRPC;
     this.jobFile = new Path(jobFile);
     this.state = JobState.NEW;
+    this.jobId = jobId;
     this.conf = jobConfiguration;
     this.childOpts = conf.get("bsp.child.java.opts");
 
@@ -168,7 +170,7 @@ public class BSPJobImpl implements BSPJo
           + allocatedContainer.getState() + ", containerResourceMemory"
           + allocatedContainer.getResource().getMemory());
 
-      BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id,
+      BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(jobId, id,
           allocatedContainer, conf, yarnRPC, jobFile);
       launchers.put(id, runnableLaunchContainer);
       completionService.submit(runnableLaunchContainer);

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1179201&r1=1179200&r2=1179201&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Wed Oct  5 12:49:41 2011
@@ -48,7 +48,6 @@ import org.apache.hama.Constants;
 import org.apache.hama.bsp.sync.SyncServer;
 import org.apache.hama.bsp.sync.SyncServerImpl;
 import org.apache.hama.checkpoint.CheckpointRunner;
-import org.apache.hama.ipc.BSPPeerProtocol;
 
 /**
  * This class represents a BSP peer.
@@ -71,9 +70,7 @@ public class BSPPeerImpl implements BSPP
   private TaskStatus currentTaskStatus;
 
   private TaskAttemptID taskid;
-  private BSPPeerProtocol umbilical;
   private SyncServer syncService;
-
   private final BSPMessageSerializer messageSerializer;
 
   public static final class BSPSerializableMessage implements Writable {
@@ -184,14 +181,12 @@ public class BSPPeerImpl implements BSPP
    * BSPPeer acts on behalf of clients performing bsp() tasks.
    * 
    * @param conf is the configuration file containing bsp peer host, port, etc.
-   * @param umbilical is the bsp protocol used to contact its parent process.
    * @param taskid is the id that current process holds.
    */
-  public BSPPeerImpl(Configuration conf, TaskAttemptID taskid,
-      BSPPeerProtocol umbilical) throws IOException {
+  public BSPPeerImpl(Configuration conf, TaskAttemptID taskid)
+      throws IOException {
     this.conf = conf;
     this.taskid = taskid;
-    this.umbilical = umbilical;
 
     String bindAddress = conf.get(Constants.PEER_HOST,
         Constants.DEFAULT_PEER_HOST);
@@ -292,7 +287,7 @@ public class BSPPeerImpl implements BSPP
         try {
           peer = getBSPPeerConnection(entry.getKey());
         } catch (NullPointerException ne) {
-          umbilical.fatalError(taskid, entry.getKey().getHostName()
+          LOG.error(taskid + ": " + entry.getKey().getHostName()
               + " doesn't exists.");
         }
       }
@@ -313,7 +308,6 @@ public class BSPPeerImpl implements BSPP
 
     leaveBarrier();
     currentTaskStatus.incrementSuperstepCount();
-    umbilical.statusUpdate(taskid, currentTaskStatus);
 
     // Clear outgoing queues.
     clearOutgoingQueues();
@@ -385,8 +379,8 @@ public class BSPPeerImpl implements BSPP
 
       if (peer == null) {
         try {
-          peer = (BSPPeer) RPC.getProxy(BSPPeer.class,
-              BSPPeer.versionID, addr, this.conf);
+          peer = (BSPPeer) RPC.getProxy(BSPPeer.class, BSPPeer.versionID, addr,
+              this.conf);
         } catch (IOException e) {
           LOG.error(e);
         }
@@ -396,18 +390,6 @@ public class BSPPeerImpl implements BSPP
 
     return peer;
   }
-  
-  /**
-   * TODO
-   * Main entry point after a container has launched.
-   * @param args
-   */
-  public static void main(String[] args) {
-    // id is the first of the args (int)
-    // host:port pair of the sync service is the second arg (string)
-    // third arg is the qualified path of the job configuration
-    
-  }
 
   /**
    * @return the string as host:port of this Peer
@@ -463,6 +445,13 @@ public class BSPPeerImpl implements BSPP
   }
 
   /**
+   * @return the sync service
+   */
+  public SyncServer getSyncService() {
+    return syncService;
+  }
+
+  /**
    * @return the size of outgoing queue
    */
   public int getOutgoingQueueSize() {

Added: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPRunner.java?rev=1179201&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPRunner.java (added)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPRunner.java Wed Oct  5 12:49:41 2011
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.zookeeper.KeeperException;
+
+public class BSPRunner {
+
+  private static final Log LOG = LogFactory.getLog(BSPRunner.class);
+
+  private Configuration conf;
+  private TaskAttemptID id;
+  private BSPPeerImpl peer;
+
+  Class<? extends BSP> bspClass;
+
+  @SuppressWarnings("unchecked")
+  public BSPRunner(String jobId, int taskAttemptId, Path confPath)
+      throws IOException, ClassNotFoundException {
+    conf = new HamaConfiguration();
+    conf.addResource(confPath);
+    this.id = new TaskAttemptID(jobId, 0, taskAttemptId, 0);
+    this.id.id = taskAttemptId;
+    peer = new BSPPeerImpl(conf, id);
+    // this is a checked cast because we can only set a class via the BSPJob
+    // class which only allows derivates of BSP.
+    bspClass = (Class<? extends BSP>) conf.getClassByName(conf
+        .get("bsp.work.class"));
+  }
+
+  // TODO actually we should make the exception handling here and just exit the
+  // JVM accordingly.
+  public void startComputation() throws IOException, InterruptedException,
+      KeeperException {
+    BSP bspInstance = ReflectionUtils.newInstance(bspClass, conf);
+    LOG.info("Syncing for the first time to wait for all the tasks to come up...");
+    peer.getSyncService().enterBarrier(id);
+    peer.getSyncService().leaveBarrier(id);
+    LOG.info("Initial sync was successful, now running the computation!");
+    bspInstance.bsp(peer);
+  }
+
+  /**
+   * TODO Main entry point after a container has launched.
+   * 
+   * @param args
+   * @throws IOException
+   * @throws NumberFormatException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  public static void main(String[] args) throws NumberFormatException,
+      IOException, ClassNotFoundException, KeeperException,
+      InterruptedException {
+    if (args.length != 3) {
+      throw new IllegalArgumentException("Expected 3 args given, but found: "
+          + Arrays.toString(args));
+    }
+    // jobid is the first of the args (string)
+    // taskid is the second arg (int)
+    // third arg is the qualified path of the job configuration
+    BSPRunner bspRunner = new BSPRunner(args[0], Integer.valueOf(args[1]),
+        new Path(args[2]));
+    bspRunner.startComputation();
+  }
+}

Propchange: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPRunner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java?rev=1179201&r1=1179200&r2=1179201&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Wed Oct  5 12:49:41 2011
@@ -18,10 +18,9 @@
 package org.apache.hama.bsp;
 
 import java.io.File;
-import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 
 import org.apache.commons.logging.Log;
@@ -30,7 +29,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -38,8 +36,13 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
 
@@ -47,22 +50,21 @@ public class BSPTaskLauncher implements 
 
   private static final Log LOG = LogFactory.getLog(BSPTaskLauncher.class);
 
-  private static final String SYSTEM_PATH_SEPARATOR = System
-      .getProperty("path.separator");
-
   private final Container allocatedContainer;
   private final ContainerManager cm;
   private final Path jobFile;
   private final String user;
   private final Configuration conf;
   private final int id;
+  private final BSPJobID jobId;
 
-  public BSPTaskLauncher(int id, Container container, Configuration conf,
-      YarnRPC rpc, Path jobFile) throws YarnRemoteException {
+  public BSPTaskLauncher(BSPJobID jobId, int id, Container container,
+      Configuration conf, YarnRPC rpc, Path jobFile) throws YarnRemoteException {
     this.id = id;
     this.jobFile = jobFile;
     this.user = conf.get("user.name");
     this.conf = conf;
+    this.jobId = jobId;
     this.allocatedContainer = container;
     // Connect to ContainerManager on the allocated container
     String cmIpPortStr = container.getNodeId().getHost() + ":"
@@ -70,19 +72,11 @@ public class BSPTaskLauncher implements 
     InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
     cm = (ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress,
         conf);
-    LOG.info("Spawned task with id: " + this.id + " for allocated container id: "
+    LOG.info("Spawned task with id: " + this.id
+        + " for allocated container id: "
         + this.allocatedContainer.getId().toString());
   }
 
-  private File createWorkDirectory(Path jobFile) {
-    File workDir = new File(jobFile.getParent().toString(), "work");
-    boolean isCreated = workDir.mkdirs();
-    if (isCreated) {
-      LOG.info("TaskRunner.workDir : " + workDir);
-    }
-    return workDir;
-  }
-
   @Override
   protected void finalize() throws Throwable {
     stopAndCleanup();
@@ -95,66 +89,6 @@ public class BSPTaskLauncher implements 
     cm.stopContainer(stopRequest);
   }
 
-  private List<String> buildJvmArgs(Configuration jobConf, String classPath,
-      Class<?> child) throws IOException {
-    // Build exec child jmv args.
-    List<String> vargs = new ArrayList<String>();
-    File jvm = // use same jvm as parent
-    new File(new File(System.getProperty("java.home"), "bin"), "java");
-    vargs.add(jvm.toString());
-
-    // bsp.child.java.opts
-    String javaOpts = jobConf.get("bsp.child.mem.in.mb", "-Xmx256m");
-    javaOpts += " " + jobConf.get("bsp.child.java.opts", "");
-
-    String[] javaOptsSplit = javaOpts.split(" ");
-    for (int i = 0; i < javaOptsSplit.length; i++) {
-      vargs.add(javaOptsSplit[i]);
-    }
-
-    // Add classpath.
-    vargs.add("-classpath");
-    vargs.add(classPath.toString());
-    // Add main class and its arguments
-    LOG.debug("Executing child Process " + child.getName());
-    vargs.add(child.getName()); // main of bsppeer
-    vargs.add(id +"");
-    vargs.add(jobConf.get("bsp.sync.server.address"));
-    vargs.add(this.jobFile.makeQualified(FileSystem.get(conf)).toString());
-
-    return vargs;
-  }
-
-  // TODO for jars should use the containers methods
-  private String assembleClasspath(Configuration jobConf, File workDir) {
-    StringBuffer classPath = new StringBuffer();
-    // start with same classpath as parent process
-    classPath.append(System.getProperty("java.class.path"));
-    classPath.append(SYSTEM_PATH_SEPARATOR);
-
-    String jar = jobConf.get("bsp.jar");
-    if (jar != null) { // if jar exists, it into workDir
-      try {
-        RunJar.unJar(new File(jar), workDir);
-      } catch (IOException ioe) {
-        LOG.error("Unable to uncompressing file to " + workDir.toString(), ioe);
-      }
-      File[] libs = new File(workDir, "lib").listFiles();
-      if (libs != null) {
-        for (int i = 0; i < libs.length; i++) {
-          // add libs from jar to classpath
-          classPath.append(SYSTEM_PATH_SEPARATOR);
-          classPath.append(libs[i]);
-        }
-      }
-      classPath.append(SYSTEM_PATH_SEPARATOR);
-      classPath.append(new File(workDir, "classes"));
-      classPath.append(SYSTEM_PATH_SEPARATOR);
-      classPath.append(workDir);
-    }
-    return classPath.toString();
-  }
-
   @Override
   public BSPTaskStatus call() throws Exception {
     // Now we setup a ContainerLaunchContext
@@ -165,13 +99,22 @@ public class BSPTaskLauncher implements 
     ctx.setResource(allocatedContainer.getResource());
     ctx.setUser(user);
 
-    File workDir = createWorkDirectory(jobFile);
-    String classPath = assembleClasspath(conf, workDir);
-    LOG.info("Spawned child's classpath " + classPath);
-    List<String> bspArgs = buildJvmArgs(conf, classPath,
-        BSPPeerImpl.class);
+    LocalResource packageResource = Records.newRecord(LocalResource.class);
+    File packageFile = new File(conf.get("bsp.jar"));
+    URL packageUrl = ConverterUtils.getYarnUrlFromPath(new Path(conf
+        .get("bsp.jar")));
+
+    packageResource.setResource(packageUrl);
+    packageResource.setSize(packageFile.length());
+    packageResource.setTimestamp(packageFile.lastModified());
+    packageResource.setType(LocalResourceType.ARCHIVE);
+    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
+
+    ctx.setCommands(Arrays.asList("java -cp './package/*' ",
+        BSPTaskLauncher.class.getCanonicalName(), jobId.getJtIdentifier(), id
+            + "", this.jobFile.makeQualified(FileSystem.get(conf)).toString()));
+    ctx.setLocalResources(Collections.singletonMap("package", packageResource));
 
-    ctx.setCommands(bspArgs);
     StartContainerRequest startReq = Records
         .newRecord(StartContainerRequest.class);
     startReq.setContainerLaunchContext(ctx);



Mime
View raw message