incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1183352 [1/2] - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/checkpoint/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/checkpoint/ yarn/ yarn/src/ yarn...
Date Fri, 14 Oct 2011 13:29:12 GMT
Author: tjungblut
Date: Fri Oct 14 13:29:10 2011
New Revision: 1183352

URL: http://svn.apache.org/viewvc?rev=1183352&view=rev
Log:
[HAMA-431] integration of the branch for YARN.


Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
      - copied, changed from r1182784, incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerInterface.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
      - copied, changed from r1182784, incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/yarn/
    incubator/hama/trunk/yarn/pom.xml   (with props)
    incubator/hama/trunk/yarn/src/
    incubator/hama/trunk/yarn/src/main/
    incubator/hama/trunk/yarn/src/main/java/
    incubator/hama/trunk/yarn/src/main/java/org/
    incubator/hama/trunk/yarn/src/main/java/org/apache/
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java   (with props)
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java   (with props)
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java   (with props)
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java   (with props)
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java   (with props)
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java   (with props)
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java   (with props)
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPPeerImpl.java   (with props)
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java   (with props)
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/StringArrayWritable.java   (with props)
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServer.java   (with props)
    incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/sync/SyncServerImpl.java   (with props)
    incubator/hama/trunk/yarn/src/main/resources/
    incubator/hama/trunk/yarn/src/main/resources/log4j.properties   (with props)
Removed:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerInterface.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java
    incubator/hama/trunk/pom.xml

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1183352&r1=1183351&r2=1183352&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Fri Oct 14 13:29:10 2011
@@ -3,7 +3,7 @@ Hama Change Log
 Release 0.4 - Unreleased
 
   NEW FEATURES
-
+   HAMA-431: MapReduce NG integration (tjungblut)
    HAMA-449: Add tasks num of Job to web UI (edwardyoon)
    HAMA-428: Create a separate maven module and add basic structure for the Graph (edwardyoon)
    HAMA-398: Add CheckPointer and saving messages for future fault-tolerant systems (ChiaHung Lin via edwardyoon)

Copied: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (from r1182784, incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerInterface.java)
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?p2=incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java&p1=incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerInterface.java&r1=1182784&r2=1183352&rev=1183352&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerInterface.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Fri Oct 14 13:29:10 2011
@@ -20,6 +20,7 @@ package org.apache.hama.bsp;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hama.Constants;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.zookeeper.KeeperException;
@@ -27,8 +28,7 @@ import org.apache.zookeeper.KeeperExcept
 /**
  * BSP communication interface.
  */
