incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r948024 [2/3] - in /incubator/hama/trunk: bin/ conf/ src/java/org/apache/hama/ src/java/org/apache/hama/bsp/ src/java/org/apache/hama/graph/ src/java/org/apache/hama/ipc/ src/java/org/apache/hama/util/ src/test/org/apache/hama/ src/test/org...
Date Tue, 25 May 2010 12:37:49 GMT
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerAction.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerAction.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerAction.java Tue May 25 12:37:48 2010
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A generic directive from the {@link org.apache.hama.bsp.BSPMaster}
+ * to the {@link org.apache.hama.bsp.GroomServer} to take some 'action'. 
+ * 
+ */
+abstract class GroomServerAction implements Writable {
+  
+  /**
+   * Enumeration of various 'actions' that the {@link BSPMaster}
+   * directs the {@link GroomServer} to perform periodically.
+   * 
+   */
+  public static enum ActionType {
+    /** Launch a new task. */
+    LAUNCH_TASK,
+    
+    /** Kill a task. */
+    KILL_TASK,
+    
+    /** Kill any tasks of this job and cleanup. */
+    KILL_JOB,
+    
+    /** Reinitialize the groomserver. */
+    REINIT_GROOM,
+
+    /** Ask a task to save its output. */
+    COMMIT_TASK
+  };
+  
+  /**
+   * A factory-method to create objects of given {@link ActionType}. 
+   * @param actionType the {@link ActionType} of object to create.
+   * @return a {@link GroomServerAction} for the given {@link ActionType}.
+   */
+  public static GroomServerAction createAction(ActionType actionType) {
+    GroomServerAction action = null;
+    
+    switch (actionType) {
+    case LAUNCH_TASK:
+      {
+        action = new LaunchTaskAction();
+      }
+      break;
+    case KILL_TASK:
+      {
+        action = new KillTaskAction();
+      }
+      break;
+    case KILL_JOB:
+      {
+        action = new KillJobAction();
+      }
+      break;
+    case REINIT_GROOM:
+      {
+        action = new ReinitTrackerAction();
+      }
+      break;
+    case COMMIT_TASK:
+      {
+        action = new CommitTaskAction();
+      }
+      break;
+    }
+
+    return action;
+  }
+  
+  private ActionType actionType;
+  
+  protected GroomServerAction(ActionType actionType) {
+    this.actionType = actionType;
+  }
+  
+  /**
+   * Return the {@link ActionType}.
+   * @return the {@link ActionType}.
+   */
+  ActionType getActionId() {
+    return actionType;
+  }
+
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeEnum(out, actionType);
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    actionType = WritableUtils.readEnum(in, ActionType.class);
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerManager.java Tue May 25 12:37:48 2010
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Manages information about the {@link GroomServer}s running on a cluster.
+ * This interface exists primarily to test the {@link BSPMaster}, and is not
+ * intended to be implemented by users.
+ */
+interface GroomServerManager {
+
+  /**
+   * @return A collection of the {@link GroomServerStatus} for the grooms
+   * being managed.
+   */
+  public Collection<GroomServerStatus> grooms();
+  
+  /**
+   * @return The number of unique hosts running grooms.
+   */
+  public int getNumberOfUniqueHosts();
+  
+  /**
+   * Get the current status of the cluster
+   * @param detailed if true then report groom names as well
+   * @return summary of the state of the cluster
+   */
+  public ClusterStatus getClusterStatus(boolean detailed);
+
+  /**
+   * Registers a {@link JobInProgressListener} for updates from this
+   * {@link GroomServerManager}.
+   * @param listener the {@link JobInProgressListener} to add
+   */
+  public void addJobInProgressListener(JobInProgressListener listener);
+
+  /**
+   * Unregisters a {@link JobInProgressListener} from this
+   * {@link GroomServerManager}.
+   * @param listener the {@link JobInProgressListener} to remove
+   */
+  public void removeJobInProgressListener(JobInProgressListener listener);
+
+  /**
+   * Return the current heartbeat interval that's used by {@link GroomServer}s.
+   *
+   * @return the heartbeat interval used by {@link GroomServer}s
+   */
+  public int getNextHeartbeatInterval();
+
+  /**
+   * Kill the job identified by jobid
+   * 
+   * @param jobid
+   * @throws IOException
+   */
+  public void killJob(BSPJobID jobid)
+      throws IOException;
+
+  /**
+   * Obtain the job object identified by jobid
+   * 
+   * @param jobid
+   * @return jobInProgress object
+   */
+  public JobInProgress getJob(BSPJobID jobid);
+  
+  /**
+   * Initialize the Job
+   * 
+   * @param job JobInProgress object
+   */
+  public void initJob(JobInProgress job);
+  
+  /**
+   * Fail a job.
+   * 
+   * @param job JobInProgress object
+   */
+  public void failJob(JobInProgress job);
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServerStatus.java Tue May 25 12:37:48 2010
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ *
+ */
+public class GroomServerStatus implements Writable {
+
+  static {
+    WritableFactories.setFactory
+      (GroomServerStatus.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new GroomServerStatus(); }
+       });
+  }
+  
+  String groomName;
+  String host;
+  int failures;
+  List<TaskStatus> taskReports;
+  
+  volatile long lastSeen;
+  private int maxTasks;
+
+  public GroomServerStatus() {
+    taskReports = new ArrayList<TaskStatus>();
+  }
+  
+  public GroomServerStatus(String groomName, String host, 
+      List<TaskStatus> taskReports, int failures, int maxTasks) {
+    this.groomName = groomName;
+    this.host = host;
+    
+    this.taskReports = new ArrayList<TaskStatus>(taskReports);
+    this.failures = failures;
+    this.maxTasks = maxTasks;
+  }
+  
+  public String getGroomName() {
+    return groomName;
+  }
+  
+  public String getHost() {
+    return host;
+  }
+  
+  /**
+   * Get the current tasks at the GroomServer.
+   * Tasks are tracked by a {@link TaskStatus} object.
+   * 
+   * @return a list of {@link TaskStatus} representing 
+   *         the current tasks at the GroomServer.
+   */
+  public List<TaskStatus> getTaskReports() {
+    return taskReports;
+  }
+  
+  public int getFailures() {
+    return failures;
+  }  
+  
+  public long getLastSeen() {
+    return lastSeen;
+  }
+  
+  public void setLastSeen(long lastSeen) {
+    this.lastSeen = lastSeen;
+  }
+
+  public int getMaxTasks() {
+    return maxTasks;
+  }
+  
+  /**
+   * Return the number of tasks that are currently running or unassigned.
+   */
+  public int countTasks() {
+    int taskCount = 0;
+    for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
+      TaskStatus ts = it.next();
+      TaskStatus.State state = ts.getRunState();
+      if(state == TaskStatus.State.RUNNING || 
+           state == TaskStatus.State.UNASSIGNED) {
+             taskCount++;
+      }    
+    }    
+    
+    return taskCount;    
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.groomName = Text.readString(in);
+    this.host = Text.readString(in);
+    this.failures = in.readInt();
+    this.maxTasks = in.readInt();
+    taskReports.clear();
+    int numTasks = in.readInt();
+    for (int i = 0; i < numTasks; i++) {
+      TaskStatus status = new TaskStatus();
+      status.readFields(in);
+      // Keep the decoded report; otherwise the list stays empty after reading.
+      taskReports.add(status);
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, groomName);
+    Text.writeString(out, host);
+    out.writeInt(failures);
+    out.writeInt(maxTasks);
+    out.writeInt(taskReports.size());    
+    for(TaskStatus taskStatus : taskReports) {
+      taskStatus.write(out);
+    }
+  }
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/HeartbeatResponse.java Tue May 25 12:37:48 2010
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+public class HeartbeatResponse implements Writable, Configurable {
+  private Configuration conf;
+  short responseId;
+  private GroomServerAction [] actions; 
+
+  public HeartbeatResponse() {}
+  
+  public HeartbeatResponse(short responseId, GroomServerAction [] actions) {
+    this.responseId = responseId;
+    this.actions = actions;
+  }
+
+  public void setResponseId(short responseId) {
+    this.responseId = responseId;
+  }
+
+  public short getResponseId() {
+    return responseId;
+  }
+  
+  public void setActions(GroomServerAction [] actions) {
+    this.actions = actions;
+  }
+  
+  public GroomServerAction [] getActions() {
+    return actions;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.responseId = in.readShort();
+    int length = WritableUtils.readVInt(in);
+    if (length > 0) {
+      actions = new GroomServerAction[length];
+      for(int i=0; i< length; ++i) {
+        GroomServerAction.ActionType actionType = 
+          WritableUtils.readEnum(in, GroomServerAction.ActionType.class);
+        actions[i] = GroomServerAction.createAction(actionType);
+        actions[i].readFields(in);
+      }
+    } else {
+      actions = null;
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeShort(this.responseId);
+    if(actions == null) {
+      WritableUtils.writeVInt(out, 0);
+    } else {
+      WritableUtils.writeVInt(out, actions.length);
+      for(GroomServerAction action: actions) {
+        WritableUtils.writeEnum(out, action.getActionId());
+        action.write(out);
+      }
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobChangeEvent.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobChangeEvent.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobChangeEvent.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobChangeEvent.java Tue May 25 12:37:48 2010
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * {@link JobChangeEvent} is used to capture state changes in a job. A job can
+ * change its state w.r.t priority, progress, run-state etc.
+ */
+abstract class JobChangeEvent {
+  private JobInProgress jip;
+
+  JobChangeEvent(JobInProgress jip) {
+    this.jip = jip;
+  }
+
+  /**
+   * Get the job object for which the change is reported
+   */
+  JobInProgress getJobInProgress() {
+    return jip;
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Tue May 25 12:37:48 2010
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/*************************************************************
+ * JobInProgress maintains all the info for keeping a Job on the straight and
+ * narrow. It keeps its JobProfile and its latest JobStatus, plus a set of
+ * tables for doing bookkeeping of its Tasks.
+ * ***********************************************************
+ */
+class JobInProgress {
+  /**
+   * Used when a kill is issued to a job which is initializing.
+   */
+  static class KillInterruptedException extends InterruptedException {
+    private static final long serialVersionUID = 1L;
+
+    public KillInterruptedException(String msg) {
+      super(msg);
+    }
+  }
+
+  static final Log LOG = LogFactory.getLog(JobInProgress.class);
+
+  JobProfile profile;
+  JobStatus status;
+  Path jobFile = null;
+  Path localJobFile = null;
+  Path localJarFile = null;
+
+  long startTime;
+  long launchTime;
+  long finishTime;
+
+  // private LocalFileSystem localFs;
+  private BSPJobID jobId;
+
+  final BSPMaster master;
+
+  public JobInProgress(BSPJobID jobId, BSPMaster master, Configuration conf)
+      throws IOException {
+    this.jobId = jobId;
+
+    this.master = master;
+    this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
+    this.startTime = System.currentTimeMillis();
+    status.setStartTime(startTime);
+    // this.localFs = FileSystem.getLocal(conf);
+
+    this.localJobFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
+        + ".xml");
+    this.localJarFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
+        + ".jar");
+    Path jobDir = master.getSystemDirectoryForJob(jobId);
+    FileSystem fs = jobDir.getFileSystem(conf);
+    jobFile = new Path(jobDir, "job.xml");
+    fs.copyToLocalFile(jobFile, localJobFile);
+    BSPJobContext job = new BSPJobContext(localJobFile, jobId);
+
+    System.out.println("user:" + job.getUser());
+    System.out.println("jobId:" + jobId);
+    System.out.println("jobFile:" + jobFile.toString());
+    System.out.println("jobName:" + job.getJobName());
+
+    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
+        .getJobName());
+
+    String jarFile = job.getJar();
+    if (jarFile != null) {
+      fs.copyToLocalFile(new Path(jarFile), localJarFile);
+    }
+  }
+
+  // ///////////////////////////////////////////////////
+  // Accessors for the JobInProgress
+  // ///////////////////////////////////////////////////
+  public JobProfile getProfile() {
+    return profile;
+  }
+
+  public JobStatus getStatus() {
+    return status;
+  }
+
+  public synchronized long getLaunchTime() {
+    return launchTime;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  /**
+   * @return The JobID of this JobInProgress.
+   */
+  public BSPJobID getJobID() {
+    return jobId;
+  }
+
+  public String toString() {
+    return "jobName:" + profile.getJobName() + "\n" + "submit user:"
+        + profile.getUser() + "\n" + "JobId:" + jobId + "\n" + "JobFile:"
+        + jobFile + "\n";
+  }
+
+  // ///////////////////////////////////////////////////
+  // Create/manage tasks
+  // ///////////////////////////////////////////////////
+  public synchronized Task obtainNewTask(GroomServerStatus status,
+      int clusterSize, int numUniqueHosts) {
+
+    return null;
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgressListener.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgressListener.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgressListener.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgressListener.java Tue May 25 12:37:48 2010
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * A listener for changes in a {@link JobInProgress job}'s lifecycle in the
+ * {@link BSPMaster}.
+ */
+abstract class JobInProgressListener {
+
+  /**
+   * Invoked when a new job has been added to the {@link BSPMaster}.
+   * 
+   * @param job The added job.
+   * @throws IOException
+   */
+  public abstract void jobAdded(JobInProgress job) throws IOException;
+
+  /**
+   * Invoked when a job has been removed from the {@link BSPMaster}.
+   * 
+   * @param job The removed job.
+   */
+  public abstract void jobRemoved(JobInProgress job);
+
+  /**
+   * Invoked when a job has been updated in the {@link BSPMaster}. This change
+   * in the job is tracked using {@link JobChangeEvent}.
+   * 
+   * @param event the event that tracks the change
+   */
+  public abstract void jobUpdated(JobChangeEvent event);
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobProfile.java Tue May 25 12:37:48 2010
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**************************************************
+ * A JobProfile tracks job's status
+ * 
+ **************************************************/
+public class JobProfile implements Writable {
+
+  static { // register a ctor
+    WritableFactories.setFactory(JobProfile.class, new WritableFactory() {
+      public Writable newInstance() {
+        return new JobProfile();
+      }
+    });
+  }
+
+  String user;
+  final BSPJobID jobid;
+  String jobFile;
+  String name;
+
+  /**
+   * Construct an empty {@link JobProfile}.
+   */
+  public JobProfile() {
+    jobid = new BSPJobID();
+  }
+
+  /**
+   * Construct a {@link JobProfile} from the userid, jobid, job config-file
+   * and job name.
+   * 
+   * @param user userid of the person who submitted the job.
+   * @param jobid id of the job.
+   * @param jobFile job configuration file.
+   * @param url link to the web-ui for details of the job.
+   * @param name user-specified job name.
+   * @param queueName name of the queue to which the job is submitted
+   */
+  public JobProfile(String user, BSPJobID jobid, String jobFile, String name) {
+    this.user = user;
+    this.jobid = jobid;
+    this.jobFile = jobFile;
+    this.name = name;
+  }
+
+  /**
+   * Get the user id.
+   */
+  public String getUser() {
+    return user;
+  }
+
+  /**
+   * Get the job id.
+   */
+  public BSPJobID getJobID() {
+    return jobid;
+  }
+
+  /**
+   * Get the configuration file for the job.
+   */
+  public String getJobFile() {
+    return jobFile;
+  }
+
+  /**
+   * Get the user-specified job name.
+   */
+  public String getJobName() {
+    return name;
+  }
+
+  // /////////////////////////////////////
+  // Writable
+  // /////////////////////////////////////
+  public void write(DataOutput out) throws IOException {
+    jobid.write(out);
+    Text.writeString(out, jobFile);
+    Text.writeString(out, user);
+    Text.writeString(out, name);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    jobid.readFields(in);
+    this.jobFile = Text.readString(in);
+    this.user = Text.readString(in);
+    this.name = Text.readString(in);
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java?rev=948024&r1=948023&r2=948024&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java Tue May 25 12:37:48 2010
@@ -43,7 +43,7 @@ public class JobStatus implements Writab
   public static final int PREP = 4;
   public static final int KILLED = 5;
 
-  private JobID jobid;
+  private BSPJobID jobid;
   private float progress;
   private float cleanupProgress;
   private float setupProgress;
@@ -54,16 +54,16 @@ public class JobStatus implements Writab
   public JobStatus() {
   }
 
-  public JobStatus(JobID jobid, float progress, int runState) {
+  public JobStatus(BSPJobID jobid, float progress, int runState) {
     this(jobid, progress, 0.0f, runState);
   }
 
-  public JobStatus(JobID jobid, float progress, float cleanupProgress,
+  public JobStatus(BSPJobID jobid, float progress, float cleanupProgress,
       int runState) {
     this(jobid, 0.0f, progress, cleanupProgress, runState);
   }
 
-  public JobStatus(JobID jobid, float setupProgress, float progress,
+  public JobStatus(BSPJobID jobid, float setupProgress, float progress,
       float cleanupProgress, int runState) {
     this.jobid = jobid;
     this.setupProgress = setupProgress;
@@ -72,7 +72,7 @@ public class JobStatus implements Writab
     this.runState = runState;
   }
 
-  public JobID getJobID() {
+  public BSPJobID getJobID() {
     return jobid;
   }
 
@@ -148,7 +148,7 @@ public class JobStatus implements Writab
   }
 
   public synchronized void readFields(DataInput in) throws IOException {
-    this.jobid = new JobID();
+    this.jobid = new BSPJobID();
     jobid.readFields(in);
     this.setupProgress = in.readFloat();
     this.progress = in.readFloat();

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/KillJobAction.java Tue May 25 12:37:48 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to kill the task of a job and clean up
+ * resources.
+ * 
+ */
+class KillJobAction extends GroomServerAction {
+  final BSPJobID jobId;
+
+  public KillJobAction() {
+    super(ActionType.KILL_JOB);
+    jobId = new BSPJobID();
+  }
+
+  public KillJobAction(BSPJobID jobId) {
+    super(ActionType.KILL_JOB);
+    this.jobId = jobId;
+  }
+
+  public BSPJobID getJobID() {
+    return jobId;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    jobId.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    jobId.readFields(in);
+  }
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/KillTaskAction.java Tue May 25 12:37:48 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} 
+ * to the {@link org.apache.hama.bsp.GroomServer} to kill a task.
+ * 
+ */
+class KillTaskAction extends GroomServerAction {
+  final TaskAttemptID taskId;
+  
+  public KillTaskAction() {
+    super(ActionType.KILL_TASK);
+    taskId = new TaskAttemptID();
+  }
+  
+  public KillTaskAction(TaskAttemptID taskId) {
+    super(ActionType.KILL_TASK);
+    this.taskId = taskId;
+  }
+
+  public TaskAttemptID getTaskID() {
+    return taskId;
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskId.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskId.readFields(in);
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java Tue May 25 12:37:48 2010
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to launch a new task.
+ * 
+ */
+class LaunchTaskAction extends GroomServerAction {
+  private Task task;
+
+  public LaunchTaskAction() {
+    super(ActionType.LAUNCH_TASK);
+  }
+
+  public LaunchTaskAction(Task task) {
+    super(ActionType.LAUNCH_TASK);
+    this.task = task;
+  }
+
+  public Task getTask() {
+    return task;
+  }
+
+  public void write(DataOutput out) throws IOException {
+    task.write(out);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    task = new Task();
+    task.readFields(in);
+  }
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPCluster.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPCluster.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalBSPCluster.java Tue May 25 12:37:48 2010
@@ -0,0 +1,97 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.util.ClusterUtil;
+
+public class LocalBSPCluster {
+  public static final Log LOG = LogFactory.getLog(LocalBSPCluster.class);
+  
+  private final BSPMaster master;
+  private final List<ClusterUtil.GroomServerThread> groomThreads;
+  private final Configuration conf;
+  private Class<? extends GroomServer> groomServerClass;
+  private final static int DEFAULT_NO = 3;
+
+  public LocalBSPCluster(final Configuration conf) throws IOException, InterruptedException {
+    this(conf, DEFAULT_NO);
+  }
+
+  public LocalBSPCluster(final Configuration conf, final int noGroomServers)
+      throws IOException, InterruptedException {
+    this(conf, noGroomServers, BSPMaster.class,
+        getGroomServerImplementation(conf));
+  }
+
+  @SuppressWarnings("unchecked")
+  public LocalBSPCluster(final Configuration conf, final int noGroomServers,
+      final Class<? extends BSPMaster> masterClass,
+      final Class<? extends GroomServer> groomServerClass) throws IOException, InterruptedException {
+    conf.set("bsp.master.port", "40000");
+    conf.set("bsp.groom.port", "40020");
+    conf.set("bsp.local.dir", conf.get("hadoop.tmp.dir") + "/bsp/local");
+    conf.set("bsp.system.dir", conf.get("hadoop.tmp.dir") + "/bsp/system");
+    this.conf = conf;
+    
+    // Create the master
+    this.master = BSPMaster.constructMaster(masterClass, conf);
+    this.groomThreads = new CopyOnWriteArrayList<ClusterUtil.GroomServerThread>();
+    this.groomServerClass = (Class<? extends GroomServer>) conf.getClass(
+        HConstants.REGION_SERVER_IMPL, groomServerClass);
+    for (int i = 0; i < noGroomServers; i++) {
+      addGroomServer(i);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static Class<? extends GroomServer> getGroomServerImplementation(
+      final Configuration conf) {
+    return (Class<? extends GroomServer>) conf.getClass(
+        HConstants.REGION_SERVER_IMPL, GroomServer.class);
+  }
+
+  public ClusterUtil.GroomServerThread addGroomServer(final int index)
+      throws IOException {
+    LOG.info("Adding Groom Server");
+    ClusterUtil.GroomServerThread rst = ClusterUtil
+        .createGroomServerThread(this.conf, this.groomServerClass, index);
+    this.groomThreads.add(rst);
+    return rst;
+  }
+
+  public void startup() {
+    try {
+      ClusterUtil.startup(this.master, this.groomThreads, conf);
+    } catch (IOException e) {
+      LOG.info(e);
+    } catch (InterruptedException e) {
+      LOG.info(e);
+    }
+  }
+
+  public void shutdown() {
+      ClusterUtil.shutdown(this.master, this.groomThreads, conf);
+  }
+  
+  /**
+   * Test things basically work.
+   * 
+   * @param args
+   * @throws IOException
+   * @throws InterruptedException 
+   */
+  public static void main(String[] args) throws IOException, InterruptedException {
+    StringUtils.startupShutdownMessage(LocalBSPCluster.class, args, LOG);
+    HamaConfiguration conf = new HamaConfiguration();
+    LocalBSPCluster cluster = new LocalBSPCluster(conf);
+    cluster.startup();
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/ReinitTrackerAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/ReinitTrackerAction.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/ReinitTrackerAction.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/ReinitTrackerAction.java Tue May 25 12:37:48 2010
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hama.bsp.BSPMaster} to the
+ * {@link org.apache.hama.bsp.GroomServer} to reinitialize itself.
+ * 
+ */
+class ReinitTrackerAction extends GroomServerAction {
+
+  public ReinitTrackerAction() {
+    super(ActionType.REINIT_GROOM);
+  }
+
+  public void write(DataOutput out) throws IOException {
+  }
+
+  public void readFields(DataInput in) throws IOException {
+  }
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/RunningJob.java Tue May 25 12:37:48 2010
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * <code>RunningJob</code> is the user-interface to query for details on a
+ * running BSP job.
+ * 
+ * <p>
+ * Clients can get hold of <code>RunningJob</code> via the {@link BSPJobClient}
+ * and then query the running-job for details such as name, configuration,
+ * progress etc.
+ * </p>
+ * 
+ * @see BSPJobClient
+ */
+public interface RunningJob {
+  /**
+   * Get the job identifier.
+   * 
+   * @return the job identifier.
+   */
+  public BSPJobID getID();
+
+  /**
+   * Get the name of the job.
+   * 
+   * @return the name of the job.
+   */
+  public String getJobName();
+
+  /**
+   * Get the path of the submitted job configuration.
+   * 
+   * @return the path of the submitted job configuration.
+   */
+  public String getJobFile();
+
+  /**
+   * Get the <i>progress</i> of the job's tasks, as a float between 0.0 and 1.0.
+   * When all bsp tasks have completed, the function returns 1.0.
+   * 
+   * @return the progress of the job's tasks.
+   * @throws IOException
+   */
+  public float progress() throws IOException;
+
+  /**
+   * Check if the job is finished or not. This is a non-blocking call.
+   * 
+   * @return <code>true</code> if the job is complete, else <code>false</code>.
+   * @throws IOException
+   */
+  public boolean isComplete() throws IOException;
+
+  /**
+   * Check if the job completed successfully.
+   * 
+   * @return <code>true</code> if the job succeeded, else <code>false</code>.
+   * @throws IOException
+   */
+  public boolean isSuccessful() throws IOException;
+
+  /**
+   * Blocks until the job is complete.
+   * 
+   * @throws IOException
+   */
+  public void waitForCompletion() throws IOException;
+
+  /**
+   * Returns the current state of the job (see {@link JobStatus}).
+   * 
+   * @throws IOException
+   */
+  public int getJobState() throws IOException;
+
+  /**
+   * Kill the running job. Blocks until all job tasks have been killed as well.
+   * If the job is no longer running, it simply returns.
+   * 
+   * @throws IOException
+   */
+  public void killJob() throws IOException;
+
+  /**
+   * Kill indicated task attempt.
+   * 
+   * @param taskId the id of the task to be terminated.
+   * @param shouldFail if true the task is failed and added to failed tasks
+   *          list, otherwise it is just killed, w/o affecting job failure
+   *          status.
+   * @throws IOException
+   */
+  public void killTask(TaskAttemptID taskId, boolean shouldFail)
+      throws IOException;
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/SimpleTaskScheduler.java Tue May 25 12:37:48 2010
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+class SimpleTaskScheduler extends TaskScheduler {
+  private static final Log LOG = LogFactory.getLog(SimpleTaskScheduler.class);
+  List<JobInProgress> jobQueue;
+
+  public SimpleTaskScheduler() {
+    jobQueue = new ArrayList<JobInProgress>();
+  }
+
+  @Override
+  public void addJob(JobInProgress job) {
+    LOG.debug(">> Added a job (" + job + ") to scheduler (remaining jobs: "
+        + (jobQueue.size() + 1) + ")");
+    jobQueue.add(job);
+  }
+
+  @Override
+  public Collection<JobInProgress> getJobs() {
+    return jobQueue;
+  }
+
+  /*
+   * (non-Javadoc)
+   * @see org.apache.hama.bsp.TaskScheduler#assignTasks(
+   * org.apache.hama.bsp.GroomServerStatus)
+   */
+  @Override
+  public List<Task> assignTasks(GroomServerStatus groomStatus)
+      throws IOException {
+    ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false);
+
+    final int numGroomServers = clusterStatus.getGroomServers();
+    // final int clusterTaskCapacity = clusterStatus.getMaxTasks();
+
+    //
+    // Get task counts for the current groom.
+    //
+    // final int groomTaskCapacity = groom.getMaxTasks();
+    final int groomRunningTasks = groomStatus.countTasks();
+
+    // Assigned tasks
+    List<Task> assignedTasks = new ArrayList<Task>();
+
+    // Task task = null;
+    if (groomRunningTasks == 0) {
+      // TODO - Each time a job is submitted in BSPMaster, add a JobInProgress
+      // instance to the scheduler.
+      synchronized (jobQueue) {
+        for (JobInProgress job : jobQueue) {
+          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+            continue;
+          }
+
+          Task t = null;
+
+          t = job.obtainNewTask(groomStatus, numGroomServers,
+              groomServerManager.getNumberOfUniqueHosts());
+          if (t != null) {
+            assignedTasks.add(t);
+            break; // TODO - Now, simple scheduler assigns only one task to
+            // each groom. Later, it will be improved for scheduler to
+            // assign one or more tasks to each groom according to
+            // its capacity.
+          }
+        }
+      }
+    }
+
+    return assignedTasks;
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Tue May 25 12:37:48 2010
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ *
+ */
+public class Task implements Writable {
+  ////////////////////////////////////////////
+  // Fields
+  ////////////////////////////////////////////
+  private String jobFile;
+  private TaskAttemptID taskId;
+  private int partition;
+  
+  protected LocalDirAllocator lDirAlloc;
+  /**
+   * 
+   */
+  public Task() {
+    taskId = new TaskAttemptID();
+  }
+  
+  public Task(String jobFile, TaskAttemptID taskId, int partition) {
+    this.jobFile = jobFile;
+    this.taskId = taskId;
+     
+    this.partition = partition;
+  }
+  
+  ////////////////////////////////////////////
+  // Accessors
+  ////////////////////////////////////////////
+  public void setJobFile(String jobFile) { 
+    this.jobFile = jobFile; 
+  }
+  
+  public String getJobFile() { 
+    return jobFile; 
+  }
+  
+  public TaskAttemptID getTaskID() {
+    return taskId;
+  }
+  
+  /**
+   * Get the ID of the job that this task belongs to.
+   * @return the job ID
+   */
+  public BSPJobID getJobID() {
+    return taskId.getJobID();
+  }
+  
+  /**
+   * Get the index of this task within the job.
+   * @return the integer part of the task id
+   */
+  public int getPartition() {
+    return partition;
+  }
+  
+  @Override
+  public String toString() { 
+    return taskId.toString(); 
+  }
+  
+  ////////////////////////////////////////////
+  // Writable
+  ////////////////////////////////////////////
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, jobFile);
+    taskId.write(out);
+    out.writeInt(partition);
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    jobFile = Text.readString(in);
+    taskId.readFields(in);
+    partition = in.readInt();
+  }
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java?rev=948024&r1=948023&r2=948024&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptContext.java Tue May 25 12:37:48 2010
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hama.bsp;
 
 import java.io.IOException;
@@ -26,7 +25,7 @@ import org.apache.hadoop.util.Progressab
 /**
  * The context for task attempts.
  */
-public class TaskAttemptContext extends JobContext implements Progressable {
+public class TaskAttemptContext extends BSPJobContext implements Progressable {
   private final TaskAttemptID taskId;
   private String status = "";
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java?rev=948024&r1=948023&r2=948024&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java Tue May 25 12:37:48 2010
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hama.bsp;
 
 import java.io.DataInput;
@@ -43,7 +42,7 @@ public class TaskAttemptID extends ID {
     taskId = new TaskID();
   }
 
-  public JobID getJobID() {
+  public BSPJobID getJobID() {
     return taskId.getJobID();
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java?rev=948024&r1=948023&r2=948024&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskID.java Tue May 25 12:37:48 2010
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hama.bsp;
 
 import java.io.DataInput;
@@ -31,10 +30,10 @@ public class TaskID extends ID {
     idFormat.setMinimumIntegerDigits(6);
   }
 
-  private JobID jobId;
+  private BSPJobID jobId;
   private boolean isMatrixTask;
 
-  public TaskID(JobID jobId, boolean isMatrixTask, int id) {
+  public TaskID(BSPJobID jobId, boolean isMatrixTask, int id) {
     super(id);
     if (jobId == null) {
       throw new IllegalArgumentException("jobId cannot be null");
@@ -44,15 +43,15 @@ public class TaskID extends ID {
   }
 
   public TaskID(String jtIdentifier, int jobId, boolean isGraphTask, int id) {
-    this(new JobID(jtIdentifier, jobId), isGraphTask, id);
+    this(new BSPJobID(jtIdentifier, jobId), isGraphTask, id);
   }
 
   public TaskID() {
-    jobId = new JobID();
+    jobId = new BSPJobID();
   }
 
-  /** Returns the {@link JobID} object that this tip belongs to */
-  public JobID getJobID() {
+  /** Returns the {@link BSPJobID} object that this tip belongs to */
+  public BSPJobID getJobID() {
     return jobId;
   }
 

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Tue May 25 12:37:48 2010
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ *
+ */
+class TaskInProgress {
+  public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
+
+  private BSPJobContext context;
+
+  // Constants
+  static final int MAX_TASK_EXECS = 1;
+  int maxTaskAttempts = 4;
+
+  // Job Meta
+  private String jobFile = null;
+  private int partition;
+  private BSPMaster bspMaster;
+  private TaskID id;
+  private JobInProgress job;
+  private int completes = 0;
+
+  // Status
+  // private double progress = 0;
+  // private String state = "";
+  private long startTime = 0;
+
+  // The 'next' usable taskid of this tip
+  int nextTaskId = 0;
+
+  // The taskid that took this TIP to SUCCESS
+  // private TaskAttemptID successfulTaskId;
+
+  // The first taskid of this tip
+  private TaskAttemptID firstTaskId;
+
+  // Map from task Id -> GroomServer Id, contains tasks that are
+  // currently running
+  private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
+  // All attempt Ids of this TIP
+  // private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
+  /**
+   * Map from taskId -> TaskStatus
+   */
+  private TreeMap<TaskAttemptID, TaskStatus> taskStatuses = new TreeMap<TaskAttemptID, TaskStatus>();
+
+  public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master,
+      BSPJobContext context, JobInProgress job, int partition) {
+    this.jobFile = jobFile;
+    this.bspMaster = master;
+    this.job = job;
+    this.context = context;
+    this.partition = partition;
+  }
+
+  // //////////////////////////////////
+  // Accessors
+  // //////////////////////////////////
+  /**
+   * Return the start time
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Return the parent job
+   */
+  public JobInProgress getJob() {
+    return job;
+  }
+
+  public TaskID getTIPId() {
+    return this.id;
+  }
+
+  /**
+   * Is the Task associated with taskid the first attempt of the tip?
+   * 
+   * @param taskId
+   * @return Returns true if the Task is the first attempt of the tip
+   */
+  public boolean isFirstAttempt(TaskAttemptID taskId) {
+    return firstTaskId == null ? false : firstTaskId.equals(taskId);
+  }
+
+  /**
+   * Is this tip currently running any tasks?
+   * 
+   * @return true if any tasks are running
+   */
+  public boolean isRunning() {
+    return !activeTasks.isEmpty();
+  }
+
+  /**
+   * Is this tip complete?
+   * 
+   * @return <code>true</code> if the tip is complete, else <code>false</code>
+   */
+  public synchronized boolean isComplete() {
+    return (completes > 0);
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java Tue May 25 12:37:48 2010
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Used by a {@link BSPMaster} to schedule {@link Task}s on
+ * {@link GroomServer}s.
+ * <p>
+ * Concrete schedulers implement {@link #addJob(JobInProgress)},
+ * {@link #getJobs()} and {@link #assignTasks(GroomServerStatus)}. The
+ * lifecycle hooks {@link #start()} and {@link #terminate()} do nothing unless
+ * a subclass overrides them.
+ */
+abstract class TaskScheduler implements Configurable {
+
+  /** Configuration supplied through {@link #setConf(Configuration)}. */
+  protected Configuration conf;
+  /** Manager supplied through {@link #setGroomServerManager}. */
+  protected GroomServerManager groomServerManager;
+
+  /**
+   * @return the configuration last passed to
+   *         {@link #setConf(Configuration)}, or null if none was set
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * @param conf the configuration to keep for later use
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Sets the manager that subclasses reach through the
+   * {@link #groomServerManager} field.
+   *
+   * @param groomServerManager the manager to keep
+   */
+  public synchronized void setGroomServerManager(
+      GroomServerManager groomServerManager) {
+    this.groomServerManager = groomServerManager;
+  }
+
+  /**
+   * Lifecycle method to allow the scheduler to start any work in separate
+   * threads. This base implementation does nothing.
+   *
+   * @throws IOException never thrown here; declared so that overriding
+   *           schedulers may throw it
+   */
+  public void start() throws IOException {
+    // do nothing
+  }
+
+  /**
+   * Lifecycle method to allow the scheduler to stop any work it is doing.
+   * This base implementation does nothing.
+   *
+   * @throws IOException never thrown here; declared so that overriding
+   *           schedulers may throw it
+   */
+  public void terminate() throws IOException {
+    // do nothing
+  }
+
+  /**
+   * Hands a job to this scheduler.
+   *
+   * @param job the job to add
+   */
+  public abstract void addJob(JobInProgress job);
+
+  /**
+   * Returns a collection of jobs in an order which is specific to the
+   * particular scheduler.
+   *
+   * @return the jobs, in scheduler-specific order
+   */
+  public abstract Collection<JobInProgress> getJobs();
+
+  /**
+   * Returns the tasks we'd like the GroomServer to execute right now.
+   *
+   * @param groomStatus status of the GroomServer for which we're looking for
+   *          tasks.
+   * @return A list of tasks to run on that GroomServer, possibly empty.
+   * @throws IOException declared so that implementations may throw it
+   */
+  public abstract List<Task> assignTasks(GroomServerStatus groomStatus)
+      throws IOException;
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java Tue May 25 12:37:48 2010
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Status report for a single task attempt: progress, run state, free-form
+ * state string, groom server name, start/finish times and current phase.
+ * <p>
+ * The serialized form produced by {@link #write(DataOutput)} and consumed by
+ * {@link #readFields(DataInput)} covers every field except the groom server
+ * name.
+ * <p>
+ * The class implements {@link Cloneable} so that {@link #clone()} can
+ * delegate to {@link Object#clone()}; without it every call to clone() would
+ * end in an {@link InternalError}.
+ */
+class TaskStatus implements Writable, Cloneable {
+  static final Log LOG = LogFactory.getLog(TaskStatus.class);
+
+  // enumeration for reporting current phase of a task.
+  public static enum Phase {
+    STARTING, COMPUTE, BARRIER_SYNC, CLEANUP
+  }
+
+  // what state is the task in?
+  public static enum State {
+    RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN
+  }
+
+  private final TaskAttemptID taskId;
+  private float progress;
+  private volatile State runState;
+  private String stateString;
+  private String groomServer;
+
+  private long startTime;
+  private long finishTime;
+
+  private volatile Phase phase = Phase.STARTING;
+
+  /**
+   * Creates a status around a fresh {@link TaskAttemptID}. All other fields
+   * keep their defaults until they are set or read via
+   * {@link #readFields(DataInput)}.
+   */
+  public TaskStatus() {
+    taskId = new TaskAttemptID();
+  }
+
+  /**
+   * Creates a status with the given values. Start and finish time are left
+   * at 0.
+   *
+   * @param taskId id of the task attempt this status describes
+   * @param progress initial progress value
+   * @param runState initial run state
+   * @param stateString initial free-form state string
+   * @param groomServer name of the groom server
+   * @param phase initial phase
+   */
+  public TaskStatus(TaskAttemptID taskId, float progress, State runState,
+      String stateString, String groomServer, Phase phase) {
+    this.taskId = taskId;
+    this.progress = progress;
+    this.runState = runState;
+    this.stateString = stateString;
+    this.groomServer = groomServer;
+    this.phase = phase;
+  }
+
+  // //////////////////////////////////////////////////
+  // Accessors and Modifiers
+  // //////////////////////////////////////////////////
+
+  public TaskAttemptID getTaskId() {
+    return taskId;
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  public void setProgress(float progress) {
+    this.progress = progress;
+  }
+
+  public State getRunState() {
+    return runState;
+  }
+
+  public void setRunState(State state) {
+    this.runState = state;
+  }
+
+  public String getStateString() {
+    return stateString;
+  }
+
+  public void setStateString(String stateString) {
+    this.stateString = stateString;
+  }
+
+  public String getGroomServer() {
+    return groomServer;
+  }
+
+  public void setGroomServer(String groomServer) {
+    this.groomServer = groomServer;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  /**
+   * Get start time of the task.
+   *
+   * @return 0 if start time is not set, else returns start time.
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Set startTime of the task.
+   *
+   * @param startTime start time
+   */
+  void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  /**
+   * Get current phase of this task.
+   *
+   * @return the current phase; {@link Phase#STARTING} until changed
+   */
+  public Phase getPhase() {
+    return this.phase;
+  }
+
+  /**
+   * Set current phase of this task.
+   *
+   * @param phase phase of this task
+   */
+  void setPhase(Phase phase) {
+    this.phase = phase;
+  }
+
+  /**
+   * Update the status of the task.
+   *
+   * This update is done by ping thread before sending the status.
+   *
+   * @param progress new progress value
+   * @param state new free-form state string
+   */
+  synchronized void statusUpdate(float progress, String state) {
+    setProgress(progress);
+    setStateString(state);
+  }
+
+  /**
+   * Update the status of the task from another status object.
+   *
+   * Progress, run state, state string and phase are always copied. Start and
+   * finish time are copied only when they are non-zero in the given status.
+   * The task id and groom server name are not touched.
+   *
+   * @param status updated status
+   */
+  synchronized void statusUpdate(TaskStatus status) {
+    this.progress = status.getProgress();
+    this.runState = status.getRunState();
+    this.stateString = status.getStateString();
+
+    // a zero timestamp means "not set"; keep the value we already have
+    if (status.getStartTime() != 0) {
+      this.startTime = status.getStartTime();
+    }
+    if (status.getFinishTime() != 0) {
+      this.finishTime = status.getFinishTime();
+    }
+
+    this.phase = status.getPhase();
+  }
+
+  /**
+   * Update specific fields of task status
+   *
+   * This update is done in BSPMaster when a cleanup attempt of task reports its
+   * status. Then update only specific fields, not all.
+   *
+   * @param runState new run state
+   * @param progress new progress value
+   * @param state new free-form state string
+   * @param phase new phase
+   * @param finishTime new finish time; ignored when 0
+   */
+  synchronized void statusUpdate(State runState, float progress, String state,
+      Phase phase, long finishTime) {
+    setRunState(runState);
+    setProgress(progress);
+    setStateString(state);
+    setPhase(phase);
+    if (finishTime != 0) {
+      this.finishTime = finishTime;
+    }
+  }
+
+  /**
+   * Returns a shallow copy of this status; the copy shares this object's
+   * {@link TaskAttemptID} instance.
+   *
+   * @return the copy, as produced by {@link Object#clone()}
+   */
+  @Override
+  public Object clone() {
+    try {
+      return super.clone();
+    } catch (CloneNotSupportedException cnse) {
+      // Shouldn't happen since we do implement Cloneable
+      throw new InternalError(cnse.toString());
+    }
+  }
+
+  // ////////////////////////////////////////////
+  // Writable
+  // ////////////////////////////////////////////
+
+  /**
+   * Reads the fields in the same order {@link #write(DataOutput)} emits them.
+   * The groom server name is not part of the stream and is left unchanged.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.taskId.readFields(in);
+    this.progress = in.readFloat();
+    this.runState = WritableUtils.readEnum(in, State.class);
+    this.stateString = Text.readString(in);
+    this.phase = WritableUtils.readEnum(in, Phase.class);
+    this.startTime = in.readLong();
+    this.finishTime = in.readLong();
+  }
+
+  /**
+   * Writes task id, progress, run state, state string, phase, start time and
+   * finish time, in that order. The groom server name is not written.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskId.write(out);
+    out.writeFloat(progress);
+    WritableUtils.writeEnum(out, runState);
+    Text.writeString(out, stateString);
+    WritableUtils.writeEnum(out, phase);
+    out.writeLong(startTime);
+    out.writeLong(finishTime);
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/Work.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Work.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Work.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Work.java Tue May 25 12:37:48 2010
@@ -0,0 +1,5 @@
+package org.apache.hama.bsp;
+
+/**
+ * Placeholder class that currently declares no members.
+ */
+public class Work {
+
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java?rev=948024&r1=948023&r2=948024&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java Tue May 25 12:37:48 2010
@@ -1,6 +1,4 @@
 /**
- * Copyright 2008 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

Modified: incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java?rev=948024&r1=948023&r2=948024&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/InterTrackerProtocol.java Tue May 25 12:37:48 2010
@@ -1,6 +1,4 @@
 /**
- * Copyright 2008 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,8 +17,15 @@
  */
 package org.apache.hama.ipc;
 
+import java.io.IOException;
+
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.HeartbeatResponse;
+
 public interface InterTrackerProtocol extends HamaRPCProtocolVersion {
-  public HeartbeatResponse heartbeat(short responseId);
+  public HeartbeatResponse heartbeat(GroomServerStatus status, 
+      boolean restarted, boolean initialContact, boolean acceptNewTasks,
+      short responseId) throws IOException;
 
   public String getSystemDir();
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java?rev=948024&r1=948023&r2=948024&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/JobSubmissionProtocol.java Tue May 25 12:37:48 2010
@@ -1,6 +1,4 @@
 /**
- * Copyright 2008 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,15 +19,89 @@ package org.apache.hama.ipc;
 
 import java.io.IOException;
 
-import org.apache.hama.bsp.JobID;
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.JobProfile;
 import org.apache.hama.bsp.JobStatus;
+import org.apache.hama.bsp.TaskAttemptID;
 
 /**
- * Protocol that a Walker and the central Master use to communicate. This
+ * Protocol that a groom server and the central BSP Master use to communicate. This
  * interface will contains several methods: submitJob, killJob, and killTask.
  */
 public interface JobSubmissionProtocol extends HamaRPCProtocolVersion {
-  public JobID getNewJobId() throws IOException;
+  
+  /**
+   * Allocate a new id for the job.
+   * @return
+   * @throws IOException
+   */
+  public BSPJobID getNewJobId() throws IOException;
+  
+  
+  /**
+   * Submit a Job for execution.  Returns the latest profile for
+   * that job. 
+   * The job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
+   *
+   * @param jobName
+   * @return
+   * @throws IOException
+   */
+  public JobStatus submitJob(BSPJobID jobName) throws IOException;
+  
+  /**
+   * Get the current status of the cluster
+   * @param detailed if true then report groom names as well
+   * @return summary of the state of the cluster
+   */
+  public ClusterStatus getClusterStatus(boolean detailed) throws IOException;
+  
+  /**
+   * Grab a handle to a job that is already known to the BSPMaster.
+   * @return Profile of the job, or null if not found. 
+   */
+  public JobProfile getJobProfile(BSPJobID jobid) throws IOException;
+  
+  /**
+   * Grab a handle to a job that is already known to the BSPMaster.
+   * @return Status of the job, or null if not found.
+   */
+  public JobStatus getJobStatus(BSPJobID jobid) throws IOException;
+  
+  /**
+   * A BSP system always operates on a single filesystem.  This 
+   * function returns the fs name.  ('local' if the localfs; 'addr:port' 
+   * if dfs).  The client can then copy files into the right locations 
+   * prior to submitting the job.
+   */
+  public String getFilesystemName() throws IOException;
+  
+  /** 
+   * Get all the jobs submitted. 
+   * @return array of JobStatus for the submitted jobs
+   */
+  public JobStatus[] getAllJobs() throws IOException;
 
-  public JobStatus submitJob(JobID jobName) throws IOException;
+  /**
+   * Grab the bspmaster system directory path where job-specific files are to be placed.
+   * 
+   * @return the system directory where job-specific files are to be placed.
+   */
+  public String getSystemDir();
+  
+  /**
+   * Kill the indicated job
+   */
+  public void killJob(BSPJobID jobid) throws IOException;
+  
+  
+  /**
+   * Kill indicated task attempt.
+   * @param taskId the id of the task to kill.
+   * @param shouldFail if true the task is failed and added to failed tasks list, otherwise
+   * it is just killed, w/o affecting job failure status.  
+   */ 
+  public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;  
+  
 }

Added: incubator/hama/trunk/src/java/org/apache/hama/util/ClusterUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/ClusterUtil.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/ClusterUtil.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/ClusterUtil.java Tue May 25 12:37:48 2010
@@ -0,0 +1,98 @@
+package org.apache.hama.util;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPMaster;
+import org.apache.hama.bsp.GroomServer;
+
+public class ClusterUtil {
+  public static final Log LOG = LogFactory.getLog(ClusterUtil.class);
+
+  /**
+   * Data Structure to hold GroomServer Thread and GroomServer instance
+   */
+  public static class GroomServerThread extends Thread {
+    private final GroomServer groomServer;
+
+    public GroomServerThread(final GroomServer r, final int index) {
+      super(r, "GroomServer:" + index);
+      this.groomServer = r;
+    }
+
+    /** @return the groom server */
+    public GroomServer getGroomServer() {
+      return this.groomServer;
+    }
+
+    /**
+     * Block until the groom server has come online, indicating it is ready
+     * to be used.
+     */
+    public void waitForServerOnline() {
+      while (!groomServer.isRunning()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          // continue waiting
+        }
+      }
+    }
+  }
+
+  /**
+   * Creates a {@link GroomServerThread}.
+   * Call 'start' on the returned thread to make it run.
+   * @param c Configuration to use.
+   * @param hrsc Class to create.
+   * @param index Used distingushing the object returned.
+   * @throws IOException
+   * @return Groom server added.
+   */
+  public static ClusterUtil.GroomServerThread createGroomServerThread(final Configuration c,
+    final Class<? extends GroomServer> hrsc, final int index)
+  throws IOException {
+    GroomServer server;
+      try {
+        server = hrsc.getConstructor(Configuration.class).newInstance(c);
+      } catch (Exception e) {
+        IOException ioe = new IOException();
+        ioe.initCause(e);
+        throw ioe;
+      }
+      return new ClusterUtil.GroomServerThread(server, index);
+  }
+
+  /**
+   * Start the cluster.
+   * @param m
+   * @param conf 
+   * @param groomServers
+   * @return Address to use contacting master.
+   * @throws InterruptedException 
+   * @throws IOException 
+   */
+  public static String startup(final BSPMaster m,
+      final List<ClusterUtil.GroomServerThread> groomservers, Configuration conf) throws IOException, InterruptedException {
+    if (m != null) {
+      m.start();
+    }
+
+    if (groomservers != null) {
+      for (ClusterUtil.GroomServerThread t: groomservers) {
+        t.start();
+      }
+    }
+    
+    return m == null? null: BSPMaster.getAddress(conf).getHostName();
+  }
+
+  public static void shutdown(BSPMaster master,
+      List<GroomServerThread> groomThreads, Configuration conf) {
+    LOG.debug("Shutting down HAMA Cluster");
+    // TODO: 
+  }
+}

Modified: incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java?rev=948024&r1=948023&r2=948024&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java Tue May 25 12:37:48 2010
@@ -19,18 +19,34 @@
  */
 package org.apache.hama;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.bsp.LocalBSPCluster;
 
 /**
  * Forming up the miniDfs and miniHbase
  */
 public abstract class HamaCluster extends HBaseClusterTestCase {
+  public static final Log LOG = LogFactory.getLog(HamaCluster.class);
   protected final static HamaConfiguration conf = new HamaConfiguration();
-  public void setUp() throws Exception {
+
+  protected void setUp() throws Exception {
     super.setUp();
+    
+    String[] args = new String[0];
+    StringUtils.startupShutdownMessage(LocalBSPCluster.class, args, LOG);
+    HamaConfiguration conf = new HamaConfiguration();
+    LocalBSPCluster cluster = new LocalBSPCluster(conf);
+    cluster.startup();
   }
-  
-  public static HamaConfiguration getConf() {
+
+  protected static HamaConfiguration getConf() {
     return conf;
   }
+  
+  protected void setMiniBSPCluster() {
+    // TODO Auto-generated method stub    
+  }
 }

Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java?rev=948024&r1=948023&r2=948024&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPPeerTest.java Tue May 25 12:37:48 2010
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -60,14 +61,14 @@ public class BSPPeerTest extends HamaClu
     Stat s = null;
     if (zk != null) {
       try {
-        s = zk.exists(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, false);
+        s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
       } catch (Exception e) {
         LOG.error(s);
       }
 
       if (s == null) {
         try {
-          zk.create(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+          zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
               Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         } catch (KeeperException e) {
           LOG.error(e);
@@ -155,9 +156,9 @@ public class BSPPeerTest extends HamaClu
     BSPPeerThread thread;
     for (int i = 0; i < NUM_PEER; i++) {
       conf.set("bsp.peers.num", String.valueOf(NUM_PEER));
-      conf.set(BSPConstants.PEER_HOST, "localhost");
-      conf.set(BSPConstants.PEER_PORT, String.valueOf(30000 + i));
-      conf.set(BSPConstants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
+      conf.set(Constants.PEER_HOST, "localhost");
+      conf.set(Constants.PEER_PORT, String.valueOf(30000 + i));
+      conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
       thread = new BSPPeerThread(conf);
       list.add(thread);
     }
@@ -178,25 +179,25 @@ public class BSPPeerTest extends HamaClu
     Configuration conf = new Configuration();    
     BSPPeer peer = new BSPPeer(conf);
     
-    System.out.println(peer.bindAddress+" = "+BSPConstants.DEFAULT_PEER_HOST);
-    System.out.println(peer.bindPort+" = "+BSPConstants.DEFAULT_PEER_PORT);
-    assertEquals(peer.bindAddress,BSPConstants.DEFAULT_PEER_HOST);
-    assertEquals(peer.bindPort,BSPConstants.DEFAULT_PEER_PORT);
-    assertEquals(peer.zookeeperAddr,BSPConstants.DEFAULT_ZOOKEEPER_SERVER_ADDR);
+    System.out.println(peer.bindAddress+" = "+Constants.DEFAULT_PEER_HOST);
+    System.out.println(peer.bindPort+" = "+Constants.DEFAULT_PEER_PORT);
+    assertEquals(peer.bindAddress,Constants.DEFAULT_PEER_HOST);
+    assertEquals(peer.bindPort,Constants.DEFAULT_PEER_PORT);
+    assertEquals(peer.zookeeperAddr,Constants.DEFAULT_ZOOKEEPER_SERVER_ADDR);
     
     int peerPort;
     int zkPort;
     conf = new Configuration();
-    conf.set(BSPConstants.PEER_HOST, "localhost");
+    conf.set(Constants.PEER_HOST, "localhost");
     do{      
       peerPort = r.nextInt(Short.MAX_VALUE);
     } while(peerPort == 0);    
-    conf.setInt(BSPConstants.PEER_PORT, peerPort);
+    conf.setInt(Constants.PEER_PORT, peerPort);
     
     do{      
       zkPort = r.nextInt(Short.MAX_VALUE);
     } while(zkPort == peerPort || zkPort == 0);    
-    conf.set(BSPConstants.ZOOKEEPER_SERVER_ADDRS, "localhost:"+zkPort);
+    conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:"+zkPort);
     peer = new BSPPeer(conf);
     assertEquals(peer.bindAddress,"localhost");
     assertEquals(peer.bindPort,peerPort);

Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/BSPTestDriver.java Tue May 25 12:37:48 2010
@@ -0,0 +1,25 @@
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * Small command-line driver that submits a {@link BSPJob} built around
+ * {@link Work} and prints a few of the job's properties.
+ */
+public class BSPTestDriver {
+
+  /**
+   * Submits the job, sleeps for three seconds, then prints the job id, job
+   * name and working directory to standard output.
+   *
+   * @param args not used
+   * @throws IOException declared by the signature; which call raises it is
+   *           not visible here
+   * @throws InterruptedException if the sleep is interrupted
+   */
+  public static void main(String[] args) throws IOException, InterruptedException {
+    final HamaConfiguration configuration = new HamaConfiguration();
+    final BSPJob job = new BSPJob(configuration);
+
+    final Class<Work> workClass = Work.class;
+    job.setJarByClass(workClass);
+    job.setWorkClass(workClass);
+
+    job.submit();
+    Thread.sleep(3000);
+
+    report("job id:", job.getJobID());
+    report("job Name:", job.getJobName());
+    report("working dir:", job.getWorkingDirectory());
+  }
+
+  /** Prints the label immediately followed by the value's string form. */
+  private static void report(String label, Object value) {
+    System.out.println(label + value);
+  }
+}

Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/MiniBSPCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/MiniBSPCluster.java?rev=948024&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/MiniBSPCluster.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/MiniBSPCluster.java Tue May 25 12:37:48 2010
@@ -0,0 +1,5 @@
+package org.apache.hama.bsp;
+
+/**
+ * Placeholder class that currently declares no members.
+ */
+public class MiniBSPCluster {
+
+}

Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java?rev=948024&r1=948023&r2=948024&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/SerializePrinting.java Tue May 25 12:37:48 2010
@@ -7,6 +7,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -37,14 +38,14 @@ public class SerializePrinting extends H
     Stat s = null;
     if (zk != null) {
       try {
-        s = zk.exists(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, false);
+        s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
       } catch (Exception e) {
         LOG.error(s);
       }
 
       if (s == null) {
         try {
-          zk.create(BSPConstants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
+          zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
               Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         } catch (KeeperException e) {
           LOG.error(e);
@@ -60,10 +61,10 @@ public class SerializePrinting extends H
     int[] randomSequence = new int[] { 2, 3, 4, 5, 0, 1, 6, 7, 8, 9 };
     for (int i = 0; i < NUM_PEER; i++) {
       conf.set("bsp.peers.num", String.valueOf(NUM_PEER));
-      conf.set(BSPConstants.PEER_HOST, "localhost");
-      conf.set(BSPConstants.PEER_PORT, String
+      conf.set(Constants.PEER_HOST, "localhost");
+      conf.set(Constants.PEER_PORT, String
           .valueOf(30000 + randomSequence[i]));
-      conf.set(BSPConstants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
+      conf.set(Constants.ZOOKEEPER_SERVER_ADDRS, "localhost:21810");
       thread = new BSPPeerThread(conf, randomSequence[i]);
       System.out.println(randomSequence[i] + ", " + thread.getName());
       list.add(thread);



Mime
View raw message