incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1214138 - in /incubator/hama/trunk: core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/sync/ yarn/ yarn/src/main/java/org/apache/hama/bsp/
Date Wed, 14 Dec 2011 10:13:16 GMT
Author: tjungblut
Date: Wed Dec 14 10:13:15 2011
New Revision: 1214138

URL: http://svn.apache.org/viewvc?rev=1214138&view=rev
Log:
[HAMA-467] Add I/O System to YARN

Added:
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java   (with props)
Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java
    incubator/hama/trunk/yarn/pom.xml
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1214138&r1=1214137&r2=1214138&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Wed Dec 14 10:13:15 2011
@@ -199,7 +199,7 @@ public class BSPJob extends BSPJobContex
 
   public void submit() throws IOException, InterruptedException {
     ensureState(JobState.DEFINE);
-    info = jobClient.submitJobInternal(this);
+    info = jobClient.submitJob(this);
     state = JobState.RUNNING;
   }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1214138&r1=1214137&r2=1214138&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Wed Dec 14 10:13:15 2011
@@ -27,8 +27,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-
-import javax.security.auth.login.LoginException;
+import java.util.StringTokenizer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,8 +48,8 @@ import org.apache.hadoop.io.compress.Com
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hama.HamaConfiguration;
@@ -87,6 +86,9 @@ public class BSPJobClient extends Config
     JobProfile profile;
     JobStatus status;
     long statustime;
+    
+    public NetworkedJob() {
+    }
 
     public NetworkedJob(JobStatus job) throws IOException {
       this.status = job;
@@ -216,8 +218,8 @@ public class BSPJobClient extends Config
     if (masterAdress != null && !masterAdress.equals("local")) {
       this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
           JobSubmissionProtocol.class, JobSubmissionProtocol.versionID,
-          BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(conf,
-              JobSubmissionProtocol.class));
+          BSPMaster.getAddress(conf), conf,
+          NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
     } else {
       LOG.debug("Using local BSP runner.");
       this.jobSubmitClient = new LocalBSPRunner(conf);
@@ -265,18 +267,6 @@ public class BSPJobClient extends Config
     return jobSubmitClient.jobsToComplete();
   }
 
-  private UnixUserGroupInformation getUGI(Configuration conf)
-      throws IOException {
-    UnixUserGroupInformation ugi = null;
-    try {
-      ugi = UnixUserGroupInformation.login(conf, true);
-    } catch (LoginException e) {
-      throw (IOException) (new IOException(
-          "Failed to get the current user's information.").initCause(e));
-    }
-    return ugi;
-  }
-
   /**
    * Submit a job to the BSP system. This returns a handle to the
    * {@link RunningJob} which can be used to track the running-job.
@@ -289,14 +279,13 @@ public class BSPJobClient extends Config
    */
   public RunningJob submitJob(BSPJob job) throws FileNotFoundException,
       IOException {
-    return submitJobInternal(job);
+    return submitJobInternal(job, jobSubmitClient.getNewJobId());
   }
 
   static Random r = new Random();
 
-  public RunningJob submitJobInternal(BSPJob job) throws IOException {
-    BSPJobID jobId = jobSubmitClient.getNewJobId();
-
+  public RunningJob submitJobInternal(BSPJob job, BSPJobID jobId)
+      throws IOException {
     job.setJobID(jobId);
 
     Path submitJobDir = new Path(getSystemDir(), "submit_"
@@ -306,12 +295,6 @@ public class BSPJobClient extends Config
     Path submitJobFile = new Path(submitJobDir, "job.xml");
     LOG.debug("BSPJobClient.submitJobDir: " + submitJobDir);
 
-    /*
-     * set this user's id in job configuration, so later job files can be
-     * accessed using this user's id
-     */
-    UnixUserGroupInformation ugi = getUGI(job.getConf());
-
     FileSystem fs = getFs();
     // Create a number of filenames in the BSPMaster's fs namespace
     fs.delete(submitJobDir, true);
@@ -351,10 +334,8 @@ public class BSPJobClient extends Config
     }
 
     // Set the user's name and working directory
-    job.setUser(ugi.getUserName());
-    if (ugi.getGroupNames().length > 0) {
-      job.set("group.name", ugi.getGroupNames()[0]);
-    }
+    job.setUser(getUnixUserName());
+    job.set("group.name", getUnixUserGroupName(job.getUser()));
     if (job.getWorkingDirectory() == null) {
       job.setWorkingDirectory(fs.getWorkingDirectory());
     }
@@ -369,6 +350,11 @@ public class BSPJobClient extends Config
       out.close();
     }
 
+    return launchJob(jobId, job, submitJobFile, fs);
+  }
+
+  protected RunningJob launchJob(BSPJobID jobId, BSPJob job,
+      Path submitJobFile, FileSystem fs) throws IOException {
     //
     // Now, actually submit the job (using the submit name)
     //
@@ -381,8 +367,8 @@ public class BSPJobClient extends Config
     }
   }
 