-public interface BSPPeerInterface extends HamaRPCProtocolVersion, Closeable,
-    Constants {
+public interface BSPPeer extends HamaRPCProtocolVersion, Closeable, Constants {
 
   /**
    * Send a data with a tag to another BSPSlave corresponding to hostname.
@@ -99,4 +99,9 @@ public interface BSPPeerInterface extend
    * Clears all queues entries.
    */
   public void clear();
+
+  /**
+   * @return The jobs configuration
+   */
+  public Configuration getConfiguration();
 }

Copied: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (from r1182784, incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java)
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?p2=incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java&p1=incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java&r1=1182784&r2=1183352&rev=1183352&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Fri Oct 14 13:29:10 2011
@@ -59,9 +59,9 @@ import org.apache.zookeeper.data.Stat;
 /**
  * This class represents a BSP peer.
  */
-public class BSPPeer implements Watcher, BSPPeerInterface {
+public class BSPPeerImpl implements Watcher, BSPPeer {
 
-  public static final Log LOG = LogFactory.getLog(BSPPeer.class);
+  public static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   private final Configuration conf;
   private BSPJob jobConf;
@@ -73,7 +73,7 @@ public class BSPPeer implements Watcher,
   private final String bspRoot;
   private final String quorumServers;
 
-  private final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
+  private final Map<InetSocketAddress, BSPPeer> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeer>();
   private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
   private ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
   private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
@@ -192,7 +192,7 @@ public class BSPPeer implements Watcher,
   /**
    * Protected default constructor for LocalBSPRunner.
    */
-  protected BSPPeer() {
+  protected BSPPeerImpl() {
     bspRoot = null;
     quorumServers = null;
     messageSerializer = null;
@@ -208,7 +208,7 @@ public class BSPPeer implements Watcher,
    * @param umbilical is the bsp protocol used to contact its parent process.
    * @param taskid is the id that current process holds.
    */
-  public BSPPeer(Configuration conf, TaskAttemptID taskid,
+  public BSPPeerImpl(Configuration conf, TaskAttemptID taskid,
       BSPPeerProtocol umbilical) throws IOException {
     this.conf = conf;
     this.taskid = taskid;
@@ -312,7 +312,7 @@ public class BSPPeer implements Watcher,
       Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry = it
           .next();
 
-      BSPPeerInterface peer = peers.get(entry.getKey());
+      BSPPeer peer = peers.get(entry.getKey());
       if (peer == null) {
         try {
           peer = getBSPPeerConnection(entry.getKey());
@@ -587,19 +587,19 @@ public class BSPPeer implements Watcher,
 
   @Override
   public long getProtocolVersion(String arg0, long arg1) throws IOException {
-    return BSPPeerInterface.versionID;
+    return BSPPeer.versionID;
   }
 
-  protected BSPPeerInterface getBSPPeerConnection(InetSocketAddress addr)
+  protected BSPPeer getBSPPeerConnection(InetSocketAddress addr)
       throws NullPointerException, IOException {
-    BSPPeerInterface peer;
+    BSPPeer peer;
     synchronized (this.peers) {
       peer = peers.get(addr);
 
       int retries = 0;
       while (peer != null) {
-        peer = (BSPPeerInterface) RPC.getProxy(BSPPeerInterface.class,
-            BSPPeerInterface.versionID, addr, this.conf);
+        peer = (BSPPeer) RPC.getProxy(BSPPeer.class,
+            BSPPeer.versionID, addr, this.conf);
 
         retries++;
         if (retries > 10) {

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1183352&r1=1183351&r2=1183352&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Fri Oct 14 13:29:10 2011
@@ -51,7 +51,7 @@ public class BSPTask extends Task {
   }
 
   @Override
-  public void run(BSPJob job, BSPPeer bspPeer, BSPPeerProtocol umbilical)
+  public void run(BSPJob job, BSPPeerImpl bspPeer, BSPPeerProtocol umbilical)
       throws IOException {
 
     BSP bsp = (BSP) ReflectionUtils.newInstance(

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1183352&r1=1183351&r2=1183352&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Fri Oct 14 13:29:10 2011
@@ -944,7 +944,7 @@ public class GroomServer implements Runn
       }
       defaultConf.setInt(Constants.PEER_PORT, peerPort);
 
-      BSPPeer bspPeer = new BSPPeer(defaultConf, taskid, umbilical);
+      BSPPeerImpl bspPeer = new BSPPeerImpl(defaultConf, taskid, umbilical);
       bspPeer.reinitialize();
       bspPeer.setJobConf(job);
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1183352&r1=1183351&r2=1183352&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Fri Oct 14 13:29:10 2011
@@ -253,7 +253,7 @@ public class LocalBSPRunner implements J
 
   }
 
-  class LocalGroom extends BSPPeer {
+  class LocalGroom extends BSPPeerImpl {
     private long superStepCount = 0;
     private final ConcurrentLinkedQueue<BSPMessage> localMessageQueue = new ConcurrentLinkedQueue<BSPMessage>();
     // outgoing queue

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java?rev=1183352&r1=1183351&r2=1183352&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/Task.java Fri Oct 14 13:29:10 2011
@@ -125,7 +125,7 @@ public abstract class Task implements Wr
    * @param bspPeer for communications
    * @param umbilical for communications with GroomServer
    */
-  public abstract void run(BSPJob job, BSPPeer bspPeer, BSPPeerProtocol umbilical)
+  public abstract void run(BSPJob job, BSPPeerImpl bspPeer, BSPPeerProtocol umbilical)
       throws IOException;
 
   public abstract BSPTaskRunner createRunner(GroomServer groom);

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java?rev=1183352&r1=1183351&r2=1183352&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java Fri Oct 14 13:29:10 2011
@@ -42,7 +42,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.BSPPeer.BSPSerializableMessage;
+import org.apache.hama.bsp.BSPPeerImpl.BSPSerializableMessage;
 import org.apache.hama.GroomServerRunner;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java?rev=1183352&r1=1183351&r2=1183352&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java Fri Oct 14 13:29:10 2011
@@ -20,14 +20,14 @@ package org.apache.hama.bsp;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.BSPPeer.BSPSerializableMessage;
+import org.apache.hama.bsp.BSPPeerImpl.BSPSerializableMessage;
 
 public final class BSPSerializerWrapper {
 
-  private final BSPPeer.BSPMessageSerializer serializer;
+  private final BSPPeerImpl.BSPMessageSerializer serializer;
 
   public BSPSerializerWrapper(Configuration conf, int port) throws IOException {
-    this.serializer = new BSPPeer(conf, null, null).new BSPMessageSerializer(
+    this.serializer = new BSPPeerImpl(conf, null, null).new BSPMessageSerializer(
       conf.getInt("bsp.checkpoint.port", port)); 
   }  
 

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java?rev=1183352&r1=1183351&r2=1183352&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java Fri Oct 14 13:29:10 2011
@@ -33,7 +33,7 @@ import org.apache.hama.bsp.BSPMessage;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPSerializerWrapper;
 import org.apache.hama.bsp.DoubleMessage;
-import org.apache.hama.bsp.BSPPeer.BSPSerializableMessage;
+import org.apache.hama.bsp.BSPPeerImpl.BSPSerializableMessage;
 
 public class TestCheckpoint extends TestCase {
 

Modified: incubator/hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/pom.xml?rev=1183352&r1=1183351&r2=1183352&view=diff
==============================================================================
--- incubator/hama/trunk/pom.xml (original)
+++ incubator/hama/trunk/pom.xml Fri Oct 14 13:29:10 2011
@@ -187,6 +187,7 @@
     <module>core</module>
     <module>graph</module>
     <module>examples</module>
+    <module>yarn</module>
   </modules>
 
   <build>

Added: incubator/hama/trunk/yarn/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/pom.xml?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/pom.xml (added)
+++ incubator/hama/trunk/yarn/pom.xml Fri Oct 14 13:29:10 2011
@@ -0,0 +1,128 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <parent>
+    <groupId>org.apache.hama</groupId>
+    <artifactId>hama-parent</artifactId>
+    <version>0.4.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hama</groupId>
+  <artifactId>hama-yarn</artifactId>
+  <name>Apache Hama YARN</name>
+  <version>0.4.0-incubating-SNAPSHOT</version>
+  <packaging>jar</packaging>
+  <scm>
+    <connection>
+      scm:svn:http://svn.apache.org/repos/asf/incubator/hama/server
+    </connection>
+    <developerConnection>
+      scm:svn:https://svn.apache.org/repos/asf/incubator/hama/server
+    </developerConnection>
+  </scm>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hama</groupId>
+      <artifactId>hama-core</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>2.4.0a</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.5.3</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>0.23.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+      <version>0.23.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <version>0.23.0-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${basedir}/target/lib</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>true</overWriteSnapshots>
+              <excludeTransitive>true</excludeTransitive>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>1.4</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+    <finalName>hama-yarn-${project.version}</finalName>
+  </build>
+</project>

Propchange: incubator/hama/trunk/yarn/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.Job.JobState;
+import org.apache.hama.bsp.sync.SyncServer;
+import org.apache.hama.bsp.sync.SyncServerImpl;
+import org.apache.mina.util.AvailablePortFinder;
+
+/**
+ * BSPApplicationMaster is an application master for Apache Hamas BSP Engine.
+ */
+public class BSPApplicationMaster implements BSPClient {
+
+  private static final Log LOG = LogFactory.getLog(BSPApplicationMaster.class);
+  private static final ExecutorService threadPool = Executors
+      .newFixedThreadPool(1);
+
+  private Configuration localConf;
+  private Configuration jobConf;
+  private String jobFile;
+
+  private Clock clock;
+  private YarnRPC yarnRPC;
+  private AMRMProtocol amrmRPC;
+
+  private ApplicationAttemptId appAttemptId;
+  private String applicationName;
+  private long startTime;
+
+  private JobImpl job;
+  private BSPJobID jobId;
+
+  private SyncServerImpl syncServer;
+  private Future<Long> syncServerFuture;
+
+  // RPC info where the AM receive client side requests
+  private String hostname;
+  private int clientPort;
+
+  private Server clientServer;
+
+  private BSPApplicationMaster(String[] args) throws IOException {
+    if (args.length != 1) {
+      throw new IllegalArgumentException();
+    }
+
+    jobFile = args[0];
+    localConf = new YarnConfiguration();
+    jobConf = getSubmitConfiguration(jobFile);
+
+    applicationName = jobConf.get("bsp.job.name", "<no bsp job name defined>");
+    if (applicationName.isEmpty()) {
+      applicationName = "<no bsp job name defined>";
+    }
+
+    appAttemptId = getApplicationAttemptId();
+
+    yarnRPC = YarnRPC.create(localConf);
+    clock = new SystemClock();
+    startTime = clock.getTime();
+
+    jobId = new BSPJobID(appAttemptId.toString(), 0);
+
+    // TODO this is not localhost, is it?
+    hostname = InetAddress.getLocalHost().getCanonicalHostName();
+    startSyncServer();
+    clientPort = getFreePort();
+    // TODO should have a configurable amount of RPC handlers
+    this.clientServer = RPC.getServer(this, hostname, clientPort, 10, false,
+        jobConf);
+
+    /*
+     * Make sure that this executes after the start of the sync server, because
+     * we are readjusting the configuration.
+     */
+    rewriteSubmitConfiguration(jobFile, jobConf);
+
+    amrmRPC = getYarnRPCConnection(localConf);
+    registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort, null);
+  }
+
+  /**
+   * This method starts the sync server on a specific port and waits for it to
+   * come up. Be aware that this method adds the "bsp.sync.server.address" that
+   * is needed for a task to connect to the service.
+   * 
+   * @throws IOException
+   */
+  private void startSyncServer() throws IOException {
+    int syncPort = getFreePort(15000);
+    syncServer = new SyncServerImpl(jobConf.getInt("bsp.peers.num", 1),
+        hostname, syncPort);
+    syncServerFuture = threadPool.submit(syncServer);
+    // wait for the RPC to come up
+    InetSocketAddress syncAddress = NetUtils.createSocketAddr(hostname + ":"
+        + syncPort);
+    LOG.info("Waiting for the Sync Master at " + syncAddress);
+    RPC.waitForProxy(SyncServer.class, SyncServer.versionID, syncAddress,
+        jobConf);
+    jobConf.set("hama.sync.server.address", hostname + ":" + syncPort);
+  }
+
+  /**
+   * Connects to the Resource Manager.
+   * 
+   * @param yarnConf
+   * @return a new RPC connection to the Resource Manager.
+   */
+  private AMRMProtocol getYarnRPCConnection(Configuration yarnConf) {
+    // Connect to the Scheduler of the ResourceManager.
+    InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+        YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
+    LOG.info("Connecting to ResourceManager at " + rmAddress);
+    return (AMRMProtocol) yarnRPC.getProxy(AMRMProtocol.class, rmAddress,
+        yarnConf);
+  }
+
+  /**
+   * Registers this application master with the Resource Manager and retrieves a
+   * response which is used to launch additional containers.
+   * 
+   * @throws YarnRemoteException
+   */
+  private RegisterApplicationMasterResponse registerApplicationMaster(
+      AMRMProtocol resourceManager, ApplicationAttemptId appAttemptID,
+      String appMasterHostName, int appMasterRpcPort,
+      String appMasterTrackingUrl) throws YarnRemoteException {
+
+    RegisterApplicationMasterRequest appMasterRequest = Records
+        .newRecord(RegisterApplicationMasterRequest.class);
+    appMasterRequest.setApplicationAttemptId(appAttemptID);
+    appMasterRequest.setHost(appMasterHostName);
+    appMasterRequest.setRpcPort(appMasterRpcPort);
+    // TODO tracking URL
+    // appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
+    RegisterApplicationMasterResponse response = resourceManager
+        .registerApplicationMaster(appMasterRequest);
+    LOG.debug("ApplicationMaster has maximum resource capability of: "
+        + response.getMaximumResourceCapability().getMemory());
+    return response;
+  }
+
+  /**
+   * Gets the application attempt ID from the environment. This should be set by
+   * YARN when the container has been launched.
+   * 
+   * @return a new ApplicationAttemptId which is unique and identifies this
+   *         task.
+   */
+  private ApplicationAttemptId getApplicationAttemptId() throws IOException {
+    Map<String, String> envs = System.getenv();
+    if (!envs.containsKey(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)) {
+      throw new IllegalArgumentException(
+          "ApplicationAttemptId not set in the environment");
+    }
+    return ConverterUtils.toApplicationAttemptId(envs
+        .get(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV));
+  }
+
+  private void start() throws Exception {
+    JobState finalState = null;
+    try {
+      job = new JobImpl(appAttemptId, jobConf, yarnRPC, amrmRPC, jobFile, jobId);
+      finalState = job.startJob();
+    } finally {
+      if (this.syncServer != null) {
+        this.syncServer.stopServer();
+      }
+      if (finalState != null) {
+        LOG.info("Job \"" + applicationName + "\"'s state after completion: "
+            + finalState.toString());
+        LOG.info("Made " + (syncServerFuture.get() - 1L) + " supersteps!");
+        LOG.info("Job took " + ((clock.getTime() - startTime) / 1000L)
+            + "s to finish!");
+      }
+      job.cleanup();
+    }
+  }
+
+  private void cleanup() throws YarnRemoteException {
+    if (this.syncServer != null) {
+      this.syncServer.stopServer();
+    }
+    FinishApplicationMasterRequest finishReq = Records
+        .newRecord(FinishApplicationMasterRequest.class);
+    finishReq.setAppAttemptId(appAttemptId);
+    switch (job.getState()) {
+      case SUCCESS:
+        finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+        break;
+      case KILLED:
+        finishReq.setFinishApplicationStatus(FinalApplicationStatus.KILLED);
+        break;
+      case FAILED:
+        finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+        break;
+      default:
+        finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+    }
+    this.amrmRPC.finishApplicationMaster(finishReq);
+  }
+
+  public static void main(String[] args) throws YarnRemoteException {
+    // TODO we expect getting the qualified path of the job.xml as the first
+    // element in the arguments
+    BSPApplicationMaster master = null;
+    try {
+      master = new BSPApplicationMaster(args);
+      master.start();
+    } catch (Exception e) {
+      LOG.fatal("Error starting BSPApplicationMaster", e);
+    } finally {
+      if (master != null) {
+        master.cleanup();
+      }
+    }
+  }
+
+  /*
+   * Some utility methods
+   */
+
+  /**
+   * Reads the configuration from the given path.
+   */
+  private Configuration getSubmitConfiguration(String path) {
+    Path jobSubmitPath = new Path(path);
+    Configuration jobConf = new HamaConfiguration();
+    jobConf.addResource(jobSubmitPath);
+    return jobConf;
+  }
+
+  /**
+   * Writes the current configuration to a given path to reflect changes. For
+   * example the sync server address is put after the file has been written.
+   * TODO this should upload to HDFS to a given path as well.
+   * 
+   * @throws IOException
+   */
+  private void rewriteSubmitConfiguration(String path, Configuration conf)
+      throws IOException {
+    Path jobSubmitPath = new Path(path);
+    FileSystem fs = FileSystem.get(conf);
+    FSDataOutputStream out = fs.create(jobSubmitPath);
+    conf.writeXml(out);
+    out.close();
+    LOG.info("Written new configuration back to " + path);
+  }
+
+  /**
+   * Uses Minas AvailablePortFinder to find a port, starting at 14000.
+   * 
+   * @return a free port.
+   */
+  private int getFreePort() {
+    int startPort = 14000;
+    return getFreePort(startPort);
+  }
+
+  /**
+   * Uses Minas AvailablePortFinder to find a port, starting at startPort.
+   * 
+   * @return a free port.
+   */
+  private int getFreePort(int startPort) {
+    while (!AvailablePortFinder.available(startPort)) {
+      startPort++;
+      LOG.debug("Testing port for availability: " + startPort);
+    }
+    return startPort;
+  }
+
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return BSPClient.VERSION;
+  }
+
+  @Override
+  public LongWritable getCurrentSuperStep() {
+    return syncServer.getSuperStep();
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    // TODO Auto-generated method stub
+    return new ProtocolSignature();
+  }
+
+}

Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+public interface BSPClient extends VersionedProtocol {
+
+  public static final int VERSION = 0;
+  
+  public LongWritable getCurrentSuperStep(); 
+  
+}

Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.HamaConfiguration;
+
+public class BSPRunner {
+
+  private static final Log LOG = LogFactory.getLog(BSPRunner.class);
+
+  private Configuration conf;
+  private TaskAttemptID id;
+  private YARNBSPPeerImpl peer;
+
+  Class<? extends BSP> bspClass;
+
+  @SuppressWarnings("unchecked")
+  public BSPRunner(String jobId, int taskAttemptId, Path confPath)
+      throws IOException, ClassNotFoundException {
+    conf = new HamaConfiguration();
+    conf.addResource(confPath);
+    this.id = new TaskAttemptID(jobId, 0, taskAttemptId, 0);
+    this.id.id = taskAttemptId;
+    peer = new YARNBSPPeerImpl(conf, id);
+    // this is a checked cast because we can only set a class via the BSPJob
+    // class which only allows derivates of BSP.
+    bspClass = (Class<? extends BSP>) conf.getClassByName(conf
+        .get("bsp.work.class"));
+  }
+
+  public void startComputation() throws Exception {
+    BSP bspInstance = ReflectionUtils.newInstance(bspClass, conf);
+    LOG.info("Syncing for the first time to wait for all the tasks to come up...");
+    peer.getSyncService().enterBarrier(id);
+    peer.getSyncService().leaveBarrier(id);
+    LOG.info("Initial sync was successful, now running the computation!");
+    try {
+      bspInstance.setup(peer);
+      bspInstance.bsp(peer);
+    } catch (Exception e) {
+      throw e;
+    } finally {
+      bspInstance.cleanup(peer);
+    }
+  }
+
+  /**
+   * Main entry point after a container has launched.
+   * 
+   * @param args
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    LOG.info("Starting task with arguments: " + Arrays.toString(args));
+    if (args.length != 3) {
+      throw new IllegalArgumentException("Expected 3 args given, but found: "
+          + Arrays.toString(args));
+    }
+    // jobid is the first of the args (string)
+    // taskid is the second arg (int)
+    // third arg is the qualified path of the job configuration
+    BSPRunner bspRunner = new BSPRunner(args[0], Integer.valueOf(args[1]),
+        new Path(args[2]));
+    bspRunner.startComputation();
+  }
+}

Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPRunner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
+
+public class BSPTaskLauncher implements Callable<BSPTaskStatus> {
+
+  private static final Log LOG = LogFactory.getLog(BSPTaskLauncher.class);
+
+  private final Container allocatedContainer;
+  private final int id;
+  private final ContainerManager cm;
+  private final Configuration conf;
+  private final String user;
+  private final Path jobFile;
+  private final BSPJobID jobId;
+
+  public BSPTaskLauncher(int id, Container container, ContainerManager cm,
+      Configuration conf, Path jobFile, BSPJobID jobId)
+      throws YarnRemoteException {
+    this.id = id;
+    this.cm = cm;
+    this.conf = conf;
+    this.allocatedContainer = container;
+    this.jobFile = jobFile;
+    this.jobId = jobId;
+    this.user = conf.get("bsp.user.name");
+  }
+
+  @Override
+  protected void finalize() throws Throwable {
+    stopAndCleanup();
+  }
+
+  public void stopAndCleanup() throws YarnRemoteException {
+    StopContainerRequest stopRequest = Records
+        .newRecord(StopContainerRequest.class);
+    stopRequest.setContainerId(allocatedContainer.getId());
+    cm.stopContainer(stopRequest);
+  }
+
+  @Override
+  public BSPTaskStatus call() throws Exception {
+    LOG.info("Spawned task with id: " + this.id
+        + " for allocated container id: "
+        + this.allocatedContainer.getId().toString());
+    final GetContainerStatusRequest statusRequest = setupContainer(
+        allocatedContainer, cm, user, id);
+
+    ContainerStatus lastStatus;
+    while ((lastStatus = cm.getContainerStatus(statusRequest).getStatus())
+        .getState() != ContainerState.COMPLETE) {
+      Thread.sleep(1000l);
+    }
+
+    return new BSPTaskStatus(id, lastStatus.getExitStatus());
+  }
+
+  private GetContainerStatusRequest setupContainer(
+      Container allocatedContainer, ContainerManager cm, String user, int id)
+      throws IOException {
+    LOG.info("Setting up a container for user " + user + " with id of " + id
+        + " and containerID of " + allocatedContainer.getId());
+    // Now we setup a ContainerLaunchContext
+    ContainerLaunchContext ctx = Records
+        .newRecord(ContainerLaunchContext.class);
+
+    ctx.setContainerId(allocatedContainer.getId());
+    ctx.setResource(allocatedContainer.getResource());
+    ctx.setUser(user);
+
+    /*
+     * jar
+     */
+    LocalResource packageResource = Records.newRecord(LocalResource.class);
+    FileSystem fs = FileSystem.get(conf);
+    Path packageFile = new Path(conf.get("bsp.jar"));
+    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile);
+
+    FileStatus fileStatus = fs.getFileStatus(packageFile);
+    packageResource.setResource(packageUrl);
+    packageResource.setSize(fileStatus.getLen());
+    packageResource.setTimestamp(fileStatus.getModificationTime());
+    packageResource.setType(LocalResourceType.ARCHIVE);
+    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
+    LOG.info("Package resource: " + packageResource.getResource());
+
+    ctx.setLocalResources(Collections.singletonMap("package", packageResource));
+    
+    /*
+     * TODO Package classpath seems not to work if you're in pseudo distributed
+     * mode, because the resource must not be moved, it will never be unpacked.
+     * So we will check if our jar file has the file:// prefix and put it into
+     * the CP directly
+     */
+    String cp = "$CLASSPATH:./*:./package/*:./*:";
+    if (packageUrl.getScheme() != null && packageUrl.getScheme().equals("file")) {
+      cp += packageFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+          .toString() + ":";
+      LOG.info("Localized file scheme detected, adjusting CP to: " + cp);
+    }
+    String[] cmds = {
+        "${JAVA_HOME}" + "/bin/java -cp \"" + cp + "\" "
+            + BSPRunner.class.getCanonicalName(),
+        jobId.getJtIdentifier(),
+        id + "",
+        this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+            .toString(),
+        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" };
+    ctx.setCommands(Arrays.asList(cmds));
+    LOG.info("Starting command: " + Arrays.toString(cmds));
+
+    StartContainerRequest startReq = Records
+        .newRecord(StartContainerRequest.class);
+    startReq.setContainerLaunchContext(ctx);
+    cm.startContainer(startReq);
+
+    GetContainerStatusRequest statusReq = Records
+        .newRecord(GetContainerStatusRequest.class);
+    statusReq.setContainerId(allocatedContainer.getId());
+    return statusReq;
+  }
+
+  public static class BSPTaskStatus {
+    private final int id;
+    private final int exitStatus;
+
+    public BSPTaskStatus(int id, int exitStatus) {
+      super();
+      this.id = id;
+      this.exitStatus = exitStatus;
+    }
+
+    public int getId() {
+      return id;
+    }
+
+    public int getExitStatus() {
+      return exitStatus;
+    }
+  }
+
+}

Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+
+
+/**
+ * Main interface to interact with the job. Provides only getters.
+ */
+public interface Job {
+
+  public enum JobState {
+    NEW, RUNNING, SUCCESS, FAILED, KILLED
+  }
+
+  public enum BSPPhase {
+    COMPUTATION, COMMUNICATION
+  }
+
+  public JobState startJob() throws Exception;
+  
+  public void cleanup() throws YarnRemoteException;
+
+  JobState getState();
+
+  BSPPhase getBSPPhase();
+
+  int getTotalBSPTasks();
+
+}

Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/Job.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hama.bsp.BSPTaskLauncher.BSPTaskStatus;
+
+public class JobImpl implements Job {
+
+  private static final Log LOG = LogFactory.getLog(JobImpl.class);
+  private static final ExecutorService threadPool = Executors
+      .newCachedThreadPool();
+
+  private static final int DEFAULT_MEMORY_MB = 256;
+
+  private Configuration conf;
+  private BSPJobID jobId;
+  private int numBSPTasks;
+  private int priority = 0;
+  private String childOpts;
+  private int taskMemoryInMb;
+  private Path jobFile;
+
+  private JobState state;
+  private BSPPhase phase;
+
+  private ApplicationAttemptId appAttemptId;
+  private YarnRPC yarnRPC;
+  private AMRMProtocol resourceManager;
+
+  private List<Container> allocatedContainers;
+  private List<ContainerId> releasedContainers = Collections.emptyList();
+
+  private ExecutorCompletionService<BSPTaskStatus> completionService = new ExecutorCompletionService<BSPTaskStatus>(
+      threadPool);
+  private Map<Integer, BSPTaskLauncher> launchers = new HashMap<Integer, BSPTaskLauncher>();
+  private int lastResponseID = 0;
+  private Resource availableResources;
+
+  public JobImpl(ApplicationAttemptId appAttemptId,
+      Configuration jobConfiguration, YarnRPC yarnRPC, AMRMProtocol amrmRPC,
+      String jobFile, BSPJobID jobId) {
+    super();
+    this.numBSPTasks = jobConfiguration.getInt("bsp.peers.num", 1);
+    this.appAttemptId = appAttemptId;
+    this.yarnRPC = yarnRPC;
+    this.resourceManager = amrmRPC;
+    this.jobFile = new Path(jobFile);
+    this.state = JobState.NEW;
+    this.jobId = jobId;
+    this.conf = jobConfiguration;
+    this.childOpts = conf.get("bsp.child.java.opts");
+
+    this.taskMemoryInMb = getMemoryRequirements();
+    LOG.info("Memory per task: " + taskMemoryInMb + "m!");
+  }
+
+  private int getMemoryRequirements() {
+    String newMemoryProperty = conf.get("bsp.child.mem.in.mb");
+    if (newMemoryProperty == null) {
+      LOG.warn("\"bsp.child.mem.in.mb\" was not set! Try parsing the child opts...");
+      return getMemoryFromOptString(childOpts);
+    } else {
+      return Integer.valueOf(newMemoryProperty);
+    }
+  }
+
+  // TODO This really needs a testcase
+  private int getMemoryFromOptString(String opts) {
+    if (!opts.contains("-Xmx")) {
+      LOG.info("No \"-Xmx\" option found in child opts, using default amount of memory!");
+      return DEFAULT_MEMORY_MB;
+    } else {
+      // e.G: -Xmx512m
+      int startIndex = opts.indexOf("-Xmx") + 4;
+      int endIndex = opts.indexOf(" ", startIndex);
+      String xmxString = opts.substring(startIndex, endIndex);
+      char qualifier = xmxString.charAt(xmxString.length() - 1);
+      int memory = Integer.valueOf(xmxString.substring(0,
+          xmxString.length() - 2));
+      if (qualifier == 'm') {
+        return memory;
+      } else if (qualifier == 'g') {
+        return memory * 1024;
+      } else {
+        throw new IllegalArgumentException(
+            "Memory Limit in child opts was not set! \"bsp.child.java.opts\" String was: "
+                + opts);
+      }
+    }
+  }
+
+  @Override
+  public JobState startJob() throws YarnRemoteException, InterruptedException,
+      ExecutionException {
+
+    this.allocatedContainers = new ArrayList<Container>(numBSPTasks);
+    while (allocatedContainers.size() < numBSPTasks) {
+
+      AllocateRequest req = BuilderUtils.newAllocateRequest(
+          appAttemptId,
+          lastResponseID,
+          0.0f,
+          createBSPTaskRequest(numBSPTasks - allocatedContainers.size(),
+              taskMemoryInMb, priority), releasedContainers);
+
+      AllocateResponse allocateResponse = resourceManager.allocate(req);
+      AMResponse amResponse = allocateResponse.getAMResponse();
+      LOG.info("Got response! ID: " + amResponse.getResponseId()
+          + " with num of containers: "
+          + amResponse.getAllocatedContainers().size()
+          + " and following resources: "
+          + amResponse.getAvailableResources().getMemory() + "mb");
+      this.lastResponseID = amResponse.getResponseId();
+
+      this.availableResources = amResponse.getAvailableResources();
+      this.allocatedContainers.addAll(amResponse.getAllocatedContainers());
+      LOG.info("Waiting to allocate "
+          + (numBSPTasks - allocatedContainers.size()) + " more containers...");
+      Thread.sleep(1000l);
+    }
+
+    LOG.info("Got " + allocatedContainers.size() + " containers!");
+
+    int launchedBSPTasks = 0;
+
+    int id = 0;
+    for (Container allocatedContainer : allocatedContainers) {
+      LOG.info("Launching task on a new container." + ", containerId="
+          + allocatedContainer.getId() + ", containerNode="
+          + allocatedContainer.getNodeId().getHost() + ":"
+          + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
+          + allocatedContainer.getNodeHttpAddress() + ", containerState"
+          + allocatedContainer.getState() + ", containerResourceMemory"
+          + allocatedContainer.getResource().getMemory());
+
+      // Connect to ContainerManager on the allocated container
+      String cmIpPortStr = allocatedContainer.getNodeId().getHost() + ":"
+          + allocatedContainer.getNodeId().getPort();
+      InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
+      ContainerManager cm = (ContainerManager) yarnRPC.getProxy(
+          ContainerManager.class, cmAddress, conf);
+
+      BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id,
+          allocatedContainer, cm, conf, jobFile, jobId);
+      launchers.put(id, runnableLaunchContainer);
+      completionService.submit(runnableLaunchContainer);
+      id++;
+      launchedBSPTasks++;
+    }
+    state = JobState.RUNNING;
+
+    for (int i = 0; i < launchedBSPTasks; i++) {
+      BSPTaskStatus returnedTask = completionService.take().get();
+      if (returnedTask.getExitStatus() != 0) {
+        LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!");
+        state = JobState.FAILED;
+        return state;
+      } else {
+        LOG.info("Task \"" + id + "\" sucessfully finished!");
+      }
+      cleanupTask(returnedTask.getId());
+    }
+
+    state = JobState.SUCCESS;
+    return state;
+  }
+
+  /**
+   * Makes a lookup for the taskid and stops its container and task. It also
+   * removes the task from the launcher so that we won't have to stop it twice.
+   * 
+   * @param id
+   * @throws YarnRemoteException
+   */
+  private void cleanupTask(int id) throws YarnRemoteException {
+    BSPTaskLauncher bspTaskLauncher = launchers.get(id);
+    bspTaskLauncher.stopAndCleanup();
+    launchers.remove(id);
+  }
+
+  @Override
+  public void cleanup() throws YarnRemoteException {
+    for (BSPTaskLauncher launcher : launchers.values()) {
+      launcher.stopAndCleanup();
+    }
+    threadPool.shutdownNow();
+  }
+
+  private List<ResourceRequest> createBSPTaskRequest(int numTasks,
+      int memoryInMb, int priority) {
+
+    List<ResourceRequest> reqList = new ArrayList<ResourceRequest>(numTasks);
+    for (int i = 0; i < numTasks; i++) {
+      // Resource Request
+      ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
+
+      // setup requirements for hosts
+      // whether a particular rack/host is needed
+      // useful for applications that are sensitive
+      // to data locality
+      rsrcRequest.setHostName("*");
+
+      // set the priority for the request
+      Priority pri = Records.newRecord(Priority.class);
+      pri.setPriority(priority);
+      rsrcRequest.setPriority(pri);
+
+      // Set up resource type requirements
+      // For now, only memory is supported so we set memory requirements
+      Resource capability = Records.newRecord(Resource.class);
+      capability.setMemory(memoryInMb);
+      rsrcRequest.setCapability(capability);
+
+      // set no. of containers needed
+      // matching the specifications
+      rsrcRequest.setNumContainers(numBSPTasks);
+      reqList.add(rsrcRequest);
+    }
+    return reqList;
+  }
+
+  @Override
+  public JobState getState() {
+    return state;
+  }
+
+  @Override
+  public int getTotalBSPTasks() {
+    return numBSPTasks;
+  }
+
+  @Override
+  public BSPPhase getBSPPhase() {
+    return phase;
+  }
+
+}

Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/JobImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java?rev=1183352&view=auto
==============================================================================
--- incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java (added)
+++ incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java Fri Oct 14 13:29:10 2011
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hama.HamaConfiguration;
+
+public class YARNBSPJob extends BSPJob {
+
+  private static final Log LOG = LogFactory.getLog(YARNBSPJob.class);
+
+  private BSPClient client;
+  private YarnRPC rpc;
+  private ApplicationId id;
+  private FileSystem fs;
+
+  private boolean submitted;
+
+  private ApplicationReport report;
+
+  private ClientRMProtocol applicationsManager;
+
+  public YARNBSPJob(HamaConfiguration conf) throws IOException {
+    super(conf);
+    rpc = YarnRPC.create(conf);
+    fs = FileSystem.get(conf);
+  }
+
+  public void setMemoryUsedPerTaskInMb(int mem) {
+    conf.setInt("bsp.child.mem.in.mb", mem);
+  }
+
+  public void kill() throws YarnRemoteException {
+    KillApplicationRequest killRequest = Records
+        .newRecord(KillApplicationRequest.class);
+    killRequest.setApplicationId(id);
+    applicationsManager.forceKillApplication(killRequest);
+  }
+
+  @Override
+  public void submit() throws IOException, InterruptedException {
+    LOG.info("Submitting job...");
+    if (conf.get("bsp.child.mem.in.mb") == null) {
+      LOG.warn("BSP Child memory has not been set, YARN will guess your needs or use default values.");
+    }
+
+    if (rpc == null) {
+      rpc = YarnRPC.create(getConf());
+    }
+
+    if (fs == null) {
+      fs = FileSystem.get(getConf());
+    }
+
+    if (conf.get("bsp.user.name") == null) {
+      String s = getUnixUserName();
+      conf.set("bsp.user.name", s);
+      LOG.info("Retrieved username: " + s);
+    }
+
+    YarnConfiguration yarnConf = new YarnConfiguration(conf);
+    InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+        YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));
+    LOG.info("Connecting to ResourceManager at " + rmAddress);
+    Configuration appsManagerServerConf = new Configuration(conf);
+    // TODO what is that?
+    // appsManagerServerConf.setClass(YarnConfiguration.YARN_SECURITY_INFO,
+    // ClientRMSecurityInfo.class, SecurityInfo.class);
+
+    applicationsManager = ((ClientRMProtocol) rpc.getProxy(
+        ClientRMProtocol.class, rmAddress, appsManagerServerConf));
+
+    GetNewApplicationRequest request = Records
+        .newRecord(GetNewApplicationRequest.class);
+    GetNewApplicationResponse response = applicationsManager
+        .getNewApplication(request);
+    id = response.getApplicationId();
+    LOG.info("Got new ApplicationId=" + id);
+
+    // Create a new ApplicationSubmissionContext
+    ApplicationSubmissionContext appContext = Records
+        .newRecord(ApplicationSubmissionContext.class);
+    // set the ApplicationId
+    appContext.setApplicationId(this.id);
+    // set the application name
+    appContext.setApplicationName(this.getJobName());
+
+    // Create a new container launch context for the AM's container
+    ContainerLaunchContext amContainer = Records
+        .newRecord(ContainerLaunchContext.class);
+
+    // Define the local resources required
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+    // Lets assume the jar we need for our ApplicationMaster is available in
+    // HDFS at a certain known path to us and we want to make it available to
+    // the ApplicationMaster in the launched container
+    Path jarPath = new Path(getWorkingDirectory(), id + "/app.jar");
+    fs.copyFromLocalFile(this.getLocalPath(this.getJar()), jarPath);
+    LOG.info("Copying app jar to " + jarPath);
+    conf.set("bsp.jar",
+        jarPath.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
+    FileStatus jarStatus = fs.getFileStatus(jarPath);
+    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+    amJarRsrc.setType(LocalResourceType.FILE);
+    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
+    amJarRsrc.setTimestamp(jarStatus.getModificationTime());
+    amJarRsrc.setSize(jarStatus.getLen());
+    // this creates a symlink in the working directory
+    localResources.put("AppMaster.jar", amJarRsrc);
+    // Set the local resources into the launch context
+    amContainer.setLocalResources(localResources);
+
+    // Set up the environment needed for the launch context
+    Map<String, String> env = new HashMap<String, String>();
+    // Assuming our classes or jars are available as local resources in the
+    // working directory from which the command will be run, we need to append
+    // "." to the path.
+    // By default, all the hadoop specific classpaths will already be available
+    // in $CLASSPATH, so we should be careful not to overwrite it.
+    String classPathEnv = "$CLASSPATH:./*:";
+    env.put("CLASSPATH", classPathEnv);
+    amContainer.setEnvironment(env);
+
+    // saving the conf file at this point to hdfs
+    // it should be in HDFS.
+    Path xmlPath = new Path(getWorkingDirectory(), id + "/job.xml");
+    FSDataOutputStream out = fs.create(xmlPath);
+    this.writeXml(out);
+    out.flush();
+    out.close();
+
+    // Construct the command to be executed on the launched container
+    String command = "${JAVA_HOME}"
+        + "/bin/java -cp "
+        + classPathEnv
+        + " "
+        + BSPApplicationMaster.class.getCanonicalName()
+        + " "
+        + xmlPath.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+            .toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+        + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+        + "/stderr";
+
+    LOG.info("Start command: " + command);
+
+    amContainer.setCommands(Collections.singletonList(command));
+
+    Resource capability = Records.newRecord(Resource.class);
+    // we have at least 3 threads, which comsumes 1mb each, for each bsptask and
+    // a base usage of 100mb
+    capability.setMemory(3 * this.getNumBspTask() + 100);
+    LOG.info("Set memory for the application master to "
+        + capability.getMemory() + "mb!");
+    amContainer.setResource(capability);
+
+    // Set the container launch content into the ApplicationSubmissionContext
+    appContext.setAMContainerSpec(amContainer);
+
+    // Create the request to send to the ApplicationsManager
+    SubmitApplicationRequest appRequest = Records
+        .newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+    applicationsManager.submitApplication(appRequest);
+
+    GetApplicationReportRequest reportRequest = Records
+        .newRecord(GetApplicationReportRequest.class);
+    reportRequest.setApplicationId(id);
+    while (report == null || report.getHost().equals("N/A")) {
+      GetApplicationReportResponse reportResponse = applicationsManager
+          .getApplicationReport(reportRequest);
+      report = reportResponse.getApplicationReport();
+      Thread.sleep(1000L);
+    }
+    LOG.info("Got report: " + report.getApplicationId() + " "
+        + report.getHost());
+    submitted = true;
+  }
+
+  @Override
+  public boolean waitForCompletion(boolean verbose) throws IOException,
+      InterruptedException, ClassNotFoundException {
+
+    LOG.info("Starting job...");
+
+    if (!submitted) {
+      this.submit();
+    }
+
+    client = (BSPClient) RPC.waitForProxy(BSPClient.class, BSPClient.VERSION,
+        NetUtils.createSocketAddr(report.getHost(), report.getRpcPort()), conf);
+
+    GetApplicationReportRequest reportRequest = Records
+        .newRecord(GetApplicationReportRequest.class);
+    reportRequest.setApplicationId(id);
+
+    GetApplicationReportResponse reportResponse = applicationsManager
+        .getApplicationReport(reportRequest);
+    ApplicationReport localReport = reportResponse.getApplicationReport();
+    long clientSuperStep = 0L;
+    // TODO this may cause infinite loops, we can go with our rpc client
+    while (localReport.getFinalApplicationStatus() != null
+        && localReport.getFinalApplicationStatus() != FinalApplicationStatus.FAILED
+        && localReport.getFinalApplicationStatus() != FinalApplicationStatus.KILLED
+        && localReport.getFinalApplicationStatus() != FinalApplicationStatus.SUCCEEDED) {
+      LOG.info("currently in state: " + localReport.getFinalApplicationStatus());
+      if (verbose) {
+        long remoteSuperStep = client.getCurrentSuperStep().get();
+        if (clientSuperStep > remoteSuperStep) {
+          clientSuperStep = remoteSuperStep;
+          LOG.info("Current supersteps number: " + clientSuperStep);
+        }
+        reportResponse = applicationsManager
+            .getApplicationReport(reportRequest);
+        localReport = reportResponse.getApplicationReport();
+      }
+      Thread.sleep(3000L);
+    }
+
+    reportResponse = applicationsManager.getApplicationReport(reportRequest);
+    localReport = reportResponse.getApplicationReport();
+
+    if (localReport.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED) {
+      LOG.info("Job succeeded!");
+      return true;
+    } else {
+      LOG.info("Job failed with status: "
+          + localReport.getFinalApplicationStatus().toString() + "!");
+      return false;
+    }
+
+  }
+
+  /*
+   * THESE FOLLOWING METHODS WILL BE IMPLEMENTED IN BSPJOBCLIENT SOON.
+   */
+
+  static String getUnixUserName() throws IOException {
+    String[] result = executeShellCommand(new String[] { Shell.USER_NAME_COMMAND });
+    if (result.length != 1) {
+      throw new IOException("Expect one token as the result of "
+          + Shell.USER_NAME_COMMAND + ": " + toString(result));
+    }
+    return result[0];
+  }
+
+  private static String toString(String[] strArray) {
+    if (strArray == null || strArray.length == 0) {
+      return "";
+    }
+    StringBuilder buf = new StringBuilder(strArray[0]);
+    for (int i = 1; i < strArray.length; i++) {
+      buf.append(' ');
+      buf.append(strArray[i]);
+    }
+    return buf.toString();
+  }
+
+  private static String[] executeShellCommand(String[] command)
+      throws IOException {
+    String groups = Shell.execCommand(command);
+    StringTokenizer tokenizer = new StringTokenizer(groups);
+    int numOfTokens = tokenizer.countTokens();
+    String[] tokens = new String[numOfTokens];
+    for (int i = 0; tokenizer.hasMoreTokens(); i++) {
+      tokens[i] = tokenizer.nextToken();
+    }
+
+    return tokens;
+  }
+
+}

Propchange: incubator/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJob.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message