hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1668514 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ yarn/ yarn/src/main/java/org/apache/hama/bsp/
Date Mon, 23 Mar 2015 03:46:03 GMT
Author: edwardyoon
Date: Mon Mar 23 03:46:03 2015
New Revision: 1668514

URL: http://svn.apache.org/r1668514
Log:
HAMA-931: Make the HAMA base path configurable

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/pom.xml
    hama/trunk/yarn/pom.xml
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Mar 23 03:46:03 2015
@@ -11,6 +11,7 @@ Release 0.7.0 (unreleased changes)
 
   BUG FIXES
 
+   HAMA-931: Make the HAMA base path configurable (Minho Kim via edwardyoon)
    HAMA-930: Add hama-yarn to binary distribution (Minho Kim via edwardyoon)  
    HAMA-848: Refactor YARN module for hadoop 2.x stable version (Minho Kim via edwardyoon)
    HAMA-906: Automatic activation of halted vertices without received messages (edwardyoon)

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon Mar 23 03:46:03 2015
@@ -303,21 +303,16 @@ public class BSPJobClient extends Config
     BSPJob job = pJob;
     job.setJobID(jobId);
 
-    int maxTasks;
-    if (job.getConfiguration().getBoolean("hama.yarn.application", false)) {
-      int maxMem = job.getConfiguration().getInt("yarn.nodemanager.resource.memory-mb", 0);
-      int minAllocationMem = job.getConfiguration().getInt("yarn.scheduler.minimum-allocation-mb", 1024);
-      maxTasks = maxMem / minAllocationMem;
-    } else {
-      ClusterStatus clusterStatus = getClusterStatus(true);
-      maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
-          clusterStatus.getMaxTasks() - clusterStatus.getTasks());
-
-      if (maxTasks < job.getNumBspTask()) {
-        LOG.warn("The configured number of tasks has exceeded the maximum allowed. Job will run with "
-            + maxTasks + " tasks.");
-        job.setNumBspTask(maxTasks);
-      }
+    int maxTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB,
+        job.getNumBspTask());
+
+    ClusterStatus clusterStatus = getClusterStatus(true);
+    // Re-adjust the maxTasks based on cluster status.
+    if (clusterStatus != null
+        && maxTasks > (clusterStatus.getMaxTasks() - clusterStatus.getTasks())) {
+      LOG.warn("The configured number of tasks has exceeded the maximum allowed. Job will run with "
+          + (clusterStatus.getMaxTasks() - clusterStatus.getTasks()) + " tasks.");
+      job.setNumBspTask(clusterStatus.getMaxTasks() - clusterStatus.getTasks());
     }
 
     Path submitJobDir = new Path(getSystemDir(), "submit_"
@@ -794,7 +789,8 @@ public class BSPJobClient extends Config
    * @throws IOException
    */
   public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
-    return jobSubmitClient.getClusterStatus(detailed);
+    return (jobSubmitClient != null) ? jobSubmitClient
+        .getClusterStatus(detailed) : null;
   }
 
   // for the testcase

Modified: hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Mon Mar 23 03:46:03 2015
@@ -212,6 +212,11 @@
           <artifactId>commons-io</artifactId>
           <version>${commons-io.version}</version>
         </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-client</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
     </dependencies>
     </profile>
   </profiles>

Modified: hama/trunk/yarn/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/pom.xml?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/yarn/pom.xml (original)
+++ hama/trunk/yarn/pom.xml Mon Mar 23 03:46:03 2015
@@ -70,7 +70,6 @@
       <artifactId>hadoop-yarn-client</artifactId>
       <version>${hadoop.version}</version>
     </dependency>
-
     <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
@@ -80,11 +79,6 @@
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-client</artifactId>
-      <version>${hadoop.version}</version>
-    </dependency>
   </dependencies>
 
   <build>

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java Mon Mar 23 03:46:03 2015
@@ -59,7 +59,7 @@ import org.apache.hadoop.yarn.util.Recor
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.Job.JobState;
-import org.apache.hama.bsp.sync.SyncServerRunner;
+import org.apache.hama.bsp.sync.SyncServer;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.hama.ipc.RPC;
@@ -101,7 +101,8 @@ public class BSPApplicationMaster implem
   private Server taskServer;
 
   private volatile long superstep;