-  @SuppressWarnings( { "rawtypes", "unchecked" })
-  private BSPJob partition(BSPJob job) throws IOException {
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  protected BSPJob partition(BSPJob job) throws IOException {
     InputSplit[] splits = job.getInputFormat().getSplits(job, 0);
     int numOfTasks = splits.length; // job.getNumBspTask();
     String input = job.getConf().get("bsp.input.dir");
@@ -417,8 +403,8 @@ public class BSPJobClient extends Config
           job, null);
       CompressionCodec codec = null;
       if (outputCompressorClass != null) {
-        codec = ReflectionUtils.newInstance(outputCompressorClass, job
-            .getConf());
+        codec = ReflectionUtils.newInstance(outputCompressorClass,
+            job.getConf());
       }
 
       try {
@@ -631,7 +617,7 @@ public class BSPJobClient extends Config
       job.setNumBspTask(jc.getClusterStatus(false).getMaxTasks());
     }
 
-    RunningJob running = jc.submitJobInternal(job);
+    RunningJob running = jc.submitJob(job);
     BSPJobID jobId = running.getID();
     LOG.info("Running job: " + jobId.toString());
 
@@ -798,8 +784,9 @@ public class BSPJobClient extends Config
         System.out.println("Job name: " + job.getJobName());
         System.out.printf("States are:\n\tRunning : 1\tSucceded : 2"
             + "\tFailed : 3\tPrep : 4\n");
-        System.out.printf("%s\t%d\t%d\t%s\n", jobStatus.getJobID(), jobStatus
-            .getRunState(), jobStatus.getStartTime(), jobStatus.getUsername());
+        System.out.printf("%s\t%d\t%d\t%s\n", jobStatus.getJobID(),
+            jobStatus.getRunState(), jobStatus.getStartTime(),
+            jobStatus.getUsername());
 
         exitCode = 0;
       }
@@ -888,6 +875,54 @@ public class BSPJobClient extends Config
     }
   }
 
+  /*
+   * Helper methods for unix operations
+   */
+
+  static String getUnixUserName() throws IOException {
+    String[] result = executeShellCommand(new String[] { Shell.USER_NAME_COMMAND });
+    if (result.length != 1) {
+      throw new IOException("Expect one token as the result of "
+          + Shell.USER_NAME_COMMAND + ": " + toString(result));
+    }
+    return result[0];
+  }
+
+  static String getUnixUserGroupName(String user) throws IOException {
+    String[] result = executeShellCommand(new String[] { "bash", "-c",
+        "id -Gn " + user });
+    if (result.length < 1) {
+      throw new IOException("Expect one token as the result of "
+          + "bash -c id -Gn " + user + ": " + toString(result));
+    }
+    return result[0];
+  }
+
+  protected static String toString(String[] strArray) {
+    if (strArray == null || strArray.length == 0) {
+      return "";
+    }
+    StringBuilder buf = new StringBuilder(strArray[0]);
+    for (int i = 1; i < strArray.length; i++) {
+      buf.append(' ');
+      buf.append(strArray[i]);
+    }
+    return buf.toString();
+  }
+
+  protected static String[] executeShellCommand(String[] command)
+      throws IOException {
+    String groups = Shell.execCommand(command);
+    StringTokenizer tokenizer = new StringTokenizer(groups);
+    int numOfTokens = tokenizer.countTokens();
+    String[] tokens = new String[numOfTokens];
+    for (int i = 0; tokenizer.hasMoreTokens(); i++) {
+      tokens[i] = tokenizer.nextToken();
+    }
+
+    return tokens;
+  }
+
   static class RawSplit implements Writable {
     private String splitClass;
     private BytesWritable bytes = new BytesWritable();

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java?rev=1214138&r1=1214137&r2=1214138&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncServerImpl.java Wed Dec 14 10:13:15 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.bsp.sync;
 
+import javax.management.InstanceNotFoundException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -76,7 +78,7 @@ public class ZooKeeperSyncServerImpl imp
       ZookeeperTuple tuple = QuorumPeer.runShutdownableZooKeeper(conf);
       zooKeeper = tuple.main;
       zooKeeper.runFromConfig(tuple.conf);
-    } catch (Exception e) {
+    } catch (InstanceNotFoundException e) {
       LOG.debug(e);
     }
   }

Modified: incubator/hama/trunk/yarn/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/pom.xml?rev=1214138&r1=1214137&r2=1214138&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/pom.xml (original)
+++ incubator/hama/trunk/yarn/pom.xml Wed Dec 14 10:13:15 2011
@@ -78,9 +78,19 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>0.23.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-common</artifactId>
       <version>0.23.0-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <version>0.23.0-SNAPSHOT</version>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.zookeeper</groupId>

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1214138&r1=1214137&r2=1214138&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java Wed Dec 14 10:13:15 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Map;
@@ -29,6 +30,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
@@ -92,6 +94,9 @@ public class BSPApplicationMaster implem
 
   private Counters globalCounter = new Counters();
 
+  private FileSystem fs;
+  private BSPJobClient.RawSplit[] splits;
+
   private BSPApplicationMaster(String[] args) throws Exception {
     if (args.length != 1) {
       throw new IllegalArgumentException();
@@ -128,8 +133,20 @@ public class BSPApplicationMaster implem
      */
     rewriteSubmitConfiguration(jobFile, jobConf);
 
+    String jobSplit = jobConf.get("bsp.job.split.file");
+    splits = null;
+    if (jobSplit != null) {
+      DataInputStream splitFile = fs.open(new Path(jobSplit));
+      try {
+        splits = BSPJobClient.readSplitFile(splitFile);
+      } finally {
+        splitFile.close();
+      }
+    }
+
     this.amrmRPC = getYarnRPCConnection(localConf);
-    registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort, null);
+    registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort,
+        "http://localhost:8080");
   }
 
   /**
@@ -200,7 +217,7 @@ public class BSPApplicationMaster implem
     appMasterRequest.setHost(appMasterHostName);
     appMasterRequest.setRpcPort(appMasterRpcPort);
     // TODO tracking URL
-    // appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
+    appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
     RegisterApplicationMasterResponse response = resourceManager
         .registerApplicationMaster(appMasterRequest);
     LOG.debug("ApplicationMaster has maximum resource capability of: "
@@ -301,7 +318,6 @@ public class BSPApplicationMaster implem
   /**
    * Writes the current configuration to a given path to reflect changes. For
    * example the sync server address is put after the file has been written.
-   * TODO this should upload to HDFS to a given path as well.
    * 
    * @throws IOException
    */
@@ -357,7 +373,17 @@ public class BSPApplicationMaster implem
 
   @Override
   public Task getTask(TaskAttemptID taskid) throws IOException {
-    return null;
+    BSPJobClient.RawSplit assignedSplit = null;
+    String splitName = NullInputFormat.NullInputSplit.class.getCanonicalName();
+    if (splits != null) {
+      assignedSplit = splits[taskid.id];
+      splitName = assignedSplit.getClassName();
+      return new BSPTask(jobId, jobFile, taskid, taskid.id, splitName,
+          assignedSplit.getBytes());
+    } else {
+      return new BSPTask(jobId, jobFile, taskid, taskid.id, splitName,
+          new BytesWritable());
+    }
   }
 
   @Override

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java?rev=1214138&r1=1214137&r2=1214138&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java Wed Dec 14 10:13:15 2011
@@ -72,8 +72,11 @@ public class BSPRunner {
 
     BSPJob job = new BSPJob(new HamaConfiguration(conf));
 
-    peer = new BSPPeerImpl(job, conf, id, umbilical, port, umbilicalAddress,
-        null, counters);
+    BSPTask task = (BSPTask) umbilical.getTask(id);
+
+    peer = new BSPPeerImpl(job, conf, id, umbilical, id.id, task.splitClass,
+        task.split, counters);
+
     // this is a checked cast because we can only set a class via the BSPJob
     // class which only allows derivates of BSP.
     bspClass = (Class<? extends BSP>) conf.getClassByName(conf

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java?rev=1214138&r1=1214137&r2=1214138&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Wed Dec 14 10:13:15 2011
@@ -18,9 +18,9 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.concurrent.Callable;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,9 +44,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
 
-public class BSPTaskLauncher implements Callable<BSPTaskStatus> {
+public class BSPTaskLauncher {
 
   private static final Log LOG = LogFactory.getLog(BSPTaskLauncher.class);
 
@@ -54,10 +53,12 @@ public class BSPTaskLauncher implements 
   private final int id;
   private final ContainerManager cm;
   private final Configuration conf;
-  private final String user;
+  private String user;
   private final Path jobFile;
   private final BSPJobID jobId;
 
+  private GetContainerStatusRequest statusRequest;
+
   public BSPTaskLauncher(int id, Container container, ContainerManager cm,
       Configuration conf, Path jobFile, BSPJobID jobId)
       throws YarnRemoteException {
@@ -67,7 +68,11 @@ public class BSPTaskLauncher implements 
     this.allocatedContainer = container;
     this.jobFile = jobFile;
     this.jobId = jobId;
+    // FIXME why does this contain mapreduce here?
     this.user = conf.get("bsp.user.name");
+    if (this.user == null) {
+      this.user = conf.get("mapreduce.job.user.name");
+    }
   }
 
   @Override
@@ -82,20 +87,30 @@ public class BSPTaskLauncher implements 
     cm.stopContainer(stopRequest);
   }
 
-  @Override
-  public BSPTaskStatus call() throws Exception {
+  public void start() throws IOException {
     LOG.info("Spawned task with id: " + this.id
         + " for allocated container id: "
         + this.allocatedContainer.getId().toString());
-    final GetContainerStatusRequest statusRequest = setupContainer(
-        allocatedContainer, cm, user, id);
+    statusRequest = setupContainer(allocatedContainer, cm, user, id);
+  }
+
+  /**
+   * This polls the current container status from container manager. Null if the
+   * container hasn't finished yet.
+   * 
+   * @return
+   * @throws Exception
+   */
+  public BSPTaskStatus poll() throws Exception {
 
     ContainerStatus lastStatus;
-    while ((lastStatus = cm.getContainerStatus(statusRequest).getStatus())
+    if ((lastStatus = cm.getContainerStatus(statusRequest).getStatus())
         .getState() != ContainerState.COMPLETE) {
-      Thread.sleep(1000l);
+      return null;
     }
-
+    LOG.info(this.id + "\tLast report comes with existatus of "
+        + lastStatus.getExitStatus() + " and diagnose string of "
+        + lastStatus.getDiagnostics());
     return new BSPTaskStatus(id, lastStatus.getExitStatus());
   }
 
@@ -103,7 +118,7 @@ public class BSPTaskLauncher implements 
       Container allocatedContainer, ContainerManager cm, String user, int id)
       throws IOException {
     LOG.info("Setting up a container for user " + user + " with id of " + id
-        + " and containerID of " + allocatedContainer.getId());
+        + " and containerID of " + allocatedContainer.getId() + " as " + user);
     // Now we setup a ContainerLaunchContext
     ContainerLaunchContext ctx = Records
         .newRecord(ContainerLaunchContext.class);
@@ -118,7 +133,17 @@ public class BSPTaskLauncher implements 
     LocalResource packageResource = Records.newRecord(LocalResource.class);
     FileSystem fs = FileSystem.get(conf);
     Path packageFile = new Path(conf.get("bsp.jar"));
-    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile);
+    // FIXME there seems to be a problem with the converter utils and URL
+    // transformation
+    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile
+        .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+    LOG.info("PackageURL has been composed to " + packageUrl.toString());
+    try {
+      LOG.info("Reverting packageURL to path: "
+          + ConverterUtils.getPathFromYarnURL(packageUrl));
+    } catch (URISyntaxException e) {
+      LOG.fatal("If you see this error the workarround does not work", e);
+    }
 
     FileStatus fileStatus = fs.getFileStatus(packageFile);
     packageResource.setResource(packageUrl);
@@ -129,7 +154,7 @@ public class BSPTaskLauncher implements 
     LOG.info("Package resource: " + packageResource.getResource());
 
     ctx.setLocalResources(Collections.singletonMap("package", packageResource));
-    
+
     /*
      * TODO Package classpath seems not to work if you're in pseudo distributed
      * mode, because the resource must not be moved, it will never be unpacked.
@@ -165,6 +190,28 @@ public class BSPTaskLauncher implements 
     return statusReq;
   }
 
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + id;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    BSPTaskLauncher other = (BSPTaskLauncher) obj;
+    if (id != other.id)
+      return false;
+    return true;
+  }
+
   public static class BSPTaskStatus {
     private final int id;
     private final int exitStatus;

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java?rev=1214138&r1=1214137&r2=1214138&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java Wed Dec 14 10:13:15 2011
@@ -19,7 +19,6 @@ package org.apache.hama.bsp;
 
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 
-
 /**
  * Main interface to interact with the job. Provides only getters.
  */
@@ -34,7 +33,7 @@ public interface Job {
   }
 
   public JobState startJob() throws Exception;
-  
+
   public void cleanup() throws YarnRemoteException;
 
   JobState getState();

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java?rev=1214138&r1=1214137&r2=1214138&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java Wed Dec 14 10:13:15 2011
@@ -20,13 +20,11 @@ package org.apache.hama.bsp;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -53,9 +51,6 @@ import org.apache.hama.bsp.BSPTaskLaunch
 public class JobImpl implements Job {
 
   private static final Log LOG = LogFactory.getLog(JobImpl.class);
-  private static final ExecutorService threadPool = Executors
-      .newCachedThreadPool();
-
   private static final int DEFAULT_MEMORY_MB = 256;
 
   private Configuration conf;
@@ -76,9 +71,9 @@ public class JobImpl implements Job {
   private List<Container> allocatedContainers;
   private List<ContainerId> releasedContainers = Collections.emptyList();
 
-  private ExecutorCompletionService<BSPTaskStatus> completionService = new ExecutorCompletionService<BSPTaskStatus>(
-      threadPool);
   private Map<Integer, BSPTaskLauncher> launchers = new HashMap<Integer, BSPTaskLauncher>();
+  private Deque<BSPTaskLauncher> completionQueue = new LinkedList<BSPTaskLauncher>();
+
   private int lastResponseID = 0;
 
   public JobImpl(ApplicationAttemptId appAttemptId,
@@ -135,8 +130,7 @@ public class JobImpl implements Job {
   }
 
   @Override
-  public JobState startJob() throws YarnRemoteException, InterruptedException,
-      ExecutionException {
+  public JobState startJob() throws Exception {
 
     this.allocatedContainers = new ArrayList<Container>(numBSPTasks);
     while (allocatedContainers.size() < numBSPTasks) {
@@ -157,7 +151,7 @@ public class JobImpl implements Job {
           + amResponse.getAvailableResources().getMemory() + "mb");
       this.lastResponseID = amResponse.getResponseId();
 
-//      availableResources = amResponse.getAvailableResources();
+      // availableResources = amResponse.getAvailableResources();
       this.allocatedContainers.addAll(amResponse.getAllocatedContainers());
       LOG.info("Waiting to allocate "
           + (numBSPTasks - allocatedContainers.size()) + " more containers...");
@@ -187,23 +181,36 @@ public class JobImpl implements Job {
 
       BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id,
           allocatedContainer, cm, conf, jobFile, jobId);
+
       launchers.put(id, runnableLaunchContainer);
-      completionService.submit(runnableLaunchContainer);
+      runnableLaunchContainer.start();
+      completionQueue.add(runnableLaunchContainer);
       id++;
       launchedBSPTasks++;
     }
+    LOG.info("Waiting for tasks to finish...");
     state = JobState.RUNNING;
-
-    for (int i = 0; i < launchedBSPTasks; i++) {
-      BSPTaskStatus returnedTask = completionService.take().get();
-      if (returnedTask.getExitStatus() != 0) {
-        LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!");
-        state = JobState.FAILED;
-        return state;
-      } else {
-        LOG.info("Task \"" + returnedTask.getId() + "\" sucessfully finished!");
+    int completed = 0;
+    while (completed != numBSPTasks) {
+      for (BSPTaskLauncher task : completionQueue) {
+        BSPTaskStatus returnedTask = task.poll();
+        // if our task returned with a finished state
+        if (returnedTask != null) {
+          if (returnedTask.getExitStatus() != 0) {
+            LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!");
+            state = JobState.FAILED;
+            return state;
+          } else {
+            LOG.info("Task \"" + returnedTask.getId()
+                + "\" sucessfully finished!");
+            completed++;
+            LOG.info("Waiting for " + (numBSPTasks - completed)
+                + " tasks to finish!");
+          }
+          cleanupTask(returnedTask.getId());
+        }
       }
-      cleanupTask(returnedTask.getId());
+      Thread.sleep(1000L);
     }
 
     state = JobState.SUCCESS;
@@ -221,14 +228,14 @@ public class JobImpl implements Job {
     BSPTaskLauncher bspTaskLauncher = launchers.get(id);
     bspTaskLauncher.stopAndCleanup();
     launchers.remove(id);
+    completionQueue.remove(bspTaskLauncher);
   }
 
   @Override
   public void cleanup() throws YarnRemoteException {
-    for (BSPTaskLauncher launcher : launchers.values()) {
+    for (BSPTaskLauncher launcher : completionQueue) {
       launcher.stopAndCleanup();
     }
-    threadPool.shutdownNow();
   }
 
   private List<ResourceRequest> createBSPTaskRequest(int numTasks,

Modified: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java?rev=1214138&r1=1214137&r2=1214138&view=diff
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (original)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java Wed Dec 14 10:13:15 2011
@@ -19,42 +19,20 @@ package org.apache.hama.bsp;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.StringTokenizer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hama.HamaConfiguration;
 
@@ -62,21 +40,26 @@ public class YARNBSPJob extends BSPJob {
 
   private static final Log LOG = LogFactory.getLog(YARNBSPJob.class);
 
-  private BSPClient client;
-  private YarnRPC rpc;
-  private ApplicationId id;
-  private FileSystem fs;
+  private static volatile int id = 0;
 
+  private YARNBSPJobClient submitClient;
+  private BSPClient client;
   private boolean submitted;
-
   private ApplicationReport report;
-
   private ClientRMProtocol applicationsManager;
+  private YarnRPC rpc;
 
   public YARNBSPJob(HamaConfiguration conf) throws IOException {
     super(conf);
-    rpc = YarnRPC.create(conf);
-    fs = FileSystem.get(conf);
+    submitClient = new YARNBSPJobClient(conf);
+    YarnConfiguration yarnConf = new YarnConfiguration(conf);
+    this.rpc = YarnRPC.create(conf);
+    InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+        YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));
+    LOG.info("Connecting to ResourceManager at " + rmAddress);
+
+    this.applicationsManager = ((ClientRMProtocol) rpc.getProxy(
+        ClientRMProtocol.class, rmAddress, conf));
   }
 
   public void setMemoryUsedPerTaskInMb(int mem) {
@@ -84,150 +67,22 @@ public class YARNBSPJob extends BSPJob {
   }
 
   public void kill() throws YarnRemoteException {
-    KillApplicationRequest killRequest = Records
-        .newRecord(KillApplicationRequest.class);
-    killRequest.setApplicationId(id);
-    applicationsManager.forceKillApplication(killRequest);
+    if (submitClient != null) {
+      KillApplicationRequest killRequest = Records
+          .newRecord(KillApplicationRequest.class);
+      killRequest.setApplicationId(submitClient.getId());
+      applicationsManager.forceKillApplication(killRequest);
+    }
   }
 
   @Override
   public void submit() throws IOException, InterruptedException {
-    LOG.info("Submitting job...");
-    if (conf.get("bsp.child.mem.in.mb") == null) {
-      LOG.warn("BSP Child memory has not been set, YARN will guess your needs or use default values.");
-    }
-
-    if (rpc == null) {
-      rpc = YarnRPC.create(getConf());
-    }
-
-    if (fs == null) {
-      fs = FileSystem.get(getConf());
-    }
-
-    if (conf.get("bsp.user.name") == null) {
-      String s = getUnixUserName();
-      conf.set("bsp.user.name", s);
-      LOG.info("Retrieved username: " + s);
+    RunningJob submitJobInternal = submitClient.submitJobInternal(this,
+        new BSPJobID("hama_yarn", id++));
+    if (submitJobInternal != null) {
+      submitted = true;
+      report = submitClient.getReport();
     }
-
-    YarnConfiguration yarnConf = new YarnConfiguration(conf);
-    InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
-        YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));
-    LOG.info("Connecting to ResourceManager at " + rmAddress);
-    Configuration appsManagerServerConf = new Configuration(conf);
-    // TODO what is that?
-    // appsManagerServerConf.setClass(YarnConfiguration.YARN_SECURITY_INFO,
-    // ClientRMSecurityInfo.class, SecurityInfo.class);
-
-    applicationsManager = ((ClientRMProtocol) rpc.getProxy(
-        ClientRMProtocol.class, rmAddress, appsManagerServerConf));
-
-    GetNewApplicationRequest request = Records
-        .newRecord(GetNewApplicationRequest.class);
-    GetNewApplicationResponse response = applicationsManager
-        .getNewApplication(request);
-    id = response.getApplicationId();
-    LOG.info("Got new ApplicationId=" + id);
-
-    // Create a new ApplicationSubmissionContext
-    ApplicationSubmissionContext appContext = Records
-        .newRecord(ApplicationSubmissionContext.class);
-    // set the ApplicationId
-    appContext.setApplicationId(this.id);
-    // set the application name
-    appContext.setApplicationName(this.getJobName());
-
-    // Create a new container launch context for the AM's container
-    ContainerLaunchContext amContainer = Records
-        .newRecord(ContainerLaunchContext.class);
-
-    // Define the local resources required
-    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    // Lets assume the jar we need for our ApplicationMaster is available in
-    // HDFS at a certain known path to us and we want to make it available to
-    // the ApplicationMaster in the launched container
-    Path jarPath = new Path(getWorkingDirectory(), id + "/app.jar");
-    fs.copyFromLocalFile(this.getLocalPath(this.getJar()), jarPath);
-    LOG.info("Copying app jar to " + jarPath);
-    conf.set("bsp.jar",
-        jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
-    FileStatus jarStatus = fs.getFileStatus(jarPath);
-    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
-    amJarRsrc.setType(LocalResourceType.FILE);
-    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
-    amJarRsrc.setTimestamp(jarStatus.getModificationTime());
-    amJarRsrc.setSize(jarStatus.getLen());
-    // this creates a symlink in the working directory
-    localResources.put("AppMaster.jar", amJarRsrc);
-    // Set the local resources into the launch context
-    amContainer.setLocalResources(localResources);
-
-    // Set up the environment needed for the launch context
-    Map<String, String> env = new HashMap<String, String>();
-    // Assuming our classes or jars are available as local resources in the
-    // working directory from which the command will be run, we need to append
-    // "." to the path.
-    // By default, all the hadoop specific classpaths will already be available
-    // in $CLASSPATH, so we should be careful not to overwrite it.
-    String classPathEnv = "$CLASSPATH:./*:";
-    env.put("CLASSPATH", classPathEnv);
-    amContainer.setEnvironment(env);
-
-    // saving the conf file at this point to hdfs
-    // it should be in HDFS.
-    Path xmlPath = new Path(getWorkingDirectory(), id + "/job.xml");
-    FSDataOutputStream out = fs.create(xmlPath);
-    this.writeXml(out);
-    out.flush();
-    out.close();
-
-    // Construct the command to be executed on the launched container
-    String command = "${JAVA_HOME}"
-        + "/bin/java -cp "
-        + classPathEnv
-        + " "
-        + BSPApplicationMaster.class.getCanonicalName()
-        + " "
-        + xmlPath.makeQualified(fs.getUri(), fs.getWorkingDirectory())
-            .toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
-        + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
-        + "/stderr";
-
-    LOG.info("Start command: " + command);
-
-    amContainer.setCommands(Collections.singletonList(command));
-
-    Resource capability = Records.newRecord(Resource.class);
-    // we have at least 3 threads, which comsumes 1mb each, for each bsptask and
-    // a base usage of 100mb
-    capability.setMemory(3 * this.getNumBspTask() + conf.getInt("hama.appmaster.memory.mb", 100));
-    LOG.info("Set memory for the application master to "
-        + capability.getMemory() + "mb!");
-    amContainer.setResource(capability);
-
-    // Set the container launch content into the ApplicationSubmissionContext
-    appContext.setAMContainerSpec(amContainer);
-
-    // Create the request to send to the ApplicationsManager
-    SubmitApplicationRequest appRequest = Records
-        .newRecord(SubmitApplicationRequest.class);
-    appRequest.setApplicationSubmissionContext(appContext);
-    applicationsManager.submitApplication(appRequest);
-
-    GetApplicationReportRequest reportRequest = Records
-        .newRecord(GetApplicationReportRequest.class);
-    reportRequest.setApplicationId(id);
-    while (report == null || report.getHost().equals("N/A")) {
-      GetApplicationReportResponse reportResponse = applicationsManager
-          .getApplicationReport(reportRequest);
-      report = reportResponse.getApplicationReport();
-      Thread.sleep(1000L);
-    }
-    LOG.info("Got report: " + report.getApplicationId() + " "
-        + report.getHost());
-    submitted = true;
   }
 
   @Override
@@ -245,7 +100,7 @@ public class YARNBSPJob extends BSPJob {
 
     GetApplicationReportRequest reportRequest = Records
         .newRecord(GetApplicationReportRequest.class);
-    reportRequest.setApplicationId(id);
+    reportRequest.setApplicationId(submitClient.getId());
 
     GetApplicationReportResponse reportResponse = applicationsManager
         .getApplicationReport(reportRequest);
@@ -256,7 +111,8 @@ public class YARNBSPJob extends BSPJob {
         && localReport.getFinalApplicationStatus() != FinalApplicationStatus.FAILED
         && localReport.getFinalApplicationStatus() != FinalApplicationStatus.KILLED
         && localReport.getFinalApplicationStatus() != FinalApplicationStatus.SUCCEEDED) {
-      LOG.debug("currently in state: " + localReport.getFinalApplicationStatus());
+      LOG.debug("currently in state: "
+          + localReport.getFinalApplicationStatus());
       if (verbose) {
         long remoteSuperStep = client.getCurrentSuperStep().get();
         if (clientSuperStep > remoteSuperStep) {
@@ -281,42 +137,8 @@ public class YARNBSPJob extends BSPJob {
 
   }
 
-  /*
-   * THESE FOLLOWING METHODS WILL BE IMPLEMENTED IN BSPJOBCLIENT SOON.
-   */
-
-  static String getUnixUserName() throws IOException {
-    String[] result = executeShellCommand(new String[] { Shell.USER_NAME_COMMAND });
-    if (result.length != 1) {
-      throw new IOException("Expect one token as the result of "
-          + Shell.USER_NAME_COMMAND + ": " + toString(result));
-    }
-    return result[0];
-  }
-
-  private static String toString(String[] strArray) {
-    if (strArray == null || strArray.length == 0) {
-      return "";
-    }
-    StringBuilder buf = new StringBuilder(strArray[0]);
-    for (int i = 1; i < strArray.length; i++) {
-      buf.append(' ');
-      buf.append(strArray[i]);
-    }
-    return buf.toString();
-  }
-
-  private static String[] executeShellCommand(String[] command)
-      throws IOException {
-    String groups = Shell.execCommand(command);
-    StringTokenizer tokenizer = new StringTokenizer(groups);
-    int numOfTokens = tokenizer.countTokens();
-    String[] tokens = new String[numOfTokens];
-    for (int i = 0; tokenizer.hasMoreTokens(); i++) {
-      tokens[i] = tokenizer.nextToken();
-    }
-
-    return tokens;
+  ClientRMProtocol getApplicationsManager() {
+    return applicationsManager;
   }
 
 }

Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java?rev=1214138&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java Wed Dec 14 10:13:15 2011
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hama.HamaConfiguration;
+
+public class YARNBSPJobClient extends BSPJobClient {
+
+  private static final Log LOG = LogFactory.getLog(YARNBSPJobClient.class);
+
+  private ApplicationId id;
+  private ApplicationReport report;
+
+  public YARNBSPJobClient(HamaConfiguration conf) {
+    setConf(conf);
+  }
+
+  @Override
+  protected RunningJob launchJob(BSPJobID jobId, BSPJob normalJob,
+      Path submitJobFile, FileSystem fs) throws IOException {
+
+    YARNBSPJob job = (YARNBSPJob) normalJob;
+
+    LOG.info("Submitting job...");
+    if (getConf().get("bsp.child.mem.in.mb") == null) {
+      LOG.warn("BSP Child memory has not been set, YARN will guess your needs or use default values.");
+    }
+
+    if (fs == null) {
+      fs = FileSystem.get(getConf());
+    }
+
+    if (getConf().get("bsp.user.name") == null) {
+      String s = getUnixUserName();
+      getConf().set("bsp.user.name", s);
+      LOG.info("Retrieved username: " + s);
+    }
+
+    GetNewApplicationRequest request = Records
+        .newRecord(GetNewApplicationRequest.class);
+    GetNewApplicationResponse response = job.getApplicationsManager()
+        .getNewApplication(request);
+    id = response.getApplicationId();
+    LOG.info("Got new ApplicationId=" + id);
+
+    // Create a new ApplicationSubmissionContext
+    ApplicationSubmissionContext appContext = Records
+        .newRecord(ApplicationSubmissionContext.class);
+    // set the ApplicationId
+    appContext.setApplicationId(this.id);
+    // set the application name
+    appContext.setApplicationName(job.getJobName());
+
+    // Create a new container launch context for the AM's container
+    ContainerLaunchContext amContainer = Records
+        .newRecord(ContainerLaunchContext.class);
+
+    // Define the local resources required
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+    // Let's assume the jar we need for our ApplicationMaster is available in
+    // HDFS at a certain known path to us and we want to make it available to
+    // the ApplicationMaster in the launched container
+    if (job.getJar() == null) {
+      throw new IllegalArgumentException(
+          "Jar must be set in order to run the application!");
+    }
+    Path jarPath = new Path(job.getWorkingDirectory(), id + "/app.jar");
+    fs.copyFromLocalFile(job.getLocalPath(job.getJar()), jarPath);
+    LOG.info("Copying app jar to " + jarPath);
+    getConf()
+        .set(
+            "bsp.jar",
+            jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+                .toString());
+    FileStatus jarStatus = fs.getFileStatus(jarPath);
+    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+    amJarRsrc.setType(LocalResourceType.FILE);
+    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
+    amJarRsrc.setTimestamp(jarStatus.getModificationTime());
+    amJarRsrc.setSize(jarStatus.getLen());
+    // this creates a symlink in the working directory
+    localResources.put("AppMaster.jar", amJarRsrc);
+    // Set the local resources into the launch context
+    amContainer.setLocalResources(localResources);
+
+    // Set up the environment needed for the launch context
+    Map<String, String> env = new HashMap<String, String>();
+    // Assuming our classes or jars are available as local resources in the
+    // working directory from which the command will be run, we need to append
+    // "./*" to the classpath.
+    // By default, all the hadoop specific classpaths will already be available
+    // in $CLASSPATH, so we should be careful not to overwrite it.
+    String classPathEnv = "$CLASSPATH:./*:";
+    env.put("CLASSPATH", classPathEnv);
+    amContainer.setEnvironment(env);
+
+    // Construct the command to be executed on the launched container
+    String command = "${JAVA_HOME}"
+        + "/bin/java -cp "
+        + classPathEnv
+        + " "
+        + BSPApplicationMaster.class.getCanonicalName()
+        + " "
+        + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+            .toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+        + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+        + "/stderr";
+
+    LOG.info("Start command: " + command);
+
+    amContainer.setCommands(Collections.singletonList(command));
+
+    Resource capability = Records.newRecord(Resource.class);
+    // we have at least 3 threads, which consume 1mb each, for each bsptask and
+    // a base usage of 100mb
+    capability.setMemory(3 * job.getNumBspTask()
+        + getConf().getInt("hama.appmaster.memory.mb", 100));
+    LOG.info("Set memory for the application master to "
+        + capability.getMemory() + "mb!");
+    amContainer.setResource(capability);
+
+    // Set the container launch context into the ApplicationSubmissionContext
+    appContext.setAMContainerSpec(amContainer);
+
+    // Create the request to send to the ApplicationsManager
+    SubmitApplicationRequest appRequest = Records
+        .newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+    job.getApplicationsManager().submitApplication(appRequest);
+
+    GetApplicationReportRequest reportRequest = Records
+        .newRecord(GetApplicationReportRequest.class);
+    reportRequest.setApplicationId(id);
+    while (report == null || report.getHost().equals("N/A")) {
+      GetApplicationReportResponse reportResponse = job
+          .getApplicationsManager().getApplicationReport(reportRequest);
+      report = reportResponse.getApplicationReport();
+      try {
+        Thread.sleep(1000L);
+      } catch (InterruptedException e) {
+        LOG.error(
+            "Got interrupted while waiting for a response report from AM.", e);
+      }
+    }
+    LOG.info("Got report: " + report.getApplicationId() + " "
+        + report.getHost());
+    return new NetworkedJob();
+  }
+
+  @Override
+  public Path getSystemDir() {
+    return new Path(getConf().get("bsp.local.dir", "/tmp/hama-yarn/"));
+  }
+
+  ApplicationId getId() {
+    return id;
+  }
+
+  public ApplicationReport getReport() {
+    return report;
+  }
+
+}

Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message