hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1599381 - in /hama/trunk: ./ bin/ conf/ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ mesos/ mesos/src/ mesos/src/main/ mesos/src/main/java/ mesos/src/main/java/org/ mesos/src/main/java/org/apache/ mesos/src/m...
Date Mon, 02 Jun 2014 22:30:36 GMT
Author: edwardyoon
Date: Mon Jun  2 22:30:35 2014
New Revision: 1599381

URL: http://svn.apache.org/r1599381
Log:
HAMA-726: Hama on Mesos

Added:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomStatusListener.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskWorkerManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskWorkerManager.java
    hama/trunk/mesos/
    hama/trunk/mesos/.gitignore
    hama/trunk/mesos/pom.xml
    hama/trunk/mesos/src/
    hama/trunk/mesos/src/main/
    hama/trunk/mesos/src/main/java/
    hama/trunk/mesos/src/main/java/org/
    hama/trunk/mesos/src/main/java/org/apache/
    hama/trunk/mesos/src/main/java/org/apache/hama/
    hama/trunk/mesos/src/main/java/org/apache/hama/bsp/
    hama/trunk/mesos/src/main/java/org/apache/hama/bsp/MesosExecutor.java
    hama/trunk/mesos/src/main/java/org/apache/hama/bsp/MesosScheduler.java
    hama/trunk/mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java
    hama/trunk/mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/bin/hama
    hama/trunk/conf/hama-default.xml
    hama/trunk/core/src/main/java/org/apache/hama/Constants.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/Schedulable.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    hama/trunk/pom.xml
    hama/trunk/src/assemble/bin.xml

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1599381&r1=1599380&r2=1599381&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Jun  2 22:30:35 2014
@@ -4,6 +4,7 @@ Release 0.7.0 (unreleased changes)
 
   NEW FEATURES
 
+   HAMA-726: Hama on Mesos (Jeff Fenchel via edwardyoon)
    HAMA-863: Implement SparseVector (Yexi Jiang)
 
   BUG FIXES

Modified: hama/trunk/bin/hama
URL: http://svn.apache.org/viewvc/hama/trunk/bin/hama?rev=1599381&r1=1599380&r2=1599381&view=diff
==============================================================================
--- hama/trunk/bin/hama (original)
+++ hama/trunk/bin/hama Mon Jun  2 22:30:35 2014
@@ -133,6 +133,14 @@ if [ -d "$HAMA_HOME/ml/target/test-class
   CLASSPATH=${CLASSPATH}:$HAMA_HOME/ml/target/test-classes
 fi
 
+# add mesos classes (and mesos test classes, when present) to CLASSPATH
+if [ -d "$HAMA_HOME/mesos/target/classes" ]; then
+  CLASSPATH=${CLASSPATH}:$HAMA_HOME/mesos/target/classes
+fi
+if [ -d "$HAMA_HOME/mesos/target/test-classes" ]; then
+  CLASSPATH=${CLASSPATH}:$HAMA_HOME/mesos/target/test-classes
+fi
+
 # so that filenames w/ spaces are handled correctly in loops below
 IFS=
 
@@ -202,14 +210,15 @@ if $cygwin; then
   JAVA_LIBRARY_PATH=`cygpath -p "$JAVA_LIBRARY_PATH"`
 fi
 
-HAMA_OPTS="$HAMA_OPTS -Dhama.log.dir=$HAMA_LOG_DIR"
-HAMA_OPTS="$HAMA_OPTS -Dhama.log.file=$HAMA_LOGFILE"
-HAMA_OPTS="$HAMA_OPTS -Dhama.home.dir=$HAMA_HOME"
-HAMA_OPTS="$HAMA_OPTS -Dhama.id.str=$HAMA_IDENT_STRING"
-HAMA_OPTS="$HAMA_OPTS -Dhama.root.logger=${HAMA_ROOT_LOGGER:-INFO,console}"
+HAMA_OPTS=""$HAMA_OPTS"-Dhama.log.dir=\"${HAMA_LOG_DIR}\""
+HAMA_OPTS=""$HAMA_OPTS" -Dhama.log.file=$HAMA_LOGFILE"
+HAMA_OPTS=""$HAMA_OPTS" -Dhama.home.dir=\"$HAMA_HOME\""
+HAMA_OPTS=""$HAMA_OPTS" -Dhama.id.str=$HAMA_IDENT_STRING"
+HAMA_OPTS=""$HAMA_OPTS" -Dhama.root.logger=${HAMA_ROOT_LOGGER:-INFO,console}"
 if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
-  HAMA_OPTS="$HAMA_OPTS -Djava.library.path=$JAVA_LIBRARY_PATH"
-fi  
-HAMA_OPTS="$HAMA_OPTS -Dhama.policy.file=$HAMA_POLICYFILE"
+  HAMA_OPTS=""$HAMA_OPTS" -Djava.library.path=$JAVA_LIBRARY_PATH"
+fi
+HAMA_OPTS=""$HAMA_OPTS" -Dhama.policy.file=$HAMA_POLICYFILE"
+
 # run it
-exec "$JAVA" $JAVA_HEAP_MAX $HAMA_OPTS -classpath "$CLASSPATH" $CLASS "$@"
+exec "$JAVA" "$JAVA_HEAP_MAX" "$HAMA_OPTS" -classpath "$CLASSPATH" $CLASS "$@"

Modified: hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1599381&r1=1599380&r2=1599381&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Mon Jun  2 22:30:35 2014
@@ -110,6 +110,39 @@
     </description>
   </property>
   <property>
+    <name>bsp.master.TaskWorkerManager.class</name>
+    <value>org.apache.hama.bsp.SimpleTaskWorkerManager</value>
+    <description>The class responsible for assigning tasks to groom servers</description>
+  </property>
+  <property>
+    <name>hama.mesos.executor.uri</name>
+    <value>hdfs://hdfs.name.node:port/hama.tar.gz</value>
+    <description>
+      Ignore this if you are not using the Mesos TaskWorkerManager.
+      This is the URI of the Hama distribution. Upload this yourself. 
+    </description>
+  </property>
+  <property>
+    <name>bsp.tasks.maximum.total</name>
+    <value></value>
+    <description>
+      This is an override for the total maximum tasks that may be run.
+      The default behavior is to determine a value based on the available groom
+      servers. However, if using mesos, the groom servers are not yet allocated.  
+      So, a value indicating the number of slots available in the cluster is needed. 
+    </description>
+  </property>
+  <property>
+    <name>hama.mesos.master</name>
+    <value>local</value>
+    <description>
+      Ignore this if you are not using the Mesos TaskWorkerManager.
+      This is the address of the Mesos master instance. If you're using
+      Zookeeper for master election, use the Zookeeper address here (i.e.,
+      zk://zk.apache.org:2181/hadoop/mesos).
+    </description>
+  </property>
+  <property>
     <name>bsp.local.tasks.maximum</name>
     <value>10</value>
     <description>Number of tasks that run in parallel when in local mode.</description>

Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1599381&r1=1599380&r2=1599381&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Mon Jun  2 22:30:35 2014
@@ -61,6 +61,8 @@ public interface Constants {
 
   public static final String MAX_TASKS_PER_GROOM = "bsp.tasks.maximum";
 
+  public static final String MAX_TASKS = "bsp.tasks.maximum.total";
+
   public static final String MAX_TASK_ATTEMPTS = "bsp.tasks.max.attempts";
 
   public static final String MAX_TASKS_PER_JOB = "bsp.max.tasks.per.job";
@@ -95,6 +97,11 @@ public interface Constants {
   public static final short DEFAULT_CHECKPOINT_INTERVAL = 1;
 
   // /////////////////////////////////////////////
+  // Executor related parameters.
+  // /////////////////////////////////////////////
+  public static final String TASK_EXECUTOR_CLASS = "bsp.master.TaskWorkerManager.class";
+  
+  // /////////////////////////////////////////////
   // Job configuration related parameters.
   // /////////////////////////////////////////////
   public static final String JOB_INPUT_DIR = "bsp.input.dir";

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1599381&r1=1599380&r2=1599381&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Mon Jun  2 22:30:35 2014
@@ -146,6 +146,8 @@ public class BSPMaster implements JobSub
 
   private final List<JobInProgressListener> jobInProgressListeners = new CopyOnWriteArrayList<JobInProgressListener>();
 
+  private final List<GroomStatusListener> groomStatusListeners = new CopyOnWriteArrayList<GroomStatusListener>();
+
   private final AtomicReference<Supervisor> supervisor = new AtomicReference<Supervisor>();
 
   /**
@@ -186,6 +188,9 @@ public class BSPMaster implements JobSub
               jip.completedTask(tip, ts);
               // increment counters only if successful
               jip.getCounters().incrAllCounters(ts.getCounters());
+              for (GroomStatusListener listener : groomStatusListeners) {
+                  listener.taskComplete(groomStatus, tip);
+              }
             } else if (ts.getRunState() == TaskStatus.State.RUNNING) {
               jip.getStatus().setProgress(ts.getSuperstepCount());
               jip.getStatus().setSuperstepCount(ts.getSuperstepCount());
@@ -400,11 +405,16 @@ public class BSPMaster implements JobSub
       LOG.error("Fail to register GroomServer " + status.getGroomName(), e);
       return false;
     }
+    
+    for (GroomStatusListener listener : groomStatusListeners) {
+        listener.groomServerRegistered(status);
+    }
+    
     LOG.info(status.getGroomName() + " is added.");
     return true;
   }
 
-  private static InetSocketAddress resolveWorkerAddress(String data) {
+  public static InetSocketAddress resolveWorkerAddress(String data) {
     return new InetSocketAddress(data.split(":")[0], Integer.parseInt(data
         .split(":")[1]));
   }
@@ -635,7 +645,7 @@ public class BSPMaster implements JobSub
     }
 
     int tasksPerGroom = conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3);
-    this.totalTaskCapacity = tasksPerGroom * numGroomServers;
+    this.totalTaskCapacity = conf.getInt(Constants.MAX_TASKS, tasksPerGroom * numGroomServers);
 
     if (detailed) {
       return new ClusterStatus(groomsMap, totalTasks, totalTaskCapacity,
@@ -672,6 +682,17 @@ public class BSPMaster implements JobSub
   }
 
   @Override
+  public void addGroomStatusListener(GroomStatusListener listener) {
+    groomStatusListeners.add(listener);
+	
+  }
+
+  @Override
+  public void removeGroomStatusListener(GroomStatusListener listener) {
+    groomStatusListeners.remove(listener);
+  }
+  
+  @Override
   public void moveToBlackList(String host) {
     LOG.info("[moveToBlackList()]Host to be moved to black list: " + host);
     for (GroomServerStatus groomStatus : groomServerStatusKeySet()) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1599381&r1=1599380&r2=1599381&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Mon Jun  2 22:30:35 2014
@@ -388,19 +388,6 @@ public class GroomServer implements Runn
     this.masterClient = (MasterProtocol) RPC.waitForProxy(MasterProtocol.class,
         HamaRPCProtocolVersion.versionID, bspMasterAddr, conf);
 
-    // enroll in bsp master
-    if (-1 == rpcPort || null == rpcAddr)
-      throw new IllegalArgumentException("Error rpc address " + rpcAddr
-          + " port" + rpcPort);
-    if (!this.masterClient.register(new GroomServerStatus(groomServerName,
-        cloneAndResetRunningTaskStatuses(), failures, maxCurrentTasks,
-        this.rpcServer, groomHostName))) {
-      LOG.error("There is a problem in establishing communication"
-          + " link with BSPMaster");
-      throw new IOException("There is a problem in establishing"
-          + " communication link with BSPMaster.");
-    }
-
     this.instructor = new Instructor();
     this.instructor.bind(DispatchTasksDirective.class,
         new DispatchTasksHandler());
@@ -427,6 +414,19 @@ public class GroomServer implements Runn
       this.sensor.get().start();
     }
 
+    // enroll in bsp master once the GroomServer is ready to accept tasks
+    if (-1 == rpcPort || null == rpcAddr)
+      throw new IllegalArgumentException("Error rpc address " + rpcAddr
+          + " port" + rpcPort);
+    if (!this.masterClient.register(new GroomServerStatus(groomServerName,
+        cloneAndResetRunningTaskStatuses(), failures, maxCurrentTasks,
+        this.rpcServer, groomHostName))) {
+      LOG.error("There is a problem in establishing communication"
+          + " link with BSPMaster");
+      throw new IOException("There is a problem in establishing"
+          + " communication link with BSPMaster.");
+    }
+    
     this.running = true;
     this.initialized = true;
   }
@@ -692,7 +692,19 @@ public class GroomServer implements Runn
 
         Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
             + task.getTaskID() + "/" + "job.jar");
-        dfs.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
+        
+        Path jobFilePath = new Path(task.getJobFile());
+        
+        // wait up to 300 x 100 ms for the job file to show up in the DFS
+        for (int i = 0; i < 300 && !dfs.exists(jobFilePath); i++ ) {
+        	try {
+    			Thread.sleep(100);
+    		} catch (InterruptedException e) {
+    			LOG.warn("Sleep failed", e);
+    		}
+        }
+        
+        dfs.copyToLocalFile(jobFilePath, localJobFile);
 
         HamaConfiguration conf = new HamaConfiguration();
         conf.addResource(localJobFile);

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java?rev=1599381&r1=1599380&r2=1599381&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java Mon Jun  2 22:30:35 2014
@@ -74,6 +74,21 @@ interface GroomServerManager {
   void removeJobInProgressListener(JobInProgressListener listener);
 
   /**
+   * Registers a GroomStatusListener to the GroomServerManager.
+   * 
+   * @param listener the GroomStatusListener to be added.
+   */
+  void addGroomStatusListener(GroomStatusListener listener);
+  
+  /**
+   * Unregisters a GroomStatusListener with the GroomServerManager
+   *
+   * @param listener the GroomStatusListener to be removed.
+   */
+  void removeGroomStatusListener(GroomStatusListener listener);
+  
+  
+  /**
    * Move a specific groom server to black list, marking that groom as dead.
    * 
    * @param host to be blocked.

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomStatusListener.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomStatusListener.java?rev=1599381&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomStatusListener.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomStatusListener.java Mon Jun  2 22:30:35 2014
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * A listener for updates to {@link GroomServerStatus} by groom servers. 
+ */
+public interface GroomStatusListener {
+
+  /**
+   * Invoked when a new groom server has been registered with the {@link BSPMaster}.
+   * 
+   * @param status The status of the new groom server
+   */
+  public abstract void groomServerRegistered(GroomServerStatus status);
+
+  
+  public abstract void taskComplete(GroomServerStatus status, TaskInProgress task);
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1599381&r1=1599380&r2=1599381&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Mon Jun  2 22:30:35 2014
@@ -328,6 +328,11 @@ public class JobInProgress {
     return this.taskToGroomMap.get(t);
   }
 
+  /**
+   * Gets the task for the first <code>TaskInProgress</code> that is not already running
+   * @param groomStatuses Map of statuses for available groom servers
+   * @return
+   */
   public synchronized Task obtainNewTask(
       Map<String, GroomServerStatus> groomStatuses) {
     this.clusterSize = groomStatuses.size();
@@ -342,28 +347,53 @@ public class JobInProgress {
 
     for (TaskInProgress task : tasks) {
       if (!task.isRunning() && !task.isComplete()) {
-
-        String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
-            groomStatuses, taskCountInGroomMap, resources, task);
-        GroomServerStatus groomStatus = taskAllocationStrategy
-            .getGroomToAllocate(groomStatuses, selectedGrooms,
-                taskCountInGroomMap, resources, task);
-        if (groomStatus != null) {
-          result = task.constructTask(groomStatus);
-        } else if (LOG.isDebugEnabled()) {
-          LOG.debug("Could not find a groom to schedule task");
-        }
-        if (result != null) {
-          updateGroomTaskDetails(task.getGroomServerStatus(), result);
-        }
+        result = obtainNewTask(task, groomStatuses, resources);
         break;
       }
     }
 
-    counters.incrCounter(JobCounter.LAUNCHED_TASKS, 1L);
     return result;
   }
 
+  /**
+   * Creates a new task based on the provided <code>TaskInProgress</code>
+   * @param task The <code>TaskInProgress</code> for which to create a task
+   * @param groomStatuses The statuses of available groom servers
+   * @param resources Available resources of the given groom server
+   * @return The <code>Task</code> to execute
+   */
+  public synchronized Task obtainNewTask(TaskInProgress task,
+      Map<String, GroomServerStatus> groomStatuses, BSPResource[] resources) {
+    Task result = null;
+    String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
+        groomStatuses, taskCountInGroomMap, resources, task);
+    GroomServerStatus groomStatus = taskAllocationStrategy
+        .getGroomToAllocate(groomStatuses, selectedGrooms,
+            taskCountInGroomMap, resources, task);
+    if (groomStatus != null) {
+      result = task.constructTask(groomStatus);
+    } else if (LOG.isDebugEnabled()) {
+      LOG.debug("Could not find a groom to schedule task");
+    }
+    if (result != null) {
+      updateGroomTaskDetails(task.getGroomServerStatus(), result);
+      counters.incrCounter(JobCounter.LAUNCHED_TASKS, 1L);
+    }
+    return result;
+  }
+  
+  /**
+   * Get the ideal grooms on which to run a given task for the provided
+   * constraints
+   * @return
+   */
+  public String[] getPreferredGrooms(TaskInProgress task, 
+      Map<String, GroomServerStatus> groomStatuses, BSPResource[] resources) {
+    String[] grooms = taskAllocationStrategy.selectGrooms(
+	            groomStatuses, taskCountInGroomMap, resources, task);
+    return grooms;
+  }
+  
   public void recoverTasks(Map<String, GroomServerStatus> groomStatuses,
       Map<GroomServerStatus, List<GroomServerAction>> actionMap)
       throws IOException {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/Schedulable.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/Schedulable.java?rev=1599381&r1=1599380&r2=1599381&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/Schedulable.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/Schedulable.java Mon Jun  2 22:30:35 2014
@@ -24,13 +24,12 @@ import java.io.IOException;
  */
 public interface Schedulable {
 
-  /**
-   * Schedule job to designated GroomServer(s) immediately.
-   * 
-   * @param job to be scheduled.
-   * @param statuses of GroomServer(s).
-   * @throws IOException
-   */
-  void schedule(JobInProgress job, GroomServerStatus... statuses)
-      throws IOException;
+	  /**
+	   * Schedule job immediately.
+	   * 
+	   * @param job to be scheduled.
+	   * @throws IOException
+	   */
+	  void schedule(JobInProgress job)
+	      throws IOException;
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1599381&r1=1599380&r2=1599381&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Mon Jun  2 22:30:35 2014
@@ -21,15 +21,7 @@ import static java.util.concurrent.TimeU
 import static org.apache.hama.monitor.fd.NodeStatus.Dead;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -42,9 +34,10 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
-import org.apache.hama.ipc.GroomProtocol;
 import org.apache.hama.monitor.Federator;
 import org.apache.hama.monitor.Federator.Act;
 import org.apache.hama.monitor.Federator.CollectorHandler;
@@ -56,6 +49,7 @@ import org.apache.hama.monitor.fd.NodeEv
 import org.apache.hama.monitor.fd.NodeStatus;
 import org.apache.zookeeper.ZooKeeper;
 
+
 /**
  * A simple task scheduler with FCFS processing queue.
  */
@@ -71,6 +65,7 @@ class SimpleTaskScheduler extends TaskSc
   private AtomicBoolean initialized = new AtomicBoolean(false);
   private final JobListener jobListener;
   private final JobProcessor jobProcessor;
+  private TaskWorkerManager taskWorkerManager;
   private final AtomicReference<Federator> federator = new AtomicReference<Federator>();
   /** <String, MetricsRecord> maps to <groom server, metrics record> */
   private final ConcurrentMap<String, MetricsRecord> repository = new ConcurrentHashMap<String, MetricsRecord>();
@@ -159,9 +154,8 @@ class SimpleTaskScheduler extends TaskSc
         JobInProgress job = queue.removeJob();
         queueManager.get().addJob(PROCESSING_QUEUE, job);
         // schedule
-        Collection<GroomServerStatus> glist = groomServerManager.get()
-            .groomServerStatusKeySet();
-        schedule(job, glist.toArray(new GroomServerStatus[glist.size()]));
+
+        schedule(job);
       }
     }
 
@@ -172,13 +166,8 @@ class SimpleTaskScheduler extends TaskSc
      * @param Job to be scheduled.
      */
     @Override
-    public void schedule(JobInProgress job, GroomServerStatus... statuses) {
-      ClusterStatus clusterStatus = groomServerManager.get().getClusterStatus(
-          false);
-      final int numGroomServers = clusterStatus.getGroomServers();
-
-      Future<Boolean> jobScheduleResult = sched.submit(new TaskWorker(statuses,
-          numGroomServers, job));
+    public void schedule(JobInProgress job) {
+      Future<Boolean> jobScheduleResult = sched.submit(taskWorkerManager.spawnWorker(job));
 
       Boolean jobResult = Boolean.FALSE;
 
@@ -206,134 +195,8 @@ class SimpleTaskScheduler extends TaskSc
     }
   }
 
-  private class TaskWorker implements Callable<Boolean> {
-    private final Map<String, GroomServerStatus> groomStatuses;
-    private final int groomNum;
-    private final JobInProgress jip;
-
-    TaskWorker(final GroomServerStatus[] stus, final int num,
-        final JobInProgress jip) {
-      this.groomStatuses = new HashMap<String, GroomServerStatus>(2 * num);
-      for (GroomServerStatus status : stus) {
-        this.groomStatuses.put(status.hostName, status);
-      }
-      this.groomNum = num;
-      this.jip = jip;
-      if (null == this.groomStatuses)
-        throw new NullPointerException("Target groom server is not "
-            + "specified.");
-      if (-1 == this.groomNum)
-        throw new IllegalArgumentException("Groom number is not specified.");
-      if (null == this.jip)
-        throw new NullPointerException("No job is specified.");
-    }
-
-    private Boolean scheduleNewTasks() {
-
-      // Action to be sent for each task to the respective groom server.
-      Map<GroomServerStatus, List<GroomServerAction>> actionMap = new HashMap<GroomServerStatus, List<GroomServerAction>>(
-          2 * this.groomStatuses.size());
-      Set<Task> taskSet = new HashSet<Task>(2 * jip.tasks.length);
-      Task t = null;
-      int cnt = 0;
-      while ((t = jip.obtainNewTask(this.groomStatuses)) != null) {
-        taskSet.add(t);
-        // Scheduled all tasks
-        if (++cnt == this.jip.tasks.length) {
-          break;
-        }
-      }
-
-      // if all tasks could not be scheduled
-      if (cnt != this.jip.tasks.length) {
-        LOG.error("Could not schedule all tasks!");
-        return Boolean.FALSE;
-      }
+  
 
-      // assembly into actions
-      for (Task task : taskSet) {
-        GroomServerStatus groomStatus = jip.getGroomStatusForTask(task);
-        List<GroomServerAction> taskActions = actionMap.get(groomStatus);
-        if (taskActions == null) {
-          taskActions = new ArrayList<GroomServerAction>(
-              groomStatus.getMaxTasks());
-        }
-        taskActions.add(new LaunchTaskAction(task));
-        actionMap.put(groomStatus, taskActions);
-      }
-
-      sendDirectivesToGrooms(actionMap);
-
-      return Boolean.TRUE;
-    }
-
-    /**
-     * Schedule recovery tasks.
-     * 
-     * @return TRUE object if scheduling is successful else returns FALSE
-     */
-    private Boolean scheduleRecoveryTasks() {
-
-      // Action to be sent for each task to the respective groom server.
-      Map<GroomServerStatus, List<GroomServerAction>> actionMap = new HashMap<GroomServerStatus, List<GroomServerAction>>(
-          2 * this.groomStatuses.size());
-
-      try {
-        jip.recoverTasks(groomStatuses, actionMap);
-      } catch (IOException e) {
-        return Boolean.FALSE;
-      }
-      return sendDirectivesToGrooms(actionMap);
-
-    }
-
-    private Boolean sendDirectivesToGrooms(
-        Map<GroomServerStatus, List<GroomServerAction>> actionMap) {
-      Iterator<GroomServerStatus> groomIter = actionMap.keySet().iterator();
-      while ((jip.getStatus().getRunState() == JobStatus.RUNNING || jip
-          .getStatus().getRunState() == JobStatus.RECOVERING)
-          && groomIter.hasNext()) {
-
-        GroomServerStatus groomStatus = groomIter.next();
-        List<GroomServerAction> actionList = actionMap.get(groomStatus);
-
-        GroomProtocol worker = groomServerManager.get().findGroomServer(
-            groomStatus);
-        try {
-          // dispatch() to the groom server
-          GroomServerAction[] actions = new GroomServerAction[actionList.size()];
-          actionList.toArray(actions);
-          Directive d1 = new DispatchTasksDirective(actions);
-          worker.dispatch(d1);
-        } catch (IOException ioe) {
-          LOG.error(
-              "Fail to dispatch tasks to GroomServer "
-                  + groomStatus.getGroomName(), ioe);
-          return Boolean.FALSE;
-        }
-
-      }
-
-      if (groomIter.hasNext()
-          && (jip.getStatus().getRunState() != JobStatus.RUNNING || jip
-              .getStatus().getRunState() != JobStatus.RECOVERING)) {
-        LOG.warn("Currently master only shcedules job in running state. "
-            + "This may be refined in the future. JobId:" + jip.getJobID());
-        return Boolean.FALSE;
-      }
-
-      return Boolean.TRUE;
-    }
-
-    @Override
-    public Boolean call() {
-      if (jip.isRecoveryPending()) {
-        return scheduleRecoveryTasks();
-      } else {
-        return scheduleNewTasks();
-      }
-    }
-  }
 
   /**
    * Periodically collect metrics info.
@@ -402,6 +265,14 @@ class SimpleTaskScheduler extends TaskSc
     this.queueManager.get().createFCFSQueue(FINISHED_QUEUE);
     groomServerManager.get().addJobInProgressListener(this.jobListener);
 
+    // Create and initialize the task worker manager
+    Class<? extends TaskWorkerManager> taskWorkerClass = conf.getClass(
+    		Constants.TASK_EXECUTOR_CLASS, SimpleTaskWorkerManager.class,
+        TaskWorkerManager.class);
+    this.taskWorkerManager = ReflectionUtils.newInstance(taskWorkerClass, conf);
+    this.taskWorkerManager.init(groomServerManager, conf);
+
+    
     if (null != getConf()
         && getConf().getBoolean("bsp.federator.enabled", false)) {
       this.federator.get().start();

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskWorkerManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskWorkerManager.java?rev=1599381&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskWorkerManager.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskWorkerManager.java Mon Jun  2 22:30:35 2014
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.ipc.GroomProtocol;
+
+/**
+ * A simple taskWorkerManager that assumes groom servers are running
+ * and may be assigned jobs
+ */
+public class SimpleTaskWorkerManager extends TaskWorkerManager {
+  private static final Log LOG = LogFactory
+      .getLog(SimpleTaskWorkerManager.class);
+
+  /** Builds a worker for the job from the currently known grooms. */
+  @Override
+  public TaskWorker spawnWorker(JobInProgress jip) {
+    final Collection<GroomServerStatus> grooms = groomServerManager.get()
+        .groomServerStatusKeySet();
+    final int groomCount = groomServerManager.get().getClusterStatus(false)
+        .getGroomServers();
+
+    // The worker re-indexes the statuses by host name in its constructor.
+    return new SimpleTaskWorker(grooms, groomCount, jip);
+  }
+
+  private class SimpleTaskWorker implements TaskWorker {
+    private final Map<String, GroomServerStatus> groomStatuses;
+    private final int groomNum;
+    private final JobInProgress jip;
+
+    /** Validates the arguments first, then indexes the grooms by host name. */
+    SimpleTaskWorker(final Collection<GroomServerStatus> stus,
+        final int num, final JobInProgress jip) {
+      // Check before use: the map below is sized by num and filled from stus.
+      if (null == stus)
+        throw new NullPointerException("Target groom server is not specified.");
+      if (-1 == num)
+        throw new IllegalArgumentException("Groom number is not specified.");
+      if (null == jip)
+        throw new NullPointerException("No job is specified.");
+      this.groomStatuses = new HashMap<String, GroomServerStatus>(2 * num);
+      for (GroomServerStatus status : stus) {
+        this.groomStatuses.put(status.hostName, status);
+      }
+      this.groomNum = num;
+      this.jip = jip;
+    }
+
+    /**
+     * Schedule new tasks: obtain every task of the job and dispatch them.
+     * @return TRUE only if all tasks were obtained and dispatched, else FALSE
+     */
+    private Boolean scheduleNewTasks() {
+      // Action to be sent for each task to the respective groom server.
+      Map<GroomServerStatus, List<GroomServerAction>> actionMap = new HashMap<GroomServerStatus, List<GroomServerAction>>(
+          2 * this.groomStatuses.size());
+      Set<Task> taskSet = new HashSet<Task>(2 * jip.tasks.length);
+      Task t = null;
+      int cnt = 0;
+      while ((t = jip.obtainNewTask(this.groomStatuses)) != null) {
+        taskSet.add(t);
+        // Scheduled all tasks
+        if (++cnt == this.jip.tasks.length) {
+          break;
+        }
+      }
+
+      // if all tasks could not be scheduled
+      if (cnt != this.jip.tasks.length) {
+        LOG.error("Could not schedule all tasks!");
+        return Boolean.FALSE;
+      }
+
+      // assembly into actions
+      for (Task task : taskSet) {
+        GroomServerStatus groomStatus = jip.getGroomStatusForTask(task);
+        List<GroomServerAction> taskActions = actionMap.get(groomStatus);
+        if (taskActions == null) {
+          taskActions = new ArrayList<GroomServerAction>(
+              groomStatus.getMaxTasks());
+        }
+        taskActions.add(new LaunchTaskAction(task));
+        actionMap.put(groomStatus, taskActions);
+      }
+      // A failed dispatch is reported to the caller instead of masked as TRUE.
+      return sendDirectivesToGrooms(actionMap);
+    }
+
+    /**
+     * Schedule recovery tasks.
+     * 
+     * @return TRUE object if scheduling is successful else returns FALSE
+     */
+    private Boolean scheduleRecoveryTasks() {
+      // Action to be sent for each task to the respective groom server.
+      Map<GroomServerStatus, List<GroomServerAction>> actionMap = new HashMap<GroomServerStatus, List<GroomServerAction>>(
+          2 * this.groomStatuses.size());
+
+      try {
+        jip.recoverTasks(groomStatuses, actionMap);
+      } catch (IOException e) {
+        // Log the cause so that a failed recovery can be diagnosed.
+        LOG.error("Fail to recover tasks for job " + jip.getJobID(), e);
+        return Boolean.FALSE;
+      }
+      return sendDirectivesToGrooms(actionMap);
+    }
+
+    /**
+     * Dispatch each groom's actions while the job is RUNNING or RECOVERING.
+     * 
+     * @return TRUE if every groom in actionMap was served, else FALSE
+     */
+    private Boolean sendDirectivesToGrooms(
+        Map<GroomServerStatus, List<GroomServerAction>> actionMap) {
+      Iterator<GroomServerStatus> groomIter = actionMap.keySet().iterator();
+      while ((jip.getStatus().getRunState() == JobStatus.RUNNING || jip
+          .getStatus().getRunState() == JobStatus.RECOVERING)
+          && groomIter.hasNext()) {
+        GroomServerStatus groomStatus = groomIter.next();
+        List<GroomServerAction> actionList = actionMap.get(groomStatus);
+        GroomProtocol worker = groomServerManager.get()
+            .findGroomServer(groomStatus);
+        try {
+          // dispatch() to the groom server
+          GroomServerAction[] actions = actionList
+              .toArray(new GroomServerAction[actionList.size()]);
+          Directive d1 = new DispatchTasksDirective(actions);
+          worker.dispatch(d1);
+        } catch (IOException ioe) {
+          LOG.error("Fail to dispatch tasks to GroomServer "
+              + groomStatus.getGroomName(), ioe);
+          return Boolean.FALSE;
+        }
+      }
+
+      // Grooms remain only if the loop stopped on the job state, so hasNext()
+      // alone decides (x != RUNNING || x != RECOVERING is true for every x).
+      if (groomIter.hasNext()) {
+        LOG.warn("Currently master only shcedules job in running state. "
+            + "This may be refined in the future. JobId:"
+            + jip.getJobID());
+        return Boolean.FALSE;
+      }
+
+      return Boolean.TRUE;
+    }
+
+    /** Runs the recovery path when one is pending, else a fresh schedule. */
+    @Override
+    public Boolean call() {
+      // Both helpers return a Boolean object, so no unboxing happens here.
+      return jip.isRecoveryPending()
+          ? scheduleRecoveryTasks()
+          : scheduleNewTasks();
+    }
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskWorkerManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskWorkerManager.java?rev=1599381&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskWorkerManager.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskWorkerManager.java Mon Jun  2 22:30:35 2014
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+
+/**
+ * Manager responsible for spawning a new {@link TaskWorker} for each job.
+ */
+public abstract class TaskWorkerManager implements Configurable{
+  /**
+   * An execution unit that will delegate a job for execution; yields a Boolean.
+   */
+  public interface TaskWorker extends Callable<Boolean>{
+  }
+
+  protected Configuration conf;
+  AtomicReference<GroomServerManager> groomServerManager = null;
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  // Stores the groom manager reference and conf. TODO: require conf for safety
+  public void init(AtomicReference<GroomServerManager> groomServerManager, 
+		  Configuration conf) {
+    this.groomServerManager = groomServerManager;
+    setConf(conf);
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public abstract TaskWorker spawnWorker(JobInProgress jip);
+}

Added: hama/trunk/mesos/.gitignore
URL: http://svn.apache.org/viewvc/hama/trunk/mesos/.gitignore?rev=1599381&view=auto
==============================================================================
--- hama/trunk/mesos/.gitignore (added)
+++ hama/trunk/mesos/.gitignore Mon Jun  2 22:30:35 2014
@@ -0,0 +1 @@
+/target

Added: hama/trunk/mesos/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/mesos/pom.xml?rev=1599381&view=auto
==============================================================================
--- hama/trunk/mesos/pom.xml (added)
+++ hama/trunk/mesos/pom.xml Mon Jun  2 22:30:35 2014
@@ -0,0 +1,108 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <parent>
+    <groupId>org.apache.hama</groupId>
+    <artifactId>hama-parent</artifactId>
+    <version>0.7.0-SNAPSHOT</version>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hama</groupId>
+  <artifactId>hama-mesos</artifactId>
+  <name>mesos</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <!-- Dependencies -->
+    <mesos.version>0.15.0</mesos.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hama</groupId>
+      <artifactId>hama-core</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>1.5.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.apache.mesos</groupId>
+      <artifactId>mesos</artifactId>
+      <version>${mesos.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <finalName>hama-mesos-${project.version}</finalName>
+      <plugins>
+        <plugin>
+          <artifactId>maven-surefire-plugin</artifactId>
+        </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.parent.basedir}/lib</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>true</overWriteSnapshots>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>org.apache.mesos</groupId>
+                  <artifactId>mesos</artifactId>
+                  <outputDirectory>${project.parent.basedir}/lib</outputDirectory>
+                </artifactItem>
+              </artifactItems>
+              <excludeTransitive>false</excludeTransitive>
+              <fileMode>755</fileMode>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      </plugins>
+
+  </build>
+
+</project>

Added: hama/trunk/mesos/src/main/java/org/apache/hama/bsp/MesosExecutor.java
URL: http://svn.apache.org/viewvc/hama/trunk/mesos/src/main/java/org/apache/hama/bsp/MesosExecutor.java?rev=1599381&view=auto
==============================================================================
--- hama/trunk/mesos/src/main/java/org/apache/hama/bsp/MesosExecutor.java (added)
+++ hama/trunk/mesos/src/main/java/org/apache/hama/bsp/MesosExecutor.java Mon Jun  2 22:30:35 2014
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hama.HamaConfiguration;
+import org.apache.mesos.Executor;
+import org.apache.mesos.ExecutorDriver;
+import org.apache.mesos.MesosExecutorDriver;
+import org.apache.mesos.Protos.Environment.Variable;
+import org.apache.mesos.Protos.*;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskStatus;
+
+import javax.xml.transform.TransformerException;
+
+import java.io.*;
+import java.util.Map;
+
+public class MesosExecutor implements Executor {
+  public static final Log LOG = LogFactory.getLog(MesosExecutor.class);
+  private GroomServer groomServer;
+
+  public static void main(String[] args) {
+    MesosExecutorDriver driver = new MesosExecutorDriver(new MesosExecutor());
+    System.exit(driver.run() == Status.DRIVER_STOPPED ? 0 : 1);
+  }
+
+  /** Rebuilds the configuration shipped in the task data for this sandbox. */
+  private HamaConfiguration configure(final TaskInfo task) {
+    final Configuration conf = new Configuration(false);
+    try {
+      conf.readFields(new DataInputStream(
+          new ByteArrayInputStream(task.getData().toByteArray())));
+    } catch (IOException e) {
+      LOG.warn("Failed to deserialize configuraiton.", e);
+      System.exit(1);
+    }
+    // Point the local directories into the executor sandbox, so that
+    // different Grooms on the same host do not step on each other.
+    final String base = System.getProperty("user.dir") + "/bsp";
+    conf.set("bsp.local.dir", base + "/local");
+    conf.set("bsp.tmp.dir", base + "/tmp");
+    conf.set("bsp.disk.queue.dir", base + "/diskQueue");
+    conf.set("hama.disk.vertices.path", base + "/graph");
+    return new HamaConfiguration(conf);
+  }
+
+  @Override
+  public void registered(ExecutorDriver driver, ExecutorInfo executorInfo,
+      FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) {
+    LOG.info("Executor registered with the slave");
+  }
+
+  /** Starts a GroomServer for the task and reports task states to Mesos. */
+  @Override
+  public void launchTask(final ExecutorDriver driver, final TaskInfo task) {
+    LOG.info("Launching task : " + task.getTaskId().getValue());
+    // Get configuration from the serialized task data.
+    HamaConfiguration conf = configure(task);
+
+    // NOTE(review): left disabled. The original remark says the context class
+    // loader had to be set manually for the LoginModule class to be found.
+   // Thread.currentThread().setContextClassLoader(
+    //    GroomServer.class.getClassLoader());
+
+    try {
+      groomServer = new GroomServer(conf);
+    } catch (IOException e) {
+      LOG.fatal("Failed to start GroomServer", e);
+      System.exit(1);
+    }
+
+    // Spin up a Groom Server in a new thread.
+    new Thread("GroomServer Run Thread") {
+      @Override
+      public void run() {
+        try {
+          groomServer.run();
+          // Send a TASK_FINISHED status update.
+          // We do this here because we want to send it in a separate thread
+          // than was used to call killTask().
+          driver.sendStatusUpdate(TaskStatus.newBuilder()
+              .setTaskId(task.getTaskId()).setState(TaskState.TASK_FINISHED)
+              .build());
+
+          // Give some time for the update to reach the slave.
+          try {
+            Thread.sleep(2000);
+          } catch (InterruptedException e) {
+            LOG.error("Failed to sleep TaskTracker thread", e);
+            // Restore the flag so that the interruption is not silently lost.
+            Thread.currentThread().interrupt();
+          }
+          // Stop the executor.
+          driver.stop();
+        } catch (Throwable t) {
+          LOG.error("Caught exception, committing suicide.", t);
+          driver.stop();
+          System.exit(1);
+        }
+      }
+    }.start();
+
+    driver.sendStatusUpdate(TaskStatus.newBuilder().setTaskId(task.getTaskId())
+        .setState(TaskState.TASK_RUNNING).build());
+  }
+
+  @Override
+  public void killTask(ExecutorDriver driver, TaskID taskId) {
+    LOG.info("Killing task : " + taskId.getValue());
+    try {
+      if (groomServer != null) groomServer.shutdown(); // null until launched
+    } catch (IOException e) {
+      LOG.error("Failed to shutdown TaskTracker", e);
+    }
+  }
+
+  @Override
+  public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) {
+    LOG.info("Executor reregistered with the slave");
+  }
+
+  @Override
+  public void disconnected(ExecutorDriver driver) {
+    LOG.info("Executor disconnected from the slave");
+  }
+
+  @Override
+  public void frameworkMessage(ExecutorDriver d, byte[] msg) {
+    LOG.info("Executor received framework message of length: " + msg.length
+        + " bytes");
+  }
+
+  @Override
+  public void error(ExecutorDriver d, String message) {
+    LOG.error("MesosExecutor.error: " + message);
+  }
+
+  @Override
+  public void shutdown(ExecutorDriver d) {
+    LOG.info("Executor asked to shutdown");
+  }
+}

Added: hama/trunk/mesos/src/main/java/org/apache/hama/bsp/MesosScheduler.java
URL: http://svn.apache.org/viewvc/hama/trunk/mesos/src/main/java/org/apache/hama/bsp/MesosScheduler.java?rev=1599381&view=auto
==============================================================================
--- hama/trunk/mesos/src/main/java/org/apache/hama/bsp/MesosScheduler.java (added)
+++ hama/trunk/mesos/src/main/java/org/apache/hama/bsp/MesosScheduler.java Mon Jun  2 22:30:35 2014
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.*;
+import org.apache.mesos.MesosSchedulerDriver;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class MesosScheduler extends TaskWorkerManager implements Scheduler {
+  public static final Log LOG = LogFactory.getLog(MesosScheduler.class);
+
+  private ResourceManager resourceManager;
+
+  SchedulerDriver driver;
+
+  public MesosScheduler() {
+  }
+
+  @Override
+  public void init(AtomicReference<GroomServerManager> groomServerManager,
+      Configuration conf) {
+    super.init(groomServerManager, conf);
+    start();
+  }
+
+  /** Creates the resource manager, then starts the Mesos scheduler driver. */
+  private void start() {
+    try {
+      FrameworkInfo frameworkInfo = FrameworkInfo.newBuilder()
+          // Let Mesos fill in the user.
+          .setUser("")
+          // A port key is not a boolean; checkpointing has its own switch.
+          .setCheckpoint(conf.getBoolean("hama.mesos.checkpoint", false))
+          .setRole(conf.get("hama.mesos.role", "*"))
+          .setName("Hama: (Master Port: " + conf.get("bsp.groom.rpc.port") + ","
+                  + " WebUI port: " + conf.get("bsp.http.groomserver.port")
+                  + ")").build();
+      String master = conf.get("hama.mesos.master", "local");
+      driver = new MesosSchedulerDriver(this, frameworkInfo, master);
+      // resourceOffers() uses resourceManager, so create it before start().
+      resourceManager = new ResourceManager(conf, groomServerManager, driver);
+      driver.start();
+    } catch (Exception e) {
+      // The scheduler is useless without its driver; crash so the user notices.
+      LOG.fatal("Failed to start MesosScheduler", e);
+      System.exit(1);
+    }
+  }
+
+  @Override
+  public synchronized void disconnected(SchedulerDriver schedulerDriver) {
+    LOG.warn("Disconnected from Mesos master.");
+  }
+
+  @Override
+  public synchronized void error(SchedulerDriver schedulerDriver, String s) {
+    LOG.error("Error from scheduler driver: " + s);
+  }
+
+  @Override
+  public synchronized void executorLost(SchedulerDriver schedulerDriver,
+      ExecutorID executorID, SlaveID slaveID, int status) {
+    LOG.warn("Executor " + executorID.getValue() + " lost with status "
+        + status + " on slave " + slaveID);
+  }
+
+  @Override
+  public synchronized void frameworkMessage(SchedulerDriver schedulerDriver,
+      ExecutorID executorID, SlaveID slaveID, byte[] bytes) {
+    LOG.info("Framework Message of " + bytes.length + " bytes"
+        + " from executor " + executorID.getValue() + " on slave "
+        + slaveID.getValue());
+  }
+
+  @Override
+  public synchronized void offerRescinded(SchedulerDriver schedulerDriver,
+      OfferID offerID) {
+    LOG.warn("Rescinded offer: " + offerID.getValue());
+  }
+
+  @Override
+  public void registered(SchedulerDriver driver, FrameworkID frameID,
+      MasterInfo masterInfo) {
+    LOG.info("Registered as " + frameID.getValue() + " with master "
+        + masterInfo);
+  }
+
+  @Override
+  public void reregistered(SchedulerDriver arg0, MasterInfo masterInfo) {
+    LOG.info("Re-registered with master " + masterInfo);
+  }
+
+  @Override
+  public void resourceOffers(SchedulerDriver schedulerDriver, List<Offer> offers) {
+    resourceManager.resourceOffers(schedulerDriver, offers);
+  }
+
+  @Override
+  public synchronized void slaveLost(SchedulerDriver schedulerDriver,
+      SlaveID slaveID) {
+    LOG.warn("Slave lost: " + slaveID.getValue());
+  }
+
+  @Override
+  public synchronized void statusUpdate(SchedulerDriver schedulerDriver,
+      Protos.TaskStatus taskStatus) {
+    LOG.info("Status update of " + taskStatus.getTaskId().getValue() + " to "
+        + taskStatus.getState().name() + " with message "
+        + taskStatus.getMessage());
+
+    switch (taskStatus.getState()) {
+    case TASK_FINISHED:
+    case TASK_FAILED:
+    case TASK_KILLED:
+    case TASK_LOST:
+    case TASK_STAGING:
+    case TASK_STARTING:
+    case TASK_RUNNING:
+      break;
+    default:
+      LOG.error("Unexpected TaskStatus: " + taskStatus.getState().name());
+      break;
+    }
+  }
+
+  @Override
+  public TaskWorker spawnWorker(JobInProgress jip) {
+    LOG.info("Spawning worker for: " + jip);
+    return resourceManager.new MesosTaskWorker(jip);
+  }
+
+}

Added: hama/trunk/mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java?rev=1599381&view=auto
==============================================================================
--- hama/trunk/mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java (added)
+++ hama/trunk/mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java Mon Jun  2 22:30:35 2014
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.bsp.TaskWorkerManager.TaskWorker;
+import org.apache.hama.bsp.message.io.SyncReadByteBufferInputStream;
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+import org.apache.mesos.Protos.CommandInfo;
+import org.apache.mesos.Protos.ExecutorID;
+import org.apache.mesos.Protos.ExecutorInfo;
+import org.apache.mesos.Protos.Offer;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.TaskID;
+import org.apache.mesos.Protos.TaskInfo;
+import org.apache.mesos.Protos.Value;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.protobuf.ByteString;
+
+public class ResourceManager {
+  public static final Log log = LogFactory.getLog(ResourceManager.class);
+
+  private final String anyGroomServer = "_any_";
+
+  private Configuration conf;
+  private static long launchedTasks = 0;
+
+  private Set<JobInProgress> executing = new HashSet<JobInProgress>();
+  private Set<TaskInProgress> executingTasks = new HashSet<TaskInProgress>();
+  private Map<String, java.util.Queue<TaskInProgress>> tasksToRunByGroom;
+  private Set<TaskInProgress> tasksToRun;
+
+  private long slotMemory;
+  // Overhead requirements for the container groom server
+  double groomCpus;
+  double groomMem;
+  double groomDisk;
+
+  TaskDelegator taskDelegator;
+
+  /**
+   * Creates the manager and registers its task delegator as groom listener.
+   * 
+   * @param conf
+   *          The configuration options for hama
+   * @param serverManager
+   *          A reference to the groom server manager
+   * @param driver
+   *          The mesos driver, handed on to the task delegator
+   */
+  public ResourceManager(Configuration conf,
+      AtomicReference<GroomServerManager> serverManager, SchedulerDriver driver) {
+    // The anyGroomServer queue always exists, even before any task arrives.
+    tasksToRunByGroom = new HashMap<String, java.util.Queue<TaskInProgress>>();
+    tasksToRunByGroom.put(anyGroomServer, new LinkedList<TaskInProgress>());
+    tasksToRun = new HashSet<TaskInProgress>();
+    slotMemory = parseMemory(conf);
+    taskDelegator = new TaskDelegator(serverManager, driver, executingTasks);
+    serverManager.get().addGroomStatusListener(taskDelegator);
+    this.conf = conf;
+
+    // Overhead requirements of the container groom server itself.
+    groomCpus = conf.getInt("hama.mesos.groom.cpu", 0);
+    groomMem = conf.getInt("hama.mesos.groom.mem", 200);
+    groomDisk = conf.getInt("hama.mesos.groom.disk", 0);
+  }
+
+  /**
+   * Handle a resource offer by the mesos framework
+   * 
+   * @param schedulerDriver
+   *          The mesos scheduler driver
+   * @param offers
+   *          A list of offers from mesos
+   */
+  public void resourceOffers(SchedulerDriver schedulerDriver, List<Offer> offers) {
+
+    if (tasksToRun.isEmpty()) {
+      //there is no need to track executing tasks if everything is started
+      clearQueues();
+      
+      for (Offer offer : offers) {
+        schedulerDriver.declineOffer(offer.getId());
+      }
+    } else {
+      for (Offer offer : offers) {
+        useOffer(schedulerDriver, offer);
+      }
+    }
+  }
+
+  private void clearQueues() {
+	synchronized (tasksToRunByGroom) {
+	  for ( java.util.Queue<TaskInProgress> queue : tasksToRunByGroom.values()) {
+		  queue.clear();
+	  }
+	  executingTasks.clear();
+	}
+  }
+  
+  private void useOffer(SchedulerDriver schedulerDriver, Offer offer) {
+    log.debug("Received offer From: " + offer.getHostname());
+
+    String host = offer.getHostname();
+
+    ResourceOffer ro = new ResourceOffer(offer);
+    int maxSlots = ro.getMaxSlots();
+    if (maxSlots == 0) {
+      schedulerDriver.declineOffer(offer.getId());
+      return;
+    }
+
+    java.util.Queue<TaskInProgress> tasks = new LinkedList<TaskInProgress>();
+
+    while (tasks.size() < maxSlots) {
+      TaskInProgress tip = null;
+      if (tasksToRunByGroom.get(host) != null) {
+        tip = tasksToRunByGroom.get(host).poll();
+      }
+
+      if (tip == null) {
+        tip = tasksToRunByGroom.get(anyGroomServer).poll();
+        if (tip == null) {
+          if (tasks.isEmpty()) {
+            schedulerDriver.declineOffer(offer.getId());
+          }
+
+          break;
+        }
+      }
+      if (executingTasks.contains(tip)) {
+        continue;
+      }
+
+      executingTasks.add(tip);
+      tasksToRun.remove(tip);
+
+      tasks.add(tip);
+
+      log.debug("Found offer for: " + tip.getTaskId());
+    }
+
+    if (!tasks.isEmpty()) {
+      launchTasks(schedulerDriver, tasks, ro);
+    }
+  }
+
+  /**
+   * Worker that queues all tasks of one job so that later resource offers can
+   * pick them up. Each task is queued under every groom returned by
+   * {@code getPreferredGrooms}, or under {@code anyGroomServer} when no
+   * preference is available.
+   */
+  class MesosTaskWorker implements TaskWorker {
+    private final JobInProgress jip;
+
+    public MesosTaskWorker(JobInProgress jip) {
+      this.jip = jip;
+    }
+
+    /**
+     * Queues the job's tasks and records the job in {@code executing}.
+     *
+     * @return always {@code true}
+     * @throws UnsupportedOperationException if the job is pending recovery,
+     *           which is not implemented yet
+     */
+    @Override
+    public Boolean call() throws Exception {
+      log.debug("Task Worker called: " + jip.tasks.length);
+      if (jip.isRecoveryPending()) {
+        // TODO: Handle task recovery
+        throw new UnsupportedOperationException(
+            "This feature is not yet implemented");
+      }
+
+      for (TaskInProgress tip : jip.tasks) {
+        String[] grooms = jip.getPreferredGrooms(tip, null, null);
+
+        // Treat an empty preference list like a missing one; otherwise the
+        // grooms[0] access below would fail and the task would be queued
+        // nowhere.
+        if (grooms == null || grooms.length == 0) {
+          grooms = new String[] { anyGroomServer };
+        }
+        log.info("Prefered Groom for tip " + tip.idWithinJob() + ": "
+            + grooms[0]);
+        synchronized (tasksToRunByGroom) {
+          for (String groom : grooms) {
+            if (!tasksToRunByGroom.containsKey(groom)) {
+              tasksToRunByGroom.put(groom, new LinkedList<TaskInProgress>());
+              log.info("Received request for groom: " + groom);
+            }
+            tasksToRunByGroom.get(groom).add(tip);
+          }
+          tasksToRun.add(tip);
+        }
+      }
+
+      executing.add(jip);
+      return true;
+    }
+  }
+
+  /**
+   * Launches one Mesos task, running a "Hama Groom Server" executor, against
+   * the given offer and registers every supplied task with the task delegator
+   * for that groom.
+   * <p>
+   * The Mesos task requests one CPU and {@code slotMemory} of memory per
+   * supplied task plus the groom's own overhead ({@code groomCpus},
+   * {@code groomMem}, {@code groomDisk}), and two ports claimed from the
+   * offer. The first port is passed on as the groom RPC port, the second as
+   * the peer port.
+   *
+   * @param schedulerDriver driver used to launch the Mesos task
+   * @param tips tasks that will run on the launched groom
+   * @param offer parsed resource offer the task is launched against
+   * @throws RuntimeException if {@code hama.mesos.executor.uri} is not
+   *           configured
+   */
+  private void launchTasks(SchedulerDriver schedulerDriver,
+      java.util.Queue<TaskInProgress> tips, ResourceOffer offer) {
+    TaskID taskId = TaskID.newBuilder().setValue("Task " + launchedTasks++)
+        .build();
+
+    List<Long> ports = claimPorts(offer.ports, 2);
+
+    double taskCpus = 1 * tips.size() + groomCpus;
+    double taskMem = slotMemory * tips.size() + groomMem;
+    double taskDisk = 10 + groomDisk;
+
+    String uri = conf.get("hama.mesos.executor.uri");
+    if (uri == null) {
+      // Name the property that is actually read above.
+      throw new RuntimeException(
+          "Expecting configuration property 'hama.mesos.executor.uri'");
+    }
+
+    String directory = conf.get("hama.mesos.executor.directory");
+    if (directory == null || directory.equals("")) {
+      log.info("URI: " + uri + ", name: " + new File(uri).getName());
+
+      // Fall back to the archive's base name (up to the first dot) followed
+      // by a shell wildcard.
+      directory = new File(uri).getName().split("\\.")[0] + "*";
+    }
+    log.debug("Directory: " + directory);
+    String command = conf.get("hama.mesos.executor.command");
+    if (command == null || command.equals("")) {
+      command = "env ; bash -x ./bin/hama org.apache.hama.bsp.MesosExecutor";
+    }
+
+    // Set up the environment for running the executor.
+    Protos.Environment.Builder envBuilder = Protos.Environment.newBuilder();
+
+    // Forward Java specific environment variables when they are set locally.
+    Map<String, String> env = System.getenv();
+    if (env.containsKey("JAVA_HOME")) {
+      envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
+          .setName("JAVA_HOME").setValue(env.get("JAVA_HOME")));
+    }
+
+    envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
+        .setName("HAMA_LOG_DIR").setValue("logs"));
+
+    log.debug("JAVA_HOME: " + env.get("JAVA_HOME"));
+    if (env.containsKey("JAVA_LIBRARY_PATH")) {
+      envBuilder.addVariables(Protos.Environment.Variable.newBuilder()
+          .setName("JAVA_LIBRARY_PATH").setValue(env.get("JAVA_LIBRARY_PATH")));
+    }
+    log.debug("JAVA_LIBRARY_PATH: " + env.get("JAVA_LIBRARY_PATH"));
+
+    CommandInfo commandInfo = CommandInfo.newBuilder()
+        .setEnvironment(envBuilder)
+        .setValue(String.format("cd %s && %s", directory, command))
+        .addUris(CommandInfo.URI.newBuilder().setValue(uri)).build();
+
+    log.debug("Offer: cpus:  " + offer.cpus + " mem: " + offer.mem + "disk: "
+        + offer.disk);
+    log.debug("Cpu: " + taskCpus + " Mem: " + taskMem + " Disk: " + taskDisk
+        + " port: " + ports.get(0));
+    TaskInfo info = TaskInfo
+        .newBuilder()
+        .setName(taskId.getValue())
+        .setTaskId(taskId)
+        .setSlaveId(offer.offer.getSlaveId())
+        .addResources(
+            Resource.newBuilder().setName("cpus").setType(Value.Type.SCALAR)
+                .setRole(offer.cpuRole)
+                .setScalar(Value.Scalar.newBuilder().setValue(taskCpus)))
+        .addResources(
+            Resource.newBuilder().setName("mem").setType(Value.Type.SCALAR)
+                .setRole(offer.memRole)
+                .setScalar(Value.Scalar.newBuilder().setValue(taskMem)))
+        .addResources(
+            Resource.newBuilder().setName("disk").setType(Value.Type.SCALAR)
+                .setRole(offer.diskRole)
+                .setScalar(Value.Scalar.newBuilder().setValue(taskDisk)))
+        .addResources(
+            Resource
+                .newBuilder()
+                .setName("ports")
+                .setType(Value.Type.RANGES)
+                .setRole(offer.portRole)
+                .setRanges(
+                    Value.Ranges
+                        .newBuilder()
+                        .addRange(
+                            Value.Range.newBuilder().setBegin(ports.get(0))
+                                .setEnd(ports.get(0)))
+                        .addRange(
+                            Value.Range.newBuilder().setBegin(ports.get(1))
+                                .setEnd(ports.get(1)))))
+        .setExecutor(
+            ExecutorInfo
+                .newBuilder()
+                .setExecutorId(
+                    ExecutorID.newBuilder().setValue(
+                        "executor_" + taskId.getValue()))
+                .setName("Hama Groom Server").setSource(taskId.getValue())
+                .setCommand(commandInfo))
+        .setData(
+            ByteString.copyFrom(getConfigurationOverride(ports.get(0),
+                ports.get(1), tips.size(), (long) taskMem))).build();
+
+    log.debug("Accepting offer " + offer.offer.getId() + " cpus: " + taskCpus
+        + " mem: " + taskMem);
+    // Register the tasks under host and RPC port before launching.
+    for (TaskInProgress tip : tips) {
+      taskDelegator.addTask(tip, taskId, offer.offer.getHostname(), ports
+          .get(0).intValue());
+    }
+    schedulerDriver.launchTasks(offer.offer.getId(), Arrays.asList(info));
+  }
+
+  /**
+   * Picks up to {@code count} individual ports from the offered port ranges,
+   * walking the ranges in order and each range from its begin upwards.
+   * <p>
+   * Both ends of a range are treated as usable ports, so a range whose begin
+   * equals its end contributes exactly one port. A range whose begin lies
+   * after its end contributes nothing.
+   *
+   * @param offeredPorts port ranges taken from the offer; may be {@code null}
+   *          when the offer carried no ports
+   * @param count number of ports wanted
+   * @return the claimed ports; holds fewer than {@code count} entries when the
+   *         ranges do not contain enough ports
+   */
+  private List<Long> claimPorts(List<Value.Range> offeredPorts, int count) {
+    List<Long> ports = new ArrayList<Long>(count);
+    if (offeredPorts == null) {
+      return ports;
+    }
+
+    for (Value.Range range : offeredPorts) {
+      long end = range.getEnd();
+      for (long port = range.getBegin(); port <= end && ports.size() < count; port++) {
+        ports.add(port);
+      }
+
+      if (ports.size() >= count) {
+        break;
+      }
+    }
+
+    return ports;
+  }
+
+  /**
+   * Serializes a copy of the current configuration with the groom specific
+   * settings overridden.
+   * <p>
+   * The child JVM options are extended with an explicit {@code -Xmx} flag
+   * built from {@code slotJVMHeap}, separated from any existing options by a
+   * space. Without the flag name and separator the heap size would end up as a
+   * bare token such as {@code 1024m}.
+   *
+   * @param groomRPCPort value for {@code bsp.groom.rpc.port}
+   * @param groomPeerPort value for {@code bsp.peer.port}
+   * @param maxTasks value for {@code bsp.tasks.maximum}
+   * @param slotJVMHeap heap size appended to {@code bsp.child.java.opts},
+   *          written with an {@code m} suffix
+   * @return the overridden configuration in its serialized form
+   */
+  private byte[] getConfigurationOverride(Long groomRPCPort,
+      Long groomPeerPort, Integer maxTasks, Long slotJVMHeap) {
+    // Create a configuration from the current configuration and
+    // override properties as appropriate for the Groom server.
+    Configuration overrides = new Configuration(conf);
+
+    overrides.set("bsp.groom.rpc.port", groomRPCPort.toString());
+    overrides.set("bsp.peer.port", groomPeerPort.toString());
+    overrides.set("bsp.tasks.maximum", maxTasks.toString());
+
+    // An unset property must not leak into the options as the text "null".
+    String childOpts = conf.get("bsp.child.java.opts");
+    String heapOpt = "-Xmx" + slotJVMHeap + "m";
+    if (childOpts == null || childOpts.trim().length() == 0) {
+      overrides.set("bsp.child.java.opts", heapOpt);
+    } else {
+      overrides.set("bsp.child.java.opts", childOpts.trim() + " " + heapOpt);
+    }
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try {
+      overrides.write(new DataOutputStream(baos));
+      baos.flush();
+    } catch (IOException e) {
+      // Without a serialized configuration no groom can be started.
+      log.warn("Failed to serialize configuration.", e);
+      System.exit(1);
+    }
+
+    return baos.toByteArray();
+  }
+
+  private class ResourceOffer {
+    Offer offer;
+
+    double cpus;
+    double mem;
+    double disk;
+    List<Value.Range> ports;
+
+    String cpuRole;
+    String memRole;
+    String diskRole;
+    String portRole;
+
+    public ResourceOffer(Offer offer) {
+      this.offer = offer;
+      parseOffer(offer);
+    }
+
+    private void parseOffer(Offer offer) {
+      // Pull out the cpus, memory, disk, and 2 ports from the offer.
+      for (Resource resource : offer.getResourcesList()) {
+        if (resource.getName().equals("cpus")
+            && resource.getType() == Value.Type.SCALAR) {
+          cpus = resource.getScalar().getValue();
+          cpuRole = resource.getRole();
+        } else if (resource.getName().equals("mem")
+            && resource.getType() == Value.Type.SCALAR) {
+          mem = resource.getScalar().getValue();
+          memRole = resource.getRole();
+        } else if (resource.getName().equals("disk")
+            && resource.getType() == Value.Type.SCALAR) {
+          disk = resource.getScalar().getValue();
+          diskRole = resource.getRole();
+        } else if (resource.getName().equals("ports")
+            && resource.getType() == Value.Type.RANGES) {
+          portRole = resource.getRole();
+          ports = resource.getRanges().getRangeList();
+        }
+      }
+    }
+
+    private int getMaxSlots() {
+      return (int) Math.min(cpus, mem / slotMemory);
+    }
+  }
+
+  /**
+   * Gets the amount of memory requested for a child JVM in MiB.
+   * <p>
+   * The value is taken from the first {@code -Xmx<number><unit>} flag in
+   * {@code bsp.child.java.opts}, where the unit is one of k, m or g in either
+   * case. The flag may appear anywhere in the option string. When a flag is
+   * found it is removed from the configuration property, leaving all other
+   * options intact, so the memory setting can be added again later.
+   *
+   * @param conf configuration to read the child JVM options from; its
+   *          {@code bsp.child.java.opts} property is rewritten when a memory
+   *          flag is found
+   * @return the requested memory in MiB, or 200 when no flag with a supported
+   *         unit is present
+   */
+  private static long parseMemory(Configuration conf) {
+    String javaOpts = conf.get("bsp.child.java.opts", "-Xmx200m");
+    Matcher memMatcher = Pattern.compile("-Xmx([0-9]+)([kKmMgG])").matcher(
+        javaOpts);
+    if (!memMatcher.find()) {
+      // default to 200 MiB
+      return 200;
+    }
+
+    // Read the groups before replaceAll, which resets the matcher.
+    long value = Long.parseLong(memMatcher.group(1));
+    String unit = memMatcher.group(2);
+
+    if (unit.equalsIgnoreCase("k")) {
+      // KiB to MiB, rounded up
+      value = (value + 1023) / 1024;
+    } else if (unit.equalsIgnoreCase("g")) {
+      value = value * 1024;
+    }
+
+    // remove memory request from the child java opts so it may be added later
+    conf.set("bsp.child.java.opts", memMatcher.replaceAll("").trim());
+
+    return value;
+  }
+}

Added: hama/trunk/mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java
URL: http://svn.apache.org/viewvc/hama/trunk/mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java?rev=1599381&view=auto
==============================================================================
--- hama/trunk/mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java (added)
+++ hama/trunk/mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java Mon Jun  2 22:30:35 2014
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.collections.map.MultiValueMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math3.util.Pair;
+import org.apache.hama.ipc.GroomProtocol;
+import org.apache.mesos.Protos;
+import org.apache.mesos.SchedulerDriver;
+
+/**
+ * Hands tasks to groom servers as they register.
+ * <p>
+ * Tasks are assigned to a groom identified by host name and RPC port. A task
+ * added before its groom has registered is parked until the groom shows up.
+ * Once a groom has no parked assignments left after a task completes, its
+ * Mesos task is killed.
+ */
+public class TaskDelegator implements GroomStatusListener {
+  public static final Log LOG = LogFactory.getLog(TaskDelegator.class);
+
+  private Set<TaskInProgress> executingTasks;
+
+  /**
+   * Map to hold assignments from groom (host name and port) to
+   * TasksInProgress
+   */
+  private MultiValueMap assignments = new MultiValueMap();
+
+  private AtomicReference<GroomServerManager> groomServerManager;
+
+  /**
+   * Map from Pair of groomServerName and port number to the latest
+   * GroomServerStatus
+   */
+  private Map<Pair<String, Integer>, GroomServerStatus> groomServers = new HashMap<Pair<String, Integer>, GroomServerStatus>();
+
+  /**
+   * Map from Pair of groomServerName and port number to the Mesos task the
+   * groom runs in
+   */
+  private Map<Pair<String, Integer>, Protos.TaskID> groomTaskIDs = new HashMap<Pair<String, Integer>, Protos.TaskID>();
+
+  private SchedulerDriver driver;
+
+  /**
+   * @param groomServerManager
+   *          reference used to look up the groom to dispatch to
+   * @param driver
+   *          Mesos driver used to kill grooms that are no longer needed
+   * @param executingTasks
+   *          set of executing tasks shared with the caller
+   */
+  public TaskDelegator(AtomicReference<GroomServerManager> groomServerManager,
+      SchedulerDriver driver, Set<TaskInProgress> executingTasks) {
+    this.groomServerManager = groomServerManager;
+    this.driver = driver;
+    this.executingTasks = executingTasks;
+  }
+
+  /**
+   * Dispatches every task parked for the newly registered groom and remembers
+   * the groom's status.
+   *
+   * @param status
+   *          status of the groom server that registered
+   */
+  @Override
+  public void groomServerRegistered(GroomServerStatus status) {
+    Pair<String, Integer> key = keyFor(status);
+    LOG.debug("Received Groom From: " + key.getKey() + ":" + key.getValue());
+
+    if (assignments.containsKey(key)) {
+      for (Object tip : assignments.getCollection(key)) {
+        execute((TaskInProgress) tip, status);
+      }
+    } else {
+      LOG.error("Unexpected host found: " + key.getKey() + ":" + key.getValue());
+    }
+    groomServers.put(key, status);
+  }
+
+  /**
+   * Add a task for execution when the groom server becomes available. If the
+   * groom has already registered the task is dispatched right away.
+   * 
+   * @param tip
+   *          The TaskInProgress to execute
+   * @param taskId
+   *          The Mesos task the groom server runs in
+   * @param hostName
+   *          The hostname where the resource reservation was made
+   * @param port
+   *          The RPC port reserved for the groom server on that host
+   */
+  public void addTask(TaskInProgress tip, Protos.TaskID taskId,
+      String hostName, Integer port) {
+    LOG.trace("Adding Host: " + hostName + ":" + port + " for Task:"
+        + tip.getTaskId());
+    Pair<String, Integer> key = new Pair<String, Integer>(hostName, port);
+
+    groomTaskIDs.put(key, taskId);
+
+    if (groomServers.containsKey(key)) {
+      execute(tip, groomServers.get(key));
+    } else {
+      assignments.put(key, tip);
+    }
+  }
+
+  /** Builds the host name and RPC port key a groom is tracked under. */
+  private Pair<String, Integer> keyFor(GroomServerStatus status) {
+    return new Pair<String, Integer>(status.getGroomHostName(), BSPMaster
+        .resolveWorkerAddress(status.rpcServer).getPort());
+  }
+
+  /** Sends a launch directive for the task to the given groom. */
+  private void execute(TaskInProgress tip, GroomServerStatus status) {
+    Task task = tip.constructTask(status);
+
+    GroomProtocol worker = groomServerManager.get().findGroomServer(status);
+
+    GroomServerAction[] actions = new GroomServerAction[1];
+    actions[0] = new LaunchTaskAction(task);
+    Directive d1 = new DispatchTasksDirective(actions);
+    try {
+      worker.dispatch(d1);
+    } catch (IOException ioe) {
+      LOG.error(
+          "Fail to dispatch tasks to GroomServer " + status.getGroomName(), ioe);
+    }
+  }
+
+  /**
+   * Removes the finished task from the groom's assignments and kills the
+   * groom's Mesos task once no assignment is left for it.
+   *
+   * @param status
+   *          latest status of the groom the task ran on
+   * @param task
+   *          the task that completed
+   */
+  @Override
+  public void taskComplete(GroomServerStatus status, TaskInProgress task) {
+    Pair<String, Integer> key = keyFor(status);
+    groomServers.put(key, status);
+    assignments.remove(key, task);
+
+    if (assignments.getCollection(key) == null) {
+      groomServers.remove(key);
+      // Drop the id as well so a repeated completion for the same groom does
+      // not try to kill it again, and never pass null to the driver.
+      Protos.TaskID groomTaskId = groomTaskIDs.remove(key);
+      if (groomTaskId != null) {
+        driver.killTask(groomTaskId);
+      } else {
+        LOG.warn("No Mesos task recorded for groom " + key.getKey() + ":"
+            + key.getValue());
+      }
+    }
+  }
+}

Modified: hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1599381&r1=1599380&r2=1599381&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Mon Jun  2 22:30:35 2014
@@ -312,6 +312,7 @@
     <module>graph</module>
     <module>examples</module>
     <module>ml</module>
+    <module>mesos</module>
     <module>dist</module>
   </modules>
 

Modified: hama/trunk/src/assemble/bin.xml
URL: http://svn.apache.org/viewvc/hama/trunk/src/assemble/bin.xml?rev=1599381&r1=1599380&r2=1599381&view=diff
==============================================================================
--- hama/trunk/src/assemble/bin.xml (original)
+++ hama/trunk/src/assemble/bin.xml Mon Jun  2 22:30:35 2014
@@ -86,6 +86,18 @@
       </excludes>
       <outputDirectory>../hama-${project.version}/</outputDirectory>
     </fileSet>
+    <fileSet>
+      <directory>../mesos/target</directory>
+      <includes>
+        <include>hama-*.jar</include>
+      </includes>
+      <excludes>
+        <exclude>*sources.jar</exclude>
+        <exclude>*tests.jar</exclude>
+        <exclude>*javadoc.jar</exclude>
+      </excludes> 
+      <outputDirectory>../hama-${project.version}/</outputDirectory>
+    </fileSet>
  
     <fileSet>
       <directory>../</directory>



Mime
View raw message