-  private SyncServerRunner syncServer;
+  //private SyncServerRunner syncServer;
+  private SyncServer syncServer;
 
   private Counters globalCounter = new Counters();
 
@@ -114,8 +115,11 @@ public class BSPApplicationMaster implem
     }
 
     this.jobFile = args[0];
-    this.localConf = new YarnConfiguration();
+
     this.jobConf = getSubmitConfiguration(jobFile);
+
+    this.localConf = new YarnConfiguration();
+    localConf.addResource(localConf);
     fs = FileSystem.get(jobConf);
 
     this.applicationName = jobConf.get("bsp.job.name",
@@ -192,9 +196,28 @@ public class BSPApplicationMaster implem
    * @throws IOException
    */
   private void startSyncServer() throws Exception {
-    syncServer = SyncServiceFactory.getSyncServerRunner(jobConf);
-    jobConf = syncServer.init(jobConf);
-    threadPool.submit(syncServer);
+    syncServer = SyncServiceFactory.getSyncServer(jobConf);
+    syncServer.init(jobConf);
+
+    ZKServerThread serverThread = new ZKServerThread(syncServer);
+    threadPool.submit(serverThread);
+  }
+
+  private static class ZKServerThread implements Runnable {
+    SyncServer server;
+
+    ZKServerThread(SyncServer s) {
+      server = s;
+    }
+
+    @Override
+    public void run() {
+      try {
+        server.start();
+      } catch (Exception e) {
+        LOG.error("Error running SyncServer.", e);
+      }
+    }
   }
 
   /**
@@ -282,11 +305,14 @@ public class BSPApplicationMaster implem
         .getApplicationAttemptId();
   }
 
-  private void start() throws Exception {
+  private void start() throws IOException, YarnException /*throws Exception*/ {
     JobState finalState = null;
     try {
       job = new JobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile, jobId);
       finalState = job.startJob();
+    } catch (Exception e) {
+      LOG.error("error was occured. cleaning up");
+      e.printStackTrace();
     } finally {
       if (finalState != null) {
         LOG.info("Job \"" + applicationName + "\"'s state after completion: "
@@ -294,12 +320,14 @@ public class BSPApplicationMaster implem
         LOG.info("Job took " + ((clock.getTime() - startTime) / 1000L)
             + "s to finish!");
       }
+      LOG.info("job is cleaning up");
       job.cleanup();
     }
   }
 
   private void cleanup() throws YarnException, IOException {
-    syncServer.stop();
+    //syncServer.stop();
+    syncServer.stopServer();
 
     if (threadPool != null && !threadPool.isShutdown()) {
       threadPool.shutdownNow();

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Mon Mar 23 03:46:03 2015
@@ -26,7 +26,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.*;
@@ -101,11 +103,15 @@ public class BSPTaskLauncher {
     ContainerStatus lastStatus = null;
     GetContainerStatusesResponse getContainerStatusesResponse = cm.getContainerStatuses(statusRequest);
     List<ContainerStatus> containerStatuses = getContainerStatusesResponse.getContainerStatuses();
+    if (containerStatuses.size() <= 0) {
+      LOG.info("container Statuses size is zero");
+      return null;
+    }
+
     for (ContainerStatus containerStatus : containerStatuses) {
-      LOG.info("Got container status for containerID="
-          + containerStatus.getContainerId() + ", state="
-          + containerStatus.getState() + ", exitStatus="
-          + containerStatus.getExitStatus() + ", diagnostics="
+      LOG.info("Got container status for containerID=" + containerStatus
+          .getContainerId() + ", state=" + containerStatus.getState()
+          + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics="
           + containerStatus.getDiagnostics());
 
       if (containerStatus.getContainerId().equals(allocatedContainer.getId())) {
@@ -113,12 +119,14 @@ public class BSPTaskLauncher {
         break;
       }
     }
+
     if (lastStatus.getState() != ContainerState.COMPLETE) {
+      LOG.info("Not completed...");
       return null;
     }
-    LOG.info(this.id + " Last report comes with exitstatus of "
-        + lastStatus.getExitStatus() + " and diagnose string of "
-        + lastStatus.getDiagnostics());
+    LOG.info(this.id + " Last report comes with exitstatus of " + lastStatus
+        .getExitStatus() + " and diagnose string of " + lastStatus
+        .getDiagnostics());
 
     return new BSPTaskStatus(id, lastStatus.getExitStatus());
   }
@@ -154,19 +162,22 @@ public class BSPTaskLauncher {
 
     localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource);
 
-    Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_RELEASE_LOCATION));
+    Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_LOCATION));
     URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile
         .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
     LOG.info("Hama release URL has been composed to " + hamaReleaseUrl.toString());
 
-    LocalResource hamaReleaseRsrc = Records.newRecord(LocalResource.class);
-    hamaReleaseRsrc.setResource(hamaReleaseUrl);
-    hamaReleaseRsrc.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_RELEASE_SIZE)));
-    hamaReleaseRsrc.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_RELEASE_TIMESTAMP)));
-    hamaReleaseRsrc.setType(LocalResourceType.ARCHIVE);
-    hamaReleaseRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+    RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(
+        hamaReleaseFile, true);
 
-    localResources.put(YARNBSPConstants.HAMA_SYMLINK, hamaReleaseRsrc);
+    while(fileStatusListIterator.hasNext()) {
+      LocatedFileStatus lfs = fileStatusListIterator.next();
+      LocalResource localRsrc = LocalResource.newInstance(
+          ConverterUtils.getYarnUrlFromPath(lfs.getPath()),
+          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+          lfs.getLen(), lfs.getModificationTime());
+      localResources.put(lfs.getPath().getName(), localRsrc);
+    }
 
     ctx.setLocalResources(localResources);
 
@@ -187,13 +198,6 @@ public class BSPTaskLauncher {
       classPathEnv.append(c.trim());
     }
 
-    classPathEnv.append(File.pathSeparator);
-    classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK +
-        "/" + YARNBSPConstants.HAMA_RELEASE_VERSION +  "/*");
-    classPathEnv.append(File.pathSeparator);
-    classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK +
-        "/" + YARNBSPConstants.HAMA_RELEASE_VERSION + "/lib/*");
-
     Vector<CharSequence> vargs = new Vector<CharSequence>();
     vargs.add("${JAVA_HOME}/bin/java");
     vargs.add("-cp " + classPathEnv + "");

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java Mon Mar 23 03:46:03 2015
@@ -196,34 +196,27 @@ public class JobImpl implements Job {
     state = JobState.RUNNING;
     int completed = 0;
 
-    List<Integer> cleanupTasks = new ArrayList<Integer>();
     while (completed != numBSPTasks) {
       for (BSPTaskLauncher task : completionQueue) {
         BSPTaskStatus returnedTask = task.poll();
-        // if our task returned with a finished state
-        if (returnedTask != null) {
-          if (returnedTask.getExitStatus() != 0) {
-            LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!");
-            cleanupTask(returnedTask.getId());
-            state = JobState.FAILED;
-            return state;
-          } else {
-            LOG.info("Task \"" + returnedTask.getId()
-                + "\" sucessfully finished!");
-            completed++;
-            LOG.info("Waiting for " + (numBSPTasks - completed)
-                + " tasks to finish!");
-          }
-          cleanupTasks.add(returnedTask.getId());
+        if(returnedTask != null && returnedTask.getExitStatus() == 0) {
+          LOG.info("Task \"" + returnedTask.getId()
+              + "\" sucessfully finished!");
+          completed++;
+          LOG.info("Waiting for " + (numBSPTasks - completed)
+              + " tasks to finish!");
+        }
+
+        if(returnedTask != null && returnedTask.getExitStatus() != 0) {
+          LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!");
+          completionQueue.add(task);
+          state = JobState.FAILED;
+          return state;
         }
       }
       Thread.sleep(1000L);
     }
 
-    for (Integer stopId : cleanupTasks) {
-      cleanupTask(stopId);
-    }
-
     state = JobState.SUCCESS;
     return state;
   }
@@ -308,6 +301,7 @@ public class JobImpl implements Job {
   @Override
   public void cleanup() throws YarnException, IOException {
     for (BSPTaskLauncher launcher : completionQueue) {
+      LOG.info("cleanup tasks !!!");
       launcher.stopAndCleanup();
     }
   }

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPConstants.java Mon Mar 23 03:46:03 2015
@@ -42,7 +42,7 @@ public class YARNBSPConstants {
   /**
    * Environment key name pointing to the hama release's location
    */
-  public static final String HAMA_RELEASE_LOCATION = "HAMARELEASELOCATION";
+  public static final String HAMA_LOCATION = "HAMALOCATION";
 
   /**
    * Environment key name denoting the file content length for the hama release.
@@ -61,23 +61,4 @@ public class YARNBSPConstants {
    */
   public static final String APP_MASTER_JAR_PATH = "AppMaster.jar";
 
-  /**
-   * Symbolic link name for hama release archive in container local resource
-   */
-  public static final String HAMA_SYMLINK = "hama";
-
-  /**
-   * Hama release file name
-   */
-  public static final String HAMA_RELEASE_FILE = "hama-0.6.4.tar.gz";
-
-  /**
-   * Hama release version
-   */
-  public static final String HAMA_RELEASE_VERSION = "hama-0.6.4";
-
-  /**
-   * Hama release file source location
-   */
-  public static final String HAMA_SRC_PATH = "/home/hadoop";
 }

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java Mon Mar 23 03:46:03 2015
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 
 public class YARNBSPJob extends BSPJob {
@@ -71,6 +72,10 @@ public class YARNBSPJob extends BSPJob {
 
   @Override
   public void submit() throws IOException, InterruptedException {
+    // If Constants.MAX_TASKS_PER_JOB is null, calculates the max tasks based on resource status.
+    this.getConfiguration().setInt(Constants.MAX_TASKS_PER_JOB, getMaxTasks());
+    LOG.debug("MaxTasks: " + this.getConfiguration().get(Constants.MAX_TASKS_PER_JOB));
+    
     RunningJob submitJobInternal = submitClient.submitJobInternal(this,
         new BSPJobID("hama_yarn", id++));
 
@@ -80,6 +85,14 @@ public class YARNBSPJob extends BSPJob {
     }
   }
 
+  private int getMaxTasks() {
+    int maxMem = this.getConfiguration().getInt(
+        "yarn.nodemanager.resource.memory-mb", 0);
+    int minAllocationMem = this.getConfiguration().getInt(
+        "yarn.scheduler.minimum-allocation-mb", 1024);
+    return maxMem / minAllocationMem;
+  }
+
   @Override
   public boolean waitForCompletion(boolean verbose) throws IOException,
       InterruptedException, ClassNotFoundException {

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java Mon Mar 23 03:46:03 2015
@@ -17,23 +17,18 @@
  */
 package org.apache.hama.bsp;
 
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.commons.beanutils.Converter;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -44,10 +39,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 
 public class YARNBSPJobClient extends BSPJobClient {
@@ -233,23 +226,13 @@ public class YARNBSPJobClient extends BS
       // this creates a symlink in the working directory
       localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, amJarRsrc);
 
-      // Copy from hama-${version}.tar.gz to HDFS
-      Path hamaDstPath = new Path(getSystemDir(), YARNBSPConstants.HAMA_RELEASE_FILE);
-      hamaDstPath = fs.makeQualified(hamaDstPath);
-      fs.copyFromLocalFile(false, true,
-          new Path(YARNBSPConstants.HAMA_SRC_PATH, YARNBSPConstants.HAMA_RELEASE_FILE),
-          hamaDstPath);
-      FileStatus hamaStatus = fs.getFileStatus(hamaDstPath);
-      URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaDstPath
-          .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
-      LocalResource hamaReleaseRsrc = Records.newRecord(LocalResource.class);
-      hamaReleaseRsrc.setResource(hamaReleaseUrl);
-      hamaReleaseRsrc.setSize(hamaStatus.getLen());
-      hamaReleaseRsrc.setTimestamp(hamaStatus.getModificationTime());
-      hamaReleaseRsrc.setType(LocalResourceType.ARCHIVE);
-      hamaReleaseRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-
-      localResources.put(YARNBSPConstants.HAMA_SYMLINK, hamaReleaseRsrc);
+      // add hama related jar files to localresources for container
+      List<File> hamaJars = localJarfromPath(System.getProperty("hama.home.dir"));
+      String hamaPath = getSystemDir() + "/hama";
+      for (File fileEntry : hamaJars) {
+        addToLocalResources(fs, fileEntry.getCanonicalPath(),
+            hamaPath, fileEntry.getName(), localResources);
+      }
 
       // Set the local resources into the launch context
       amContainer.setLocalResources(localResources);
@@ -270,16 +253,12 @@ public class YARNBSPJobClient extends BS
         classPathEnv.append(File.pathSeparatorChar);
         classPathEnv.append(c.trim());
       }
-      classPathEnv.append(File.pathSeparator);
-      classPathEnv.append("./" + YARNBSPConstants.HAMA_SYMLINK + "/hama-0.6.4/*");
 
       env.put(YARNBSPConstants.HAMA_YARN_LOCATION, jarPath.toUri().toString());
       env.put(YARNBSPConstants.HAMA_YARN_SIZE, Long.toString(jarStatus.getLen()));
       env.put(YARNBSPConstants.HAMA_YARN_TIMESTAMP, Long.toString(jarStatus.getModificationTime()));
 
-      env.put(YARNBSPConstants.HAMA_RELEASE_LOCATION, hamaDstPath.toUri().toString());
-      env.put(YARNBSPConstants.HAMA_RELEASE_SIZE, Long.toString(hamaStatus.getLen()));
-      env.put(YARNBSPConstants.HAMA_RELEASE_TIMESTAMP, Long.toString(hamaStatus.getModificationTime()));
+      env.put(YARNBSPConstants.HAMA_LOCATION, hamaPath);
       env.put("CLASSPATH", classPathEnv.toString());
       amContainer.setEnvironment(env);
 
@@ -436,4 +415,28 @@ public class YARNBSPJobClient extends BS
     // throws an exception in case of failures
     yarnClient.killApplication(appId);
   }
+
+  private List<File> localJarfromPath(String path) throws IOException {
+    File hamaHome = new File(path);
+    String[] extensions = new String[]{"jar"};
+    List<File> files = (List<File>)FileUtils.listFiles(hamaHome, extensions, true);
+
+    return files;
+  }
+
+
+  private void addToLocalResources(FileSystem fs, String fileSrcPath,
+      String fileDstPath, String fileName, Map<String, LocalResource> localResources)
+      throws IOException {
+    Path dstPath = new Path(fileDstPath, fileName);
+    dstPath = fs.makeQualified(dstPath);
+    fs.copyFromLocalFile(false, true, new Path(fileSrcPath), dstPath);
+    FileStatus fileStatus = fs.getFileStatus(dstPath);
+    LocalResource localRsrc =
+        LocalResource.newInstance(
+            ConverterUtils.getYarnUrlFromURI(dstPath.toUri()),
+            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+            fileStatus.getLen(), fileStatus.getModificationTime());
+    localResources.put(fileName, localRsrc);
+  }
 }

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java?rev=1668514&r1=1668513&r2=1668514&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java Mon Mar 23 03:46:03 2015
@@ -57,12 +57,9 @@ public class YarnSerializePrinting {
   public static void main(String[] args) throws IOException,
       InterruptedException, ClassNotFoundException {
     HamaConfiguration conf = new HamaConfiguration();
-    // TODO some keys that should be within a conf
-    conf.set("bsp.user.name", "hama");
-    conf.setInt(Constants.MAX_TASKS, 10);
 
     YARNBSPJob job = new YARNBSPJob(conf);
-    job.setBoolean("hama.yarn.application", true);
+    System.out.println(conf.get("bsp.user.name"));
     job.setBspClass(HelloBSP.class);
     job.setJarByClass(HelloBSP.class);
     job.setJobName("Serialize Printing");



Mime
View raw message