incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1176297 [8/19] - in /incubator/hama/branches/HamaV2: ./ api/ api/target/ api/target/classes/ api/target/classes/META-INF/ api/target/lib/ api/target/maven-archiver/ api/target/maven-shared-archive-resources/ api/target/maven-shared-archive...
Date Tue, 27 Sep 2011 09:35:48 GMT
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Directive sent by the {@link org.apache.hama.bsp.BSPMaster} to a
+ * {@link org.apache.hama.bsp.GroomServer} asking it to launch a new task.
+ * <p>
+ * The action is always created with the action type
+ * <code>ActionType.LAUNCH_TASK</code> and carries the {@link Task} that should
+ * be started.
+ */
+class LaunchTaskAction extends GroomServerAction {
+  /**
+   * The task to launch. It is <code>null</code> after the no-argument
+   * constructor until {@link #readFields(DataInput)} has been called.
+   */
+  private Task task;
+
+  /**
+   * Creates a launch action without a task; the task is filled in later by
+   * {@link #readFields(DataInput)}.
+   */
+  public LaunchTaskAction() {
+    super(ActionType.LAUNCH_TASK);
+  }
+
+  /**
+   * Creates a launch action for the given task.
+   * 
+   * @param task the task to be launched.
+   */
+  public LaunchTaskAction(Task task) {
+    this();
+    this.task = task;
+  }
+
+  /**
+   * @return the task carried by this action, or <code>null</code> if none has
+   *         been set or read yet.
+   */
+  public Task getTask() {
+    return this.task;
+  }
+
+  /**
+   * Serializes this action by delegating to the task's own
+   * <code>write</code> method. A task must be present, otherwise the
+   * delegation fails with a {@link NullPointerException}.
+   * 
+   * @param out the output to write the task to.
+   * @throws IOException if the task cannot be written.
+   */
+  public void write(DataOutput out) throws IOException {
+    this.task.write(out);
+  }
+
+  /**
+   * Replaces the current task with a fresh {@link BSPTask} and populates it
+   * from the given input.
+   * 
+   * @param in the input to read the task from.
+   * @throws IOException if the task cannot be read.
+   */
+  public void readFields(DataInput in) throws IOException {
+    this.task = new BSPTask();
+    this.task.readFields(in);
+  }
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/LaunchTaskAction.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPMaster.State;
+import org.apache.hama.ipc.JobSubmissionProtocol;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * A multithreaded local BSP runner that can be used for debugging and for
+ * running BSPs locally.
+ * <p>
+ * Every BSP task runs on a thread of a fixed-size pool whose size is read from
+ * <code>bsp.local.tasks.maximum</code> (default 20). Each task gets its own
+ * {@link LocalGroom}; the grooms exchange messages in memory and synchronise
+ * on a shared {@link CyclicBarrier}.
+ * <p>
+ * Note that the thread pool, its size and the barrier are static and are
+ * overwritten by every new instance, and that the static list of task futures
+ * is appended to by every submitted job.
+ */
+public class LocalBSPRunner implements JobSubmissionProtocol {
+  public static final Log LOG = LogFactory.getLog(LocalBSPRunner.class);
+
+  // prefix of the peer names and identifier of the generated job ids
+  private static final String IDENTIFIER = "localrunner";
+  // returned by getSystemDir(); overridden by "bsp.local.dir" if configured
+  private static String WORKING_DIR = "/user/hama/bsp/";
+  protected static volatile ThreadPoolExecutor threadPool;
+  protected static int threadPoolSize;
+  protected static final LinkedList<Future<BSP>> futureList = new LinkedList<Future<BSP>>();
+  protected static CyclicBarrier barrier;
+
+  // peer name -> groom; used to deliver messages between the local peers
+  protected HashMap<String, LocalGroom> localGrooms = new HashMap<String, LocalGroom>();
+  protected String jobFile;
+  protected String jobName;
+
+  protected JobStatus currentJobStatus;
+
+  protected Configuration conf;
+  // initialised lazily, see submitJob() and getFilesystemName()
+  protected FileSystem fs;
+
+  /**
+   * Creates the runner, the thread pool, the barrier and one
+   * {@link LocalGroom} per pool thread.
+   * 
+   * @param conf configuration; <code>bsp.local.dir</code> (optional) overrides
+   *          the working directory and <code>bsp.local.tasks.maximum</code>
+   *          (default 20) sets the number of tasks / threads.
+   * @throws IOException if a local groom cannot be created.
+   */
+  public LocalBSPRunner(Configuration conf) throws IOException {
+    super();
+    this.conf = conf;
+    String path = conf.get("bsp.local.dir");
+    if (path != null && !path.isEmpty()) {
+      WORKING_DIR = path;
+    }
+
+    threadPoolSize = conf.getInt("bsp.local.tasks.maximum", 20);
+    threadPool = (ThreadPoolExecutor) Executors
+        .newFixedThreadPool(threadPoolSize);
+    // every task thread is a party of the barrier
+    barrier = new CyclicBarrier(threadPoolSize);
+
+    for (int i = 0; i < threadPoolSize; i++) {
+      String name = IDENTIFIER + " " + i;
+      localGrooms.put(name, new LocalGroom(name));
+    }
+
+  }
+
+  /**
+   * @return always 3; both arguments are ignored.
+   */
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    return 3;
+  }
+
+  /**
+   * @return a job id built from the local identifier and the number 1; the
+   *         same id is returned on every call.
+   */
+  @Override
+  public BSPJobID getNewJobId() throws IOException {
+    return new BSPJobID(IDENTIFIER, 1);
+  }
+
+  /**
+   * Submits a job: loads the job file into the configuration, starts one
+   * {@link BSPRunner} per pool thread and starts a {@link ThreadObserver}
+   * that updates the returned status once all runners have finished.
+   * 
+   * @param jobID id of the job to run.
+   * @param jobFile path of the job configuration, opened via the configured
+   *          {@link FileSystem}.
+   * @return the status object of the job, initially in state RUNNING.
+   * @throws IOException if the file system or the job file cannot be opened.
+   */
+  @Override
+  public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
+    this.jobFile = jobFile;
+
+    if (fs == null) {
+      this.fs = FileSystem.get(conf);
+    }
+
+    // add the resource to the current configuration, because add resource in
+    // HamaConfigurations constructor (ID,FILE) does not take local->HDFS
+    // connections into account. This leads to not serializing the
+    // configuration, which yields into failure.
+    conf.addResource(fs.open(new Path(jobFile)));
+
+    BSPJob job = new BSPJob(new HamaConfiguration(conf), jobID);
+    job.setNumBspTask(threadPoolSize);
+
+    this.jobName = job.getJobName();
+    currentJobStatus = new JobStatus(jobID, System.getProperty("user.name"), 0,
+        JobStatus.RUNNING);
+    for (int i = 0; i < threadPoolSize; i++) {
+      String name = IDENTIFIER + " " + i;
+      // replaces the groom registered under the same name by the constructor
+      LocalGroom localGroom = new LocalGroom(name);
+      localGrooms.put(name, localGroom);
+      futureList.add(threadPool.submit(new BSPRunner(conf, job, ReflectionUtils
+          .newInstance(job.getBspClass(), conf), localGroom)));
+    }
+    new Thread(new ThreadObserver(currentJobStatus)).start();
+    return currentJobStatus;
+  }
+
+  /**
+   * Builds a cluster status with one {@link GroomServerStatus} (empty task
+   * list) per local groom.
+   * 
+   * @param detailed ignored.
+   */
+  @Override
+  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
+    Map<String, GroomServerStatus> map = new HashMap<String, GroomServerStatus>();
+    for (Entry<String, LocalGroom> entry : localGrooms.entrySet()) {
+      map.put(entry.getKey(), new GroomServerStatus(entry.getKey(),
+          new ArrayList<TaskStatus>(0), 0, 0, "", entry.getKey()));
+    }
+    return new ClusterStatus(map, threadPoolSize, threadPoolSize, State.RUNNING);
+  }
+
+  /**
+   * @return a profile made of the current user name, the given job id and the
+   *         job file / job name of the last submitted job.
+   */
+  @Override
+  public JobProfile getJobProfile(BSPJobID jobid) throws IOException {
+    return new JobProfile(System.getProperty("user.name"), jobid, jobFile,
+        jobName);
+  }
+
+  /**
+   * Returns the status of the current job. If no job has been submitted yet,
+   * a RUNNING status for the given id is created and remembered; otherwise
+   * the id is ignored.
+   */
+  @Override
+  public JobStatus getJobStatus(BSPJobID jobid) throws IOException {
+    if (currentJobStatus == null) {
+      currentJobStatus = new JobStatus(jobid, System.getProperty("user.name"),
+          0L, JobStatus.RUNNING);
+    }
+    return currentJobStatus;
+  }
+
+  /**
+   * @return the URI of the configured file system as a string.
+   * @throws IOException if the file system cannot be obtained.
+   */
+  @Override
+  public String getFilesystemName() throws IOException {
+    // fs is otherwise only set by submitJob(); initialise it here as well so
+    // that calling this method first does not fail with a NullPointerException
+    if (fs == null) {
+      this.fs = FileSystem.get(conf);
+    }
+    return fs.getUri().toString();
+  }
+
+  /**
+   * Not supported by the local runner.
+   * 
+   * @return always <code>null</code>.
+   */
+  @Override
+  public JobStatus[] jobsToComplete() throws IOException {
+    return null;
+  }
+
+  /**
+   * Not supported by the local runner.
+   * 
+   * @return always <code>null</code>.
+   */
+  @Override
+  public JobStatus[] getAllJobs() throws IOException {
+    return null;
+  }
+
+  /**
+   * @return the working directory, i.e. <code>bsp.local.dir</code> if it was
+   *         configured, the built-in default otherwise.
+   */
+  @Override
+  public String getSystemDir() {
+    return WORKING_DIR;
+  }
+
+  /**
+   * Not supported by the local runner; does nothing.
+   */
+  @Override
+  public void killJob(BSPJobID jobid) throws IOException {
+    return;
+  }
+
+  /**
+   * Not supported by the local runner.
+   * 
+   * @return always <code>false</code>.
+   */
+  @Override
+  public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
+      throws IOException {
+    return false;
+  }
+
+  // this class is executed on a pool thread and runs the BSP
+  class BSPRunner implements Callable<BSP> {
+
+    Configuration conf;
+    BSPJob job;
+    BSP bsp;
+    LocalGroom groom;
+
+    public BSPRunner(Configuration conf, BSPJob job, BSP bsp, LocalGroom groom) {
+      super();
+      this.conf = conf;
+      this.job = job;
+      this.bsp = bsp;
+      this.groom = groom;
+    }
+
+    /**
+     * Configures and runs the BSP; any exception is logged and swallowed.
+     */
+    public void run() {
+      bsp.setConf(conf);
+      try {
+        bsp.bsp(groom);
+      } catch (Exception e) {
+        LOG.error("Exception during BSP execution!", e);
+      }
+    }
+
+    /**
+     * Configures and runs the BSP. Unlike {@link #run()} a failure is not
+     * swallowed but propagated, so that it surfaces as an
+     * {@link ExecutionException} on the task's future and the observer can
+     * mark the job as failed instead of succeeded.
+     * 
+     * @return the executed BSP.
+     * @throws Exception whatever the BSP execution throws.
+     */
+    @Override
+    public BSP call() throws Exception {
+      bsp.setConf(conf);
+      // NOTE(review): peers still waiting at the barrier are not released when
+      // one task fails here; this was already the case before.
+      bsp.bsp(groom);
+      return bsp;
+    }
+  }
+
+  // this thread observes the status of the runners.
+  class ThreadObserver implements Runnable {
+
+    JobStatus status;
+
+    public ThreadObserver(JobStatus currentJobStatus) {
+      this.status = currentJobStatus;
+    }
+
+    /**
+     * Waits for all task futures, then marks the observed job status as
+     * succeeded or failed and shuts the thread pool down.
+     */
+    @Override
+    public void run() {
+      boolean success = true;
+      boolean interrupted = false;
+      for (Future<BSP> future : futureList) {
+        try {
+          future.get();
+        } catch (InterruptedException e) {
+          LOG.error("Exception during BSP execution!", e);
+          success = false;
+          interrupted = true;
+        } catch (ExecutionException e) {
+          LOG.error("Exception during BSP execution!", e);
+          success = false;
+        }
+      }
+      // update the status this observer was created for; the field of the
+      // enclosing runner may have been replaced in the meantime
+      if (success) {
+        status.setState(JobStatus.State.SUCCEEDED);
+        status.setRunState(JobStatus.SUCCEEDED);
+      } else {
+        status.setState(JobStatus.State.FAILED);
+        status.setRunState(JobStatus.FAILED);
+      }
+      threadPool.shutdownNow();
+      if (interrupted) {
+        // restore the interrupt flag that was cleared by the exception
+        Thread.currentThread().interrupt();
+      }
+    }
+
+  }
+
+  /**
+   * In-memory peer of the local runner. Messages to the peer itself go
+   * straight to its local queue; messages to other peers are buffered per
+   * receiver and delivered during {@link #sync()}.
+   */
+  class LocalGroom extends BSPPeer {
+    private long superStepCount = 0;
+    private final ConcurrentLinkedQueue<BSPMessage> localMessageQueue = new ConcurrentLinkedQueue<BSPMessage>();
+    // outgoing queue
+    private final Map<String, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<BSPMessage>>();
+    private final String peerName;
+
+    public LocalGroom(String peerName) throws IOException {
+      this.peerName = peerName;
+    }
+
+    /**
+     * Sends a message to the named peer. A message addressed to this peer is
+     * put into the local queue immediately, everything else is buffered
+     * until the next {@link #sync()}.
+     */
+    @Override
+    public void send(String peerName, BSPMessage msg) throws IOException {
+      if (this.peerName.equals(peerName)) {
+        put(msg);
+      } else {
+        // put this into a outgoing queue, creating the queue on first use
+        ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues.get(peerName);
+        if (queue == null) {
+          queue = new ConcurrentLinkedQueue<BSPMessage>();
+          outgoingQueues.put(peerName, queue);
+        }
+        queue.add(msg);
+      }
+    }
+
+    @Override
+    public void put(BSPMessage msg) throws IOException {
+      localMessageQueue.add(msg);
+    }
+
+    /**
+     * @return the next received message, or <code>null</code> if the local
+     *         queue is empty.
+     */
+    @Override
+    public BSPMessage getCurrentMessage() throws IOException {
+      return localMessageQueue.poll();
+    }
+
+    @Override
+    public int getNumCurrentMessages() {
+      return localMessageQueue.size();
+    }
+
+    /**
+     * Barrier synchronisation: waits for all peers, delivers the buffered
+     * outgoing messages to the receivers' local queues, waits for all peers
+     * again and finally increments the superstep counter.
+     * 
+     * @throws IOException if a message is addressed to a peer name that is
+     *           not registered with the runner.
+     * @throws InterruptedException if waiting at the barrier is interrupted
+     *           or the barrier is broken.
+     */
+    @Override
+    public void sync() throws IOException, KeeperException,
+        InterruptedException {
+      // wait until all threads reach this barrier
+      barrierSync();
+      // send the messages
+      for (Entry<String, ConcurrentLinkedQueue<BSPMessage>> entry : outgoingQueues
+          .entrySet()) {
+        String peerName = entry.getKey();
+        LocalGroom receiver = localGrooms.get(peerName);
+        if (receiver == null) {
+          // fail with a meaningful message instead of a NullPointerException
+          throw new IOException("No local groom registered for peer: "
+              + peerName);
+        }
+        for (BSPMessage msg : entry.getValue()) {
+          receiver.put(msg);
+        }
+      }
+      // clear the local outgoing queue
+      outgoingQueues.clear();
+      // sync again to avoid data inconsistency
+      barrierSync();
+      incrementSuperSteps();
+    }
+
+    private void barrierSync() throws InterruptedException {
+      try {
+        barrier.await();
+      } catch (BrokenBarrierException e) {
+        // InterruptedException has no cause constructor; attach the cause
+        // explicitly so that it is not lost
+        InterruptedException interrupted = new InterruptedException(
+            "Barrier has been broken!" + e);
+        interrupted.initCause(e);
+        throw interrupted;
+      }
+    }
+
+    private void incrementSuperSteps() {
+      // post-increment: the job progress receives the value the counter had
+      // before this superstep was counted
+      currentJobStatus.setprogress(superStepCount++);
+      currentJobStatus.setSuperstepCount(currentJobStatus.progress());
+    }
+
+    @Override
+    public long getSuperstepCount() {
+      return superStepCount;
+    }
+
+    @Override
+    public String getPeerName() {
+      return peerName;
+    }
+
+    /**
+     * @return the names of all grooms registered with the enclosing runner.
+     */
+    @Override
+    public String[] getAllPeerNames() {
+      return localGrooms.keySet().toArray(
+          new String[localGrooms.keySet().size()]);
+    }
+
+    @Override
+    public void clear() {
+      localMessageQueue.clear();
+    }
+
+    /**
+     * @return always 3; both arguments are ignored.
+     */
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return 3;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // nothing to release
+    }
+
+    @Override
+    public void put(BSPMessageBundle messages) throws IOException {
+      // message bundles are not used by the local runner; the bundle is
+      // ignored
+    }
+
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Messagable.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Messagable.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Messagable.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Messagable.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+/**
+ * Marker interface for BSP message classes. It declares no methods.
+ */
+public interface Messagable {
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Messagable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Queue.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Queue.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Queue.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Queue.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.Collection;
+
+/**
+ * A named queue of jobs.
+ * 
+ * @param <T> the type of the jobs held by the queue.
+ */
+public interface Queue<T> {
+
+  /**
+   * Returns the name of this queue.
+   * 
+   * @return the name of the current queue.
+   */
+  String getName();
+
+  /**
+   * Adds a job to this queue.
+   * 
+   * @param job the job to be added to the queue.
+   */
+  void addJob(T job);
+
+  /**
+   * Removes the given job from this queue.
+   * 
+   * @param job the job to be removed from the queue.
+   */
+  void removeJob(T job);
+
+  /**
+   * Removes a job from this queue and hands it to the caller. Which job is
+   * taken is up to the implementation.
+   * 
+   * @return the job that has been removed from the queue.
+   */
+  T removeJob();
+
+  /**
+   * Returns all jobs stored in this queue.
+   * 
+   * @return a collection of the jobs.
+   */
+  Collection<T> jobs();
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Queue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/QueueManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/QueueManager.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/QueueManager.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/QueueManager.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A BSPJob Queue Manager. It keeps the job queues by name and offers
+ * operations to add, remove and move {@link JobInProgress} instances.
+ */
+public class QueueManager{
+
+  // fully qualified to keep this class free of additional imports
+  private static final org.apache.commons.logging.Log LOG = 
+    org.apache.commons.logging.LogFactory.getLog(QueueManager.class);
+
+  private ConcurrentMap<String, Queue<JobInProgress>> queues = 
+    new ConcurrentHashMap<String, Queue<JobInProgress>>();
+
+  /**
+   * Creates a manager without any queue.
+   * @param conf currently unused.
+   */
+  public QueueManager(Configuration conf){ }
+
+  /**
+   * Initialize a job by initializing its tasks. An {@link IOException}
+   * raised by the job is logged and not propagated.
+   * @param job required to be initialized.
+   */
+  public void initJob(JobInProgress job){
+    try{
+      //job.updateStatus();
+      job.initTasks();
+    }catch(IOException ioe){
+      LOG.error("Failed to initialize the tasks of a job.", ioe);
+    }
+  }
+
+  /**
+   * Add a job to the specified queue. Nothing happens if no queue with the
+   * given name exists.
+   * @param name of the queue.
+   * @param job to be added.
+   */
+  public void addJob(String name, JobInProgress job){
+    Queue<JobInProgress> queue = queues.get(name);
+    if(null != queue) queue.addJob(job);
+  }
+
+  /**
+   * Remove the given job from a designated queue. Nothing happens if no queue
+   * with the given name exists.
+   * @param name from which a job is removed.
+   * @param job to be removed from the queue.
+   */
+  public void removeJob(String name, JobInProgress job){
+    Queue<JobInProgress> queue = queues.get(name);
+    if(null != queue) queue.removeJob(job);
+  }
+
+  /**
+   * Move a job from a queue to another. If the destination queue does not
+   * exist the job is left in its current queue, so that it is never dropped.
+   * @param from a queue a job is to be removed.
+   * @param to a queue a job is to be added.
+   * @param job to be moved.
+   */
+  public void moveJob(String from, String to, JobInProgress job){
+    synchronized(queues){
+      Queue<JobInProgress> target = queues.get(to);
+      if(null == target){
+        // removing the job now would lose it, because there is no queue
+        // that could take it over
+        LOG.warn("Cannot move job, destination queue does not exist: " + to);
+        return;
+      }
+      removeJob(from, job);
+      target.addJob(job);
+    }  
+  }
+
+  /**
+   * Create a FCFS queue with the name provided. An already existing queue
+   * with the same name is kept.
+   * @param name of the queue. 
+   */
+  public void createFCFSQueue(String name){
+    queues.putIfAbsent(name, new FCFSQueue(name));
+  }
+
+  /**
+   * Find Queue according to the name specified.
+   * @param name of the queue. 
+   * @return queue of JobInProgress, or <code>null</code> if there is no queue
+   *         with that name.
+   */
+  public Queue<JobInProgress> findQueue(String name){
+     return queues.get(name);
+  }
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/QueueManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ReinitGroomAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ReinitGroomAction.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ReinitGroomAction.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ReinitGroomAction.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Directive sent by the {@link org.apache.hama.bsp.BSPMaster} to a
+ * {@link org.apache.hama.bsp.GroomServer} asking it to reinitialize itself.
+ * <p>
+ * The action carries no payload besides its action type
+ * <code>ActionType.REINIT_GROOM</code>.
+ */
+class ReinitGroomAction extends GroomServerAction {
+
+  /**
+   * Creates the action with the type <code>ActionType.REINIT_GROOM</code>.
+   */
+  public ReinitGroomAction() {
+    super(ActionType.REINIT_GROOM);
+  }
+
+  /**
+   * Writes nothing, because this action has no fields of its own.
+   * 
+   * @param out unused.
+   */
+  public void write(DataOutput out) throws IOException {
+    // no payload
+  }
+
+  /**
+   * Reads nothing, because this action has no fields of its own.
+   * 
+   * @param in unused.
+   */
+  public void readFields(DataInput in) throws IOException {
+    // no payload
+  }
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ReinitGroomAction.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ReportGroomStatusDirective.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ReportGroomStatusDirective.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ReportGroomStatusDirective.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ReportGroomStatusDirective.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A {@link Directive} that reports the status of a GroomServer. It wraps a
+ * {@link GroomServerStatus} and serializes it after the fields of the base
+ * directive.
+ */
+public class ReportGroomStatusDirective extends Directive implements Writable {
+  public static final Log LOG = LogFactory
+      .getLog(ReportGroomStatusDirective.class);
+
+  /**
+   * The reported status. It is <code>null</code> after the no-argument
+   * constructor until {@link #readFields(DataInput)} has been called.
+   */
+  private GroomServerStatus status;
+
+  /**
+   * Creates an empty directive, to be populated by
+   * {@link #readFields(DataInput)}.
+   */
+  public ReportGroomStatusDirective() {
+    super();
+  }
+
+  /**
+   * Creates a directive of type <code>Directive.Type.Response</code> that
+   * reports the given status.
+   * 
+   * @param status the groom server status to report.
+   */
+  public ReportGroomStatusDirective(GroomServerStatus status) {
+    super(Directive.Type.Response);
+    this.status = status;
+  }
+
+  /**
+   * @return the reported groom server status, or <code>null</code> if none
+   *         has been set or read yet.
+   */
+  public GroomServerStatus getStatus() {
+    return status;
+  }
+
+  /**
+   * Writes the fields of the base directive followed by the status. A status
+   * must be present, otherwise writing it fails with a
+   * {@link NullPointerException}.
+   * 
+   * @param out the output to write to.
+   * @throws IOException if writing fails.
+   */
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    status.write(out);
+  }
+
+  /**
+   * Reads the fields of the base directive, then replaces the status with a
+   * fresh {@link GroomServerStatus} populated from the input.
+   * 
+   * @param in the input to read from.
+   * @throws IOException if reading fails.
+   */
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    status = new GroomServerStatus();
+    status.readFields(in);
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/ReportGroomStatusDirective.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/RunningJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/RunningJob.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/RunningJob.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/RunningJob.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * <code>RunningJob</code> is the user-facing handle for querying and
+ * controlling a running BSP job.
+ * 
+ * <p>
+ * Clients obtain a <code>RunningJob</code> through the {@link BSPJobClient}
+ * and can then ask it for details such as the job name, the job file, the
+ * progress or the state, or kill the job or one of its task attempts.
+ * </p>
+ * 
+ * @see BSPJobClient
+ */
+public interface RunningJob {
+  /**
+   * Returns the identifier of the job.
+   * 
+   * @return the job identifier.
+   */
+  BSPJobID getID();
+
+  /**
+   * Returns the name of the job.
+   * 
+   * @return the name of the job.
+   */
+  String getJobName();
+
+  /**
+   * Returns the path of the submitted job configuration.
+   * 
+   * @return the path of the submitted job configuration.
+   */
+  String getJobFile();
+
+  /**
+   * Returns the <i>progress</i> of the job's tasks.
+   * <p>
+   * NOTE(review): earlier documentation described the value as a float
+   * between 0.0 and 1.0 that reaches 1.0 when all bsp tasks have completed,
+   * but the declared return type is <code>long</code>; the actual unit should
+   * be confirmed against the implementations.
+   * 
+   * @return the progress of the job's tasks.
+   * @throws IOException
+   */
+  long progress() throws IOException;
+
+  /**
+   * Tells whether the job has finished. This call does not block.
+   * 
+   * @return <code>true</code> if the job is complete, else <code>false</code>.
+   * @throws IOException
+   */
+  boolean isComplete() throws IOException;
+
+  /**
+   * Tells whether the job completed successfully.
+   * 
+   * @return <code>true</code> if the job succeeded, else <code>false</code>.
+   * @throws IOException
+   */
+  boolean isSuccessful() throws IOException;
+
+  /**
+   * Blocks until the job is complete.
+   * 
+   * @throws IOException
+   */
+  void waitForCompletion() throws IOException;
+
+  /**
+   * Returns the current state of the job, see {@link JobStatus}.
+   * 
+   * @return the current job state.
+   * @throws IOException
+   */
+  int getJobState() throws IOException;
+
+  /**
+   * Kills the running job and blocks until all of its tasks have been killed
+   * as well. Returns immediately if the job is no longer running.
+   * 
+   * @throws IOException
+   */
+  void killJob() throws IOException;
+
+  /**
+   * Kills the indicated task attempt.
+   * 
+   * @param taskId the id of the task to be terminated.
+   * @param shouldFail if true the task is failed and added to failed tasks
+   *          list, otherwise it is just killed, w/o affecting job failure
+   *          status.
+   * @throws IOException
+   */
+  void killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;
+
+  /**
+   * Returns the superstep count of the job.
+   * 
+   * @return the superstep count.
+   * @throws IOException
+   */
+  long getSuperstepCount() throws IOException;
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/RunningJob.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Schedulable.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Schedulable.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Schedulable.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Schedulable.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+/**
+ * Contract for components that can schedule a job onto GroomServer(s).
+ */
+public interface Schedulable{
+
+  /**
+   * Schedule the given job to the designated GroomServer(s) immediately.
+   * @param job the job to be scheduled.
+   * @param statuses the statuses of the targeted GroomServer(s).
+   * @throws IOException may be thrown by implementations (none visible here).
+   */
+  void schedule(JobInProgress job, GroomServerStatus... statuses) 
+      throws IOException;
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Schedulable.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.ipc.GroomProtocol;
+
+/**
+ * A simple task scheduler: dispatches queued jobs to all known GroomServers.
+ */
+class SimpleTaskScheduler extends TaskScheduler {
+
+  private static final Log LOG = LogFactory.getLog(SimpleTaskScheduler.class);
+
+  public static final String WAIT_QUEUE = "waitQueue";
+  public static final String PROCESSING_QUEUE = "processingQueue";
+  public static final String FINISHED_QUEUE = "finishedQueue";
+
+  private QueueManager queueManager;
+  private volatile boolean initialized;
+  private JobListener jobListener;
+  private JobProcessor jobProcessor;
+
+  /** Routes job added/removed notifications into the scheduler's queues. */
+  private class JobListener extends JobInProgressListener {
+    @Override
+    public void jobAdded(JobInProgress job) throws IOException {
+      queueManager.initJob(job); // init task
+      queueManager.addJob(WAIT_QUEUE, job);
+    }
+
+    @Override
+    public void jobRemoved(JobInProgress job) throws IOException {
+      queueManager.moveJob(PROCESSING_QUEUE, FINISHED_QUEUE, job);
+    }
+  }
+
+  private class JobProcessor extends Thread implements Schedulable {
+    JobProcessor() {
+      super("JobProcess");
+    }
+
+    /**
+     * Main logic scheduling task to GroomServer(s). Also, it will move
+     * JobInProgress from Wait Queue to Processing Queue.
+     */
+    public void run() {
+      if (false == initialized) {
+        throw new IllegalStateException("SimpleTaskScheduler initialization"
+            + " is not yet finished!");
+      }
+      while (initialized) {
+        Queue<JobInProgress> queue = queueManager.findQueue(WAIT_QUEUE);
+        if (null == queue) {
+          LOG.error(WAIT_QUEUE + " does not exist.");
+          throw new NullPointerException(WAIT_QUEUE + " does not exist.");
+        }
+        // move a job from the wait queue to the processing queue
+        JobInProgress j = queue.removeJob();
+        queueManager.addJob(PROCESSING_QUEUE, j);
+        // schedule the job on every groom server currently known
+        Collection<GroomServerStatus> glist = groomServerManager
+            .groomServerStatusKeySet();
+        schedule(j, glist.toArray(new GroomServerStatus[glist.size()]));
+      }
+    }
+
+    /**
+     * Schedule job to designated GroomServer(s) immediately, using one
+     * TaskWorker per groom on a thread pool created for this call.
+     * 
+     * @param job the job to be scheduled.
+     * @param statuses the targeted GroomServer(s).
+     */
+    @Override
+    public void schedule(JobInProgress job, GroomServerStatus... statuses) {
+      ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false);
+      final int numGroomServers = clusterStatus.getGroomServers();
+      final ScheduledExecutorService sched = Executors
+          .newScheduledThreadPool(statuses.length + 5);
+      for (GroomServerStatus status : statuses) {
+        sched.schedule(new TaskWorker(status, numGroomServers, job), 0, SECONDS);
+      }
+      sched.shutdown(); // queued workers still run; stops leaking pool threads
+    }
+  }
+
+  /** Collects new tasks of one job for one groom and dispatches them. */
+  private class TaskWorker implements Runnable {
+    private final GroomServerStatus stus;
+    private final int groomNum;
+    private final JobInProgress jip;
+
+    TaskWorker(final GroomServerStatus stus, final int num,
+        final JobInProgress jip) {
+      this.stus = stus;
+      this.groomNum = num;
+      this.jip = jip;
+      if (null == this.stus)
+        throw new NullPointerException("Target groom server is not "
+            + "specified.");
+      if (-1 == this.groomNum)
+        throw new IllegalArgumentException("Groom number is not specified.");
+      if (null == this.jip)
+        throw new NullPointerException("No job is specified.");
+    }
+
+    public void run() {
+      // obtain tasks; stop once cnt reaches the groom's getMaxTasks()
+      List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
+      Task t = null;
+      int cnt = 0;
+      while ((t = jip.obtainNewTask(this.stus, groomNum)) != null) {
+        actions.add(new LaunchTaskAction(t));
+        cnt++;
+        if (cnt > (this.stus.getMaxTasks() - 1))
+          break;
+      }
+      // wrap the launch actions into a directive, only for a running job
+      if (jip.getStatus().getRunState() == JobStatus.RUNNING) {
+        GroomProtocol worker = groomServerManager.findGroomServer(this.stus);
+        try {
+          // dispatch() to the groom server
+          Directive d1 = new DispatchTasksDirective(actions.toArray(new GroomServerAction[0]));
+          worker.dispatch(d1);
+        } catch (IOException ioe) {
+          LOG.error("Fail to dispatch tasks to GroomServer "
+              + this.stus.getGroomName(), ioe);
+        }
+      } else {
+        LOG.warn("Currently master only shcedules job in running state. "
+            + "This may be refined in the future. JobId:" + jip.getJobID());
+      }
+    }
+  }
+
+  public SimpleTaskScheduler() {
+    this.jobListener = new JobListener();
+    this.jobProcessor = new JobProcessor();
+  }
+
+  /** Creates the queues, registers the job listener and starts processing. */
+  @Override
+  public void start() {
+    this.queueManager = new QueueManager(getConf()); // TODO: need factory?
+    this.queueManager.createFCFSQueue(WAIT_QUEUE);
+    this.queueManager.createFCFSQueue(PROCESSING_QUEUE);
+    this.queueManager.createFCFSQueue(FINISHED_QUEUE);
+    groomServerManager.addJobInProgressListener(this.jobListener);
+    this.initialized = true;
+    this.jobProcessor.start();
+  }
+
+  /** Clears the initialized flag and unregisters the job listener. */
+  @Override
+  public void terminate() {
+    this.initialized = false;
+    if (null != this.jobListener)
+      groomServerManager.removeJobInProgressListener(this.jobListener);
+  }
+
+  /** Returns the jobs held by the queue registered under the given name. */
+  @Override
+  public Collection<JobInProgress> getJobs(String queue) {
+    return (queueManager.findQueue(queue)).jobs();
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Task.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Task.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Task.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.ipc.BSPPeerProtocol;
+
+/**
+ * Base class for tasks: job id, job file, task attempt id and partition.
+ */
+public abstract class Task implements Writable {
+  public static final Log LOG = LogFactory.getLog(Task.class);
+  // //////////////////////////////////////////
+  // Fields
+  // //////////////////////////////////////////
+
+  protected BSPJobID jobId;
+  protected String jobFile;
+  protected TaskAttemptID taskId;
+  protected int partition;
+
+  protected LocalDirAllocator lDirAlloc;
+
+  public Task() {
+    jobId = new BSPJobID();
+    taskId = new TaskAttemptID();
+  }
+
+  public Task(BSPJobID jobId, String jobFile, TaskAttemptID taskId,
+      int partition) {
+    this.jobId = jobId;
+    this.jobFile = jobFile;
+    this.taskId = taskId;
+    this.partition = partition;
+  }
+
+  // //////////////////////////////////////////
+  // Accessors
+  // //////////////////////////////////////////
+  public void setJobFile(String jobFile) {
+    this.jobFile = jobFile;
+  }
+
+  public String getJobFile() {
+    return jobFile;
+  }
+
+  public TaskAttemptID getTaskAttemptId() {
+    return this.taskId;
+  }
+
+  public TaskAttemptID getTaskID() {
+    return taskId;
+  }
+
+  /**
+   * Get the id of the job this task belongs to.
+   * 
+   * @return the job id
+   */
+  public BSPJobID getJobID() {
+    return jobId;
+  }
+
+  /**
+   * Get the partition of this task.
+   * 
+   * @return the partition value held by this task
+   */
+  public int getPartition() {
+    return partition;
+  }
+
+  @Override
+  public String toString() {
+    return taskId.toString();
+  }
+
+  // //////////////////////////////////////////
+  // Writable
+  // //////////////////////////////////////////
+  @Override
+  public void write(DataOutput out) throws IOException {
+    jobId.write(out);
+    Text.writeString(out, jobFile);
+    taskId.write(out);
+    out.writeInt(partition);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    jobId.readFields(in);
+    jobFile = Text.readString(in);
+    taskId.readFields(in);
+    partition = in.readInt();
+  }
+
+  /**
+   * Run this task as a part of the named job. This method is executed in the
+   * child process.
+   * @param job the job this task is run as a part of
+   * @param bspPeer for communications
+   * @param umbilical for communications with GroomServer
+   */
+  public abstract void run(BSPJob job, BSPPeer bspPeer, BSPPeerProtocol umbilical)
+      throws IOException;
+
+  public abstract BSPTaskRunner createRunner(GroomServer groom);
+
+  public void done(BSPPeerProtocol umbilical) throws IOException {
+    umbilical.done(getTaskID(), true);
+  }
+  
+  public abstract BSPJob getConf();
+  public abstract void setConf(BSPJob localJobConf);
+  
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/Task.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskAttemptContext.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskAttemptContext.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskAttemptContext.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The context for task attempts: the attempt id plus a status message.
+ */
+public class TaskAttemptContext extends BSPJobContext implements Progressable {
+  private final TaskAttemptID taskId;
+  private String status = "";
+
+  public TaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
+    super(conf, taskId.getJobID());
+    this.taskId = taskId;
+  }
+
+  /**
+   * Get the id of this task attempt.
+   */
+  public TaskAttemptID getTaskAttemptID() {
+    return taskId;
+  }
+
+  /**
+   * Set the current status message of the task to the given string.
+   */
+  public void setStatus(String msg) throws IOException {
+    status = msg;
+  }
+
+  /**
+   * Get the last set status message.
+   * 
+   * @return the current status message (empty string until one is set)
+   */
+  public String getStatus() {
+    return status;
+  }
+
+  /**
+   * Report progress. Empty here; presumably subtypes override it to do work.
+   */
+  public void progress() {
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskAttemptContext.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskAttemptID.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskAttemptID.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskAttemptID.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskAttemptID.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * TaskAttemptID identifies one attempt of a task: a {@link TaskID} plus a number.
+ */
+public class TaskAttemptID extends ID {
+  protected static final String ATTEMPT = "attempt";
+  private TaskID taskId;
+
+  public TaskAttemptID(TaskID taskId, int id) {
+    super(id);
+    if (taskId == null) {
+      throw new IllegalArgumentException("taskId cannot be null");
+    }
+    this.taskId = taskId;
+  }
+
+  public TaskAttemptID(String jtIdentifier, int jobId, int taskId, int id) {
+    this(new TaskID(jtIdentifier, jobId, taskId), id);
+  }
+
+  public TaskAttemptID() {
+    taskId = new TaskID();
+  }
+
+  public BSPJobID getJobID() {
+    return taskId.getJobID();
+  }
+
+  public TaskID getTaskID() {
+    return taskId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!super.equals(o))
+      return false;
+
+    TaskAttemptID that = (TaskAttemptID) o;
+    return this.taskId.equals(that.taskId);
+  }
+
+  protected StringBuilder appendTo(StringBuilder builder) {
+    return taskId.appendTo(builder).append(SEPARATOR).append(id);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    taskId.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    taskId.write(out);
+  }
+
+  @Override
+  public int hashCode() {
+    return taskId.hashCode() * 5 + id;
+  }
+
+  @Override
+  public int compareTo(ID o) {
+    TaskAttemptID that = (TaskAttemptID) o;
+    int tipComp = this.taskId.compareTo(that.taskId);
+    if (tipComp == 0) {
+      return this.id - that.id;
+    } else
+      return tipComp;
+  }
+
+  @Override
+  public String toString() {
+    return appendTo(new StringBuilder(ATTEMPT)).toString();
+  }
+
+  public static TaskAttemptID forName(String str)
+      throws IllegalArgumentException {
+    if (str == null)
+      return null;
+    try {
+      String[] parts = str.split(Character.toString(SEPARATOR));
+      if (parts.length == 5) {
+        if (parts[0].equals(ATTEMPT)) {
+          return new TaskAttemptID(parts[1], Integer.parseInt(parts[2]),
+              Integer.parseInt(parts[3]), Integer.parseInt(parts[4]));
+        }
+      }
+    } catch (Exception ex) {
+      // malformed input: fall through to the IllegalArgumentException below
+    }
+    throw new IllegalArgumentException("TaskAttemptId string : " + str
+        + " is not properly formed");
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskAttemptID.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskID.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskID.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskID.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskID.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+/**
+ * TaskID is the unique identifier for a BSP Task: a job id plus a task number.
+ */
+public class TaskID extends ID {
+  protected static final String TASK = "task";
+  protected static final NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(6);
+  }
+
+  private BSPJobID jobId;
+
+  public TaskID(BSPJobID jobId, int id) {
+    super(id);
+    if (jobId == null) {
+      throw new IllegalArgumentException("jobId cannot be null");
+    }
+    this.jobId = jobId;
+  }
+
+  public TaskID(String jtIdentifier, int jobId, int id) {
+    this(new BSPJobID(jtIdentifier, jobId), id);
+  }
+
+  public TaskID() {
+    jobId = new BSPJobID();
+  }
+
+  /** Returns the {@link BSPJobID} object that this tip belongs to */
+  public BSPJobID getJobID() {
+    return jobId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!super.equals(o))
+      return false;
+    TaskID that = (TaskID) o;
+    return this.jobId.equals(that.jobId);
+  }
+
+  @Override
+  public int compareTo(ID o) {
+    TaskID that = (TaskID) o;
+    int jobComp = this.jobId.compareTo(that.jobId);
+    // boxed compare instead of id subtraction, which could overflow
+    return jobComp != 0 ? jobComp : Integer.valueOf(this.id).compareTo(that.id);
+  }
+
+  @Override
+  public String toString() {
+    return appendTo(new StringBuilder(TASK)).toString();
+  }
+
+  protected StringBuilder appendTo(StringBuilder builder) {
+    return jobId.appendTo(builder).append(SEPARATOR)
+        .append(idFormat.format(id));
+  }
+
+  @Override
+  public int hashCode() {
+    return jobId.hashCode() * 524287 + id;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    jobId.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    jobId.write(out);
+  }
+
+  /**
+   * Parses "task_jtIdentifier_jobId_id". The 5-part form accepted before is
+   * kept (its last part is the id). Returns null for a null argument.
+   * @throws IllegalArgumentException if the string is not properly formed
+   */
+  public static TaskID forName(String str) throws IllegalArgumentException {
+    if (str == null)
+      return null;
+    try {
+      String[] parts = str.split("_");
+      if ((parts.length == 4 || parts.length == 5) && parts[0].equals(TASK)) {
+        return new TaskID(parts[1], Integer.parseInt(parts[2]), Integer
+            .parseInt(parts[parts.length - 1]));
+      }
+    } catch (Exception ex) {
+      // non-numeric parts: fall through to the IllegalArgumentException below
+    }
+    throw new IllegalArgumentException("TaskId string : " + str
+        + " is not properly formed");
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskID.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobStatus;
+
+/**
+ * TaskInProgress maintains all the info needed for a Task in the lifetime of
+ * its owning Job.
+ */
+class TaskInProgress {
+  public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
+
+  private Configuration conf;
+
+  // Attempt limits and failure flag
+  static final int MAX_TASK_EXECS = 1;
+  int maxTaskAttempts = 4;
+  private boolean failed = false;
+  private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
+
+  // Job Meta
+  private String jobFile = null;
+  private int partition;
+  private BSPMaster bspMaster;
+  private TaskID id;
+  private JobInProgress job;
+  private int completes = 0;
+
+  // Status
+  // private double progress = 0;
+  // private String state = "";
+  private long startTime = 0;
+
+  // The 'next' usable taskid of this tip
+  int nextTaskId = 0;
+
+  // The taskid that took this TIP to SUCCESS
+  private TaskAttemptID successfulTaskId;
+
+  // The first taskid of this tip
+  private TaskAttemptID firstTaskId;
+
+  // Map from task Id -> GroomServer name, contains tasks that are
+  // currently running
+  private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
+  // All attempt Ids of this TIP
+  // private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
+  /**
+   * Map from taskId -> TaskStatus
+   */
+  private TreeMap<TaskAttemptID, TaskStatus> taskStatuses = new TreeMap<TaskAttemptID, TaskStatus>();
+
+  private BSPJobID jobId;
+
+  /**
+   * Creates a tip without master, conf and parent job (those stay null).
+   * 
+   * @param jobId is identification of JobInProgress.
+   * @param jobFile the path of job file
+   * @param partition which partition this TaskInProgress owns.
+   */
+  public TaskInProgress(BSPJobID jobId, String jobFile, int partition) {
+    this.jobId = jobId;
+    this.jobFile = jobFile;
+    this.partition = partition;
+
+    this.id = new TaskID(jobId, partition);
+  }
+
+  public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master,
+      Configuration conf, JobInProgress job, int partition) {
+    this.jobId = jobId;
+    this.jobFile = jobFile;
+    this.setBspMaster(master);
+    this.job = job;
+    this.setConf(conf);
+    this.partition = partition;
+
+    this.id = new TaskID(jobId, partition);
+  }
+
+  /**
+   * Return a new attempt Task for the given groom, or null past the attempt limit.
+   */
+  public Task getTaskToRun(GroomServerStatus status) throws IOException {
+    Task t = null;
+
+    TaskAttemptID taskid = null;
+    if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
+      int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART
+          + nextTaskId;
+      taskid = new TaskAttemptID(id, attemptId);
+      ++nextTaskId;
+    } else {
+      LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts)
+          + " attempts for the tip '" + getTIPId() + "'");
+      return null;
+    }
+
+    t = new BSPTask(jobId, jobFile, taskid, partition);
+    activeTasks.put(taskid, status.getGroomName());
+
+    return t;
+  }
+
+  // //////////////////////////////////
+  // Accessors
+  // //////////////////////////////////
+  /**
+   * Return the start time
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Return the parent job
+   */
+  public JobInProgress getJob() {
+    return job;
+  }
+
+  public TaskID getTIPId() {
+    return id;
+  }
+
+  public TaskID getTaskId() {
+    return this.id;
+  }
+
+  public TreeMap<TaskAttemptID, String> getTasks() {
+    return activeTasks;
+  }
+
+  /**
+   * Is the Task associated with taskid the first attempt of the tip?
+   * NOTE(review): firstTaskId is never assigned in this class.
+   * @param taskId
+   * @return Returns true if the Task is the first attempt of the tip
+   */
+  public boolean isFirstAttempt(TaskAttemptID taskId) {
+    return firstTaskId == null ? false : firstTaskId.equals(taskId);
+  }
+
+  /**
+   * Is this tip currently running any tasks?
+   * 
+   * @return true if any tasks are running
+   */
+  public boolean isRunning() {
+    return !activeTasks.isEmpty();
+  }
+
+  /**
+   * Is this tip complete?
+   * 
+   * @return <code>true</code> if the tip is complete, else <code>false</code>
+   */
+  public synchronized boolean isComplete() {
+    return (completes > 0);
+  }
+
+  /**
+   * Is the given taskid the one that took this tip to completion?
+   * 
+   * @param taskid taskid of attempt to check for completion
+   * @return <code>true</code> if taskid is complete, else <code>false</code>
+   */
+  public boolean isComplete(TaskAttemptID taskid) {
+    return (completes > 0 && taskid.equals(getSuccessfulTaskid()));
+  }
+
+  private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
+
+  public boolean shouldCloseForClosedJob(TaskAttemptID taskid) {
+    TaskStatus ts = (TaskStatus) taskStatuses.get(taskid);
+    if ((ts != null) && (!tasksReportedClosed.contains(taskid))
+        && (job.getStatus().getRunState() != JobStatus.RUNNING)) {
+      tasksReportedClosed.add(taskid);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public void completed(TaskAttemptID taskid) {
+    LOG.info("Task '" + taskid.getTaskID().toString() + "' has completed.");
+
+    TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+    status.setRunState(TaskStatus.State.SUCCEEDED);
+    activeTasks.remove(taskid);
+
+    // Note the successful taskid
+    setSuccessfulTaskid(taskid);
+
+    //
+    // Now that the TIP is complete, the other speculative
+    // subtasks will be closed when the owning groom server
+    // reports in and calls shouldClose() on this object.
+    //
+
+    this.completes++;
+  }
+  
+  public void terminated(TaskAttemptID taskid) {
+    LOG.info("Task '" + taskid.getTaskID().toString() + "' has failed.");
+
+    TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+    status.setRunState(TaskStatus.State.FAILED);
+    activeTasks.remove(taskid);
+  }
+
+  private void setSuccessfulTaskid(TaskAttemptID taskid) {
+    this.successfulTaskId = taskid;
+  }
+
+  private TaskAttemptID getSuccessfulTaskid() {
+    return successfulTaskId;
+  }
+
+  public void updateStatus(TaskStatus status) {
+    taskStatuses.put(status.getTaskId(), status);
+  }
+
+  public TaskStatus getTaskStatus(TaskAttemptID taskId) {
+    return this.taskStatuses.get(taskId);
+  }
+
+  public void kill() {
+    this.failed = true;
+  }
+
+  public boolean isFailed() {
+    return failed;
+  }
+
+  /**
+   * @param conf the conf to set
+   */
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * @return the conf
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * @param bspMaster the bspMaster to set
+   */
+  public void setBspMaster(BSPMaster bspMaster) {
+    this.bspMaster = bspMaster;
+  }
+
+  /**
+   * @return the bspMaster
+   */
+  public BSPMaster getBspMaster() {
+    return bspMaster;
+  }
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskLog.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskLog.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * Utilities for locating, reading, purging and capturing the task-specific
+ * user logs. All logs live below the {@code userlogs} directory inside the
+ * directory named by the {@code hama.log.dir} system property, one
+ * sub-directory per task attempt.
+ */
+public class TaskLog {
+  private static final Log LOG = LogFactory.getLog(TaskLog.class.getName());
+
+  /** Root directory that holds one sub-directory per task attempt. */
+  private static final File LOG_DIR = new File(
+      System.getProperty("hama.log.dir"), "userlogs").getAbsoluteFile();
+
+  static {
+    // mkdirs() reports failure only through its return value; surface it
+    // rather than silently continuing without a log directory. The final
+    // isDirectory() check tolerates another process creating it concurrently.
+    if (!LOG_DIR.exists() && !LOG_DIR.mkdirs() && !LOG_DIR.isDirectory()) {
+      LOG.warn("Unable to create user log directory " + LOG_DIR);
+    }
+  }
+
+  /**
+   * Resolves the file that stores one kind of log for a task attempt.
+   * 
+   * @param taskid the attempt whose log is wanted
+   * @param filter the kind of log
+   * @return {@code <LOG_DIR>/<taskid>/<filter>}; the file need not exist
+   */
+  public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
+    return new File(new File(LOG_DIR, taskid.toString()), filter.toString());
+  }
+
+  /**
+   * The filter for userlogs. {@link #toString()} yields the file name used
+   * inside the attempt's log directory.
+   */
+  public enum LogName {
+    /** Log on the stdout of the task. */
+    STDOUT("stdout"),
+
+    /** Log on the stderr of the task. */
+    STDERR("stderr"),
+
+    /** Log on the system logs of the task. */
+    SYSLOG("syslog"),
+
+    /** The java profiler information. */
+    PROFILE("profile.out"),
+
+    /** Log the debug script's stdout */
+    DEBUGOUT("debugout");
+
+    private final String prefix;
+
+    private LogName(String prefix) {
+      this.prefix = prefix;
+    }
+
+    @Override
+    public String toString() {
+      return prefix;
+    }
+  }
+
+  /** Accepts files whose modification time is older than a cut-off. */
+  private static class TaskLogsPurgeFilter implements FileFilter {
+    private final long purgeTimeStamp;
+
+    TaskLogsPurgeFilter(long purgeTimeStamp) {
+      this.purgeTimeStamp = purgeTimeStamp;
+    }
+
+    public boolean accept(File file) {
+      long mtime = file.lastModified();
+      // Avoid building the message for every file when debug is off.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("PurgeFilter - file: " + file + ", mtime: " + mtime
+            + ", purge: " + purgeTimeStamp);
+      }
+      return mtime < purgeTimeStamp;
+    }
+  }
+
+  /**
+   * Purge old user logs.
+   * 
+   * @param logsRetainHours entries directly below the user log directory
+   *          whose modification time is older than this many hours are
+   *          deleted
+   * @throws IOException
+   */
+  public static synchronized void cleanup(int logsRetainHours)
+      throws IOException {
+    // 60L forces the arithmetic into long so large retention values cannot
+    // overflow int.
+    long purgeTimeStamp = System.currentTimeMillis()
+        - (logsRetainHours * 60L * 60 * 1000);
+    File[] oldTaskLogs = LOG_DIR.listFiles(new TaskLogsPurgeFilter(
+        purgeTimeStamp));
+    // listFiles() returns null when the directory is missing or unreadable.
+    if (oldTaskLogs != null) {
+      for (File oldTaskLog : oldTaskLogs) {
+        FileUtil.fullyDelete(oldTaskLog);
+      }
+    }
+  }
+
+  /**
+   * An input stream over a byte window of one task log file.
+   */
+  static class Reader extends InputStream {
+    /** Bytes of the window not yet handed out; never negative. */
+    private long bytesRemaining;
+    private FileInputStream file;
+
+    /**
+     * Read a log file from start to end positions. The offsets may be negative,
+     * in which case they are relative to the end of the file. For example,
+     * Reader(taskid, kind, 0, -1) is the entire file and Reader(taskid, kind,
+     * -4197, -1) is the last 4196 bytes. Offsets outside the file are clamped,
+     * and a window whose end precedes its start is treated as empty.
+     * 
+     * @param taskid the id of the task to read the log file for
+     * @param kind the kind of log to read
+     * @param start the offset to read from (negative is relative to tail)
+     * @param end the offset to read upto (negative is relative to tail)
+     * @throws IOException
+     */
+    public Reader(TaskAttemptID taskid, LogName kind, long start, long end)
+        throws IOException {
+      // find the right log file
+      File filename = getTaskLogFile(taskid, kind);
+      // calculate the start and stop
+      long size = filename.length();
+      if (start < 0) {
+        start += size + 1;
+      }
+      if (end < 0) {
+        end += size + 1;
+      }
+      start = Math.max(0, Math.min(start, size));
+      end = Math.max(0, Math.min(end, size));
+      // An inverted window (end < start) must not leave a negative budget:
+      // read(byte[],int,int) would otherwise pass a negative length on.
+      bytesRemaining = Math.max(0, end - start);
+      file = new FileInputStream(filename);
+      try {
+        // skip upto start
+        long pos = 0;
+        while (pos < start) {
+          long result = file.skip(start - pos);
+          if (result < 0) {
+            bytesRemaining = 0;
+            break;
+          }
+          pos += result;
+        }
+      } catch (IOException e) {
+        // Do not leak the descriptor when positioning fails; the caller never
+        // receives a Reader it could close.
+        try {
+          file.close();
+        } catch (IOException ignored) {
+          // the skip failure is the error worth reporting
+        }
+        throw e;
+      }
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (bytesRemaining <= 0) {
+        return -1;
+      }
+      int result = file.read();
+      // Only consume budget for a byte that was actually delivered.
+      if (result >= 0) {
+        bytesRemaining -= 1;
+      }
+      return result;
+    }
+
+    @Override
+    public int read(byte[] buffer, int offset, int length) throws IOException {
+      // Once the window is exhausted report end-of-stream. Without this the
+      // request is clamped to zero bytes and 0 is returned forever, so callers
+      // looping until -1 never terminate.
+      if (length > 0 && bytesRemaining <= 0) {
+        return -1;
+      }
+      length = (int) Math.min(length, bytesRemaining);
+      int bytes = file.read(buffer, offset, length);
+      if (bytes > 0) {
+        bytesRemaining -= bytes;
+      }
+      return bytes;
+    }
+
+    @Override
+    public int available() throws IOException {
+      return (int) Math.min(bytesRemaining, file.available());
+    }
+
+    @Override
+    public void close() throws IOException {
+      file.close();
+    }
+  }
+
+  private static final String bashCommand = "bash";
+  private static final String tailCommand = "tail";
+
+  /**
+   * Get the desired maximum length of task's logs.
+   * 
+   * @param conf the job to look in
+   * @return the number of bytes to cap the log files at, taken from
+   *         {@code mapred.userlog.limit.kb} (default 100) times 1024
+   */
+  public static long getTaskLogLength(HamaConfiguration conf) {
+    return conf.getLong("mapred.userlog.limit.kb", 100) * 1024;
+  }
+
+  /**
+   * Wrap a command in a shell to capture stdout and stderr to files. If the
+   * tailLength is 0, the entire output will be saved.
+   * 
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The filename that stdout should be saved to
+   * @param stderrFilename The filename that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @return the modified command that should be run
+   */
+  public static List<String> captureOutAndError(List<String> cmd,
+      File stdoutFilename, File stderrFilename, long tailLength)
+      throws IOException {
+    return captureOutAndError(null, cmd, stdoutFilename, stderrFilename,
+        tailLength);
+  }
+
+  /**
+   * Wrap a command in a shell to capture stdout and stderr to files. Setup
+   * commands such as setting memory limit can be passed which will be executed
+   * before exec. If the tailLength is 0, the entire output will be saved.
+   * 
+   * @param setup The setup commands for the execed process.
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The filename that stdout should be saved to
+   * @param stderrFilename The filename that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @return the modified command that should be run
+   */
+  public static List<String> captureOutAndError(List<String> setup,
+      List<String> cmd, File stdoutFilename, File stderrFilename,
+      long tailLength) throws IOException {
+    String stdout = FileUtil.makeShellPath(stdoutFilename);
+    String stderr = FileUtil.makeShellPath(stderrFilename);
+    List<String> result = new ArrayList<String>(3);
+    result.add(bashCommand);
+    result.add("-c");
+    // Method-local buffer: no synchronization needed, so StringBuilder.
+    StringBuilder mergedCmd = new StringBuilder();
+    if (setup != null && setup.size() > 0) {
+      mergedCmd.append(addCommand(setup, false));
+      mergedCmd.append(";");
+    }
+    if (tailLength > 0) {
+      // Run in a subshell so its stdout/stderr can be piped through tail.
+      mergedCmd.append("(");
+    } else {
+      // No pipeline needed: replace the shell with the command itself.
+      mergedCmd.append("exec ");
+    }
+    mergedCmd.append(addCommand(cmd, true));
+    mergedCmd.append(" < /dev/null ");
+    if (tailLength > 0) {
+      mergedCmd.append(" | ");
+      mergedCmd.append(tailCommand);
+      mergedCmd.append(" -c ");
+      mergedCmd.append(tailLength);
+      mergedCmd.append(" >> ");
+      mergedCmd.append(stdout);
+      // $PIPESTATUS is the exit code of the first command of the pipeline,
+      // i.e. the wrapped command rather than tail.
+      mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | ");
+      mergedCmd.append(tailCommand);
+      mergedCmd.append(" -c ");
+      mergedCmd.append(tailLength);
+      mergedCmd.append(" >> ");
+      mergedCmd.append(stderr);
+      mergedCmd.append(" ; exit $PIPESTATUS");
+    } else {
+      mergedCmd.append(" 1>> ");
+      mergedCmd.append(stdout);
+      mergedCmd.append(" 2>> ");
+      mergedCmd.append(stderr);
+    }
+    result.add(mergedCmd.toString());
+    return result;
+  }
+
+  /**
+   * Add quotes to each of the command strings and return as a single string
+   * 
+   * @param cmd The command to be quoted
+   * @param isExecutable makes shell path if the first argument is executable
+   * @return returns The quoted string.
+   * @throws IOException
+   */
+  public static String addCommand(List<String> cmd, boolean isExecutable)
+      throws IOException {
+    // NOTE(review): arguments are wrapped in single quotes but an embedded
+    // single quote is not escaped, so such an argument would end the quoting
+    // early. Confirm callers never pass quotes before relying on this.
+    StringBuilder command = new StringBuilder();
+    for (String s : cmd) {
+      command.append('\'');
+      if (isExecutable) {
+        // the executable name needs to be expressed as a shell path for the
+        // shell to find it.
+        command.append(FileUtil.makeShellPath(new File(s)));
+        isExecutable = false;
+      } else {
+        command.append(s);
+      }
+      command.append('\'');
+      command.append(" ");
+    }
+    return command.toString();
+  }
+
+  /**
+   * Wrap a command in a shell to capture debug script's stdout and stderr to
+   * debugout.
+   * 
+   * @param cmd The command and the arguments that should be run
+   * @param debugoutFilename The filename that stdout and stderr should be saved
+   *          to.
+   * @return the modified command that should be run
+   * @throws IOException
+   */
+  public static List<String> captureDebugOut(List<String> cmd,
+      File debugoutFilename) throws IOException {
+    String debugout = FileUtil.makeShellPath(debugoutFilename);
+    List<String> result = new ArrayList<String>(3);
+    result.add(bashCommand);
+    result.add("-c");
+    StringBuilder mergedCmd = new StringBuilder();
+    mergedCmd.append("exec ");
+    // NOTE(review): unlike addCommand, the arguments are appended here without
+    // any quoting, so the shell will word-split and expand them.
+    boolean isExecutable = true;
+    for (String s : cmd) {
+      if (isExecutable) {
+        // the executable name needs to be expressed as a shell path for the
+        // shell to find it.
+        mergedCmd.append(FileUtil.makeShellPath(new File(s)));
+        isExecutable = false;
+      } else {
+        mergedCmd.append(s);
+      }
+      mergedCmd.append(" ");
+    }
+    mergedCmd.append(" < /dev/null ");
+    mergedCmd.append(" >");
+    mergedCmd.append(debugout);
+    mergedCmd.append(" 2>&1 ");
+    result.add(mergedCmd.toString());
+    return result;
+  }
+
+} // TaskLog

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskLog.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskLogAppender.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskLogAppender.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskLogAppender.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * A simple log4j-appender for the task child's BSP system logs.
+ * <p>
+ * When a positive event limit has been configured through
+ * {@link #setTotalLogFileSize(long)}, events are buffered in memory and only
+ * the most recent ones are written to the file when the appender is closed.
+ * Otherwise every event is written straight through.
+ */
+public class TaskLogAppender extends FileAppender {
+  /** Rough size of one logging event, used to turn a byte budget into events. */
+  private static final int EVENT_SIZE = 100;
+
+  // taskId is managed as a String rather than a TaskAttemptID object so that
+  // log4j can configure it from the configuration (log4j.properties).
+  private String taskId;
+  /** Maximum number of buffered events; values <= 0 disable buffering. */
+  private int maxEvents;
+  /** Buffer of the most recent events, or null when writing straight through. */
+  private Queue<LoggingEvent> tail = null;
+
+  /**
+   * Points the appender at the syslog file of the configured task attempt and
+   * enables tail buffering when a positive event limit is set.
+   */
+  @Override
+  public void activateOptions() {
+    synchronized (this) {
+      if (maxEvents > 0) {
+        tail = new LinkedList<LoggingEvent>();
+      }
+      setFile(TaskLog.getTaskLogFile(TaskAttemptID.forName(taskId),
+          TaskLog.LogName.SYSLOG).toString());
+      setAppend(true);
+      super.activateOptions();
+    }
+  }
+
+  /**
+   * Writes the event immediately, or buffers it (evicting the oldest buffered
+   * event once the limit is reached) when tail buffering is active.
+   */
+  @Override
+  public void append(LoggingEvent event) {
+    synchronized (this) {
+      if (tail == null) {
+        super.append(event);
+      } else {
+        if (tail.size() >= maxEvents) {
+          tail.remove();
+        }
+        tail.add(event);
+      }
+    }
+  }
+
+  /** Flushes any buffered events to the file and then closes the appender. */
+  @Override
+  public synchronized void close() {
+    if (tail != null) {
+      for (LoggingEvent event : tail) {
+        super.append(event);
+      }
+      // Drop what was flushed so a repeated close() does not try to replay
+      // the same events into an appender that is already closed.
+      tail.clear();
+    }
+    super.close();
+  }
+
+  /**
+   * Getter/Setter methods for log4j.
+   */
+
+  public String getTaskId() {
+    return taskId;
+  }
+
+  public void setTaskId(String taskId) {
+    this.taskId = taskId;
+  }
+
+  /**
+   * @return the configured event limit expressed in bytes
+   */
+  public long getTotalLogFileSize() {
+    // Widen before multiplying so a large event limit cannot overflow int.
+    return (long) maxEvents * EVENT_SIZE;
+  }
+
+  /**
+   * @param logSize byte budget for the log; converted to a number of events
+   *          by dividing by the assumed per-event size
+   */
+  public void setTotalLogFileSize(long logSize) {
+    // Divide in long first, then narrow. The original cast bound tighter than
+    // the division and truncated logSize itself for values beyond int range.
+    maxEvents = (int) Math.min(Integer.MAX_VALUE, logSize / EVENT_SIZE);
+  }
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskLogAppender.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message