incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1176297 [9/19] - in /incubator/hama/branches/HamaV2: ./ api/ api/target/ api/target/classes/ api/target/classes/META-INF/ api/target/lib/ api/target/maven-archiver/ api/target/maven-shared-archive-resources/ api/target/maven-shared-archive...
Date Tue, 27 Sep 2011 09:35:48 GMT
Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.RunJar;
+import org.apache.hama.checkpoint.CheckpointRunner;
+
+/**
+ * Base class that runs a task in a separate process.
+ */
+public class TaskRunner extends Thread {
+
+	public static final Log LOG = LogFactory.getLog(TaskRunner.class);
+	private static final String SYSTEM_PATH_SEPARATOR = System
+			.getProperty("path.separator");
+
+	private enum LogType {
+		STDOUT, ERROR
+	}
+
+	boolean bspKilled = false;
+	private Process bspProcess;
+
+	private final Task task;
+	private final BSPJob conf;
+	private final GroomServer groomServer;
+
+	private File logDir;
+
+	class BspChildRunner implements Callable<Object> {
+		private final List<String> commands;
+		private final File workDir;
+		private final ScheduledExecutorService sched;
+		private final AtomicReference<ScheduledFuture<Object>> future;
+
+		BspChildRunner(List<String> commands, File workDir) {
+			this.commands = commands;
+			this.workDir = workDir;
+			this.sched = Executors.newScheduledThreadPool(1);
+			this.future = new AtomicReference<ScheduledFuture<Object>>();
+		}
+
+		void start() {
+			this.future.set(this.sched.schedule(this, 0, SECONDS));
+			LOG.info("Start building BSPPeer process.");
+		}
+
+		void stop() {
+			killBsp();
+			this.sched.schedule(this, 0, SECONDS);
+			LOG.info("Stop BSPPeer process.");
+		}
+
+		void join() throws InterruptedException, ExecutionException {
+			this.future.get().get();
+		}
+
+		public Object call() throws Exception {
+			ProcessBuilder builder = new ProcessBuilder(commands);
+			builder.directory(workDir);
+			try {
+				bspProcess = builder.start();
+				new Thread() {
+					public void run() {
+						logStream(bspProcess.getErrorStream(), LogType.ERROR);
+					}
+				}.start();
+
+				new Thread() {
+					public void run() {
+						logStream(bspProcess.getInputStream(), LogType.STDOUT);
+					}
+				}.start();
+
+				int exit_code = bspProcess.waitFor();
+				if (!bspKilled && exit_code != 0) {
+					throw new IOException(
+							"BSP task process exit with nonzero status of "
+									+ exit_code + ".");
+				}
+			} catch (InterruptedException e) {
+				LOG.warn(
+						"Thread is interrupted when execeuting Checkpointer process.",
+						e);
+			} catch (IOException ioe) {
+				LOG.error("Error when executing BSPPeer process.", ioe);
+			} finally {
+				killBsp();
+			}
+			return null;
+		}
+	}
+
+	public TaskRunner(BSPTask bspTask, GroomServer groom, BSPJob conf) {
+		this.task = bspTask;
+		this.conf = conf;
+		this.groomServer = groom;
+	}
+
+	public Task getTask() {
+		return task;
+	}
+
+	/**
+	 * Called to assemble this task's input. This method is run in the parent
+	 * process before the child is spawned. It should not execute user code,
+	 * only system code.
+	 */
+	public boolean prepare() throws IOException {
+		return true;
+	}
+
+	private File createWorkDirectory() {
+		File workDir = new File(new File(task.getJobFile()).getParent(), "work");
+		boolean isCreated = workDir.mkdirs();
+		if (isCreated) {
+			LOG.debug("TaskRunner.workDir : " + workDir);
+		}
+		return workDir;
+	}
+
+	private String assembleClasspath(BSPJob jobConf, File workDir) {
+		StringBuffer classPath = new StringBuffer();
+		// start with same classpath as parent process
+		classPath.append(System.getProperty("java.class.path"));
+		classPath.append(SYSTEM_PATH_SEPARATOR);
+
+		String jar = jobConf.getJar();
+		if (jar != null) { // if jar exists, it into workDir
+			try {
+				RunJar.unJar(new File(jar), workDir);
+			} catch (IOException ioe) {
+				LOG.error(
+						"Unable to uncompressing file to " + workDir.toString(),
+						ioe);
+			}
+			File[] libs = new File(workDir, "lib").listFiles();
+			if (libs != null) {
+				for (int i = 0; i < libs.length; i++) {
+					// add libs from jar to classpath
+					classPath.append(SYSTEM_PATH_SEPARATOR);
+					classPath.append(libs[i]);
+				}
+			}
+			classPath.append(SYSTEM_PATH_SEPARATOR);
+			classPath.append(new File(workDir, "classes"));
+			classPath.append(SYSTEM_PATH_SEPARATOR);
+			classPath.append(workDir);
+		}
+		return classPath.toString();
+	}
+
+	private List<String> buildJvmArgs(BSPJob jobConf, String classPath,
+			Class<?> child) {
+		// Build exec child jmv args.
+		List<String> vargs = new ArrayList<String>();
+		File jvm = // use same jvm as parent
+		new File(new File(System.getProperty("java.home"), "bin"), "java");
+		vargs.add(jvm.toString());
+
+		// bsp.child.java.opts
+		String javaOpts = jobConf.getConf().get("bsp.child.java.opts",
+				"-Xmx200m");
+		javaOpts = javaOpts.replace("@taskid@", task.getTaskID().toString());
+
+		String[] javaOptsSplit = javaOpts.split(" ");
+		for (int i = 0; i < javaOptsSplit.length; i++) {
+			vargs.add(javaOptsSplit[i]);
+		}
+
+		// Add classpath.
+		vargs.add("-classpath");
+		vargs.add(classPath.toString());
+		// Add main class and its arguments
+		LOG.debug("Executing child Process " + child.getName());
+		vargs.add(child.getName()); // main of bsp or checkpointer Child
+
+		if (GroomServer.BSPPeerChild.class.equals(child)) {
+			InetSocketAddress addr = groomServer.getTaskTrackerReportAddress();
+			vargs.add(addr.getHostName());
+			vargs.add(Integer.toString(addr.getPort()));
+			vargs.add(task.getTaskID().toString());
+			vargs.add(groomServer.groomHostName);
+		}
+
+		if (jobConf.getConf().getBoolean("bsp.checkpoint.enabled", false)) {
+			String ckptPort = jobConf.getConf().get("bsp.checkpoint.port",
+					CheckpointRunner.DEFAULT_PORT);
+			LOG.debug("Checkpointer's port:" + ckptPort);
+			vargs.add(ckptPort);
+		}
+
+		return vargs;
+	}
+
+	/**
+	 * Build working environment and launch BSPPeer and Checkpointer processes.
+	 * And transmit data from BSPPeer's inputstream to Checkpointer's
+	 * OutputStream.
+	 */
+	public void run() {
+		File workDir = createWorkDirectory();
+		logDir = createLogDirectory();
+		String classPath = assembleClasspath(conf, workDir);
+		LOG.debug("Spawned child's classpath " + classPath);
+		List<String> bspArgs = buildJvmArgs(conf, classPath,
+				GroomServer.BSPPeerChild.class);
+
+		BspChildRunner bspPeer = new BspChildRunner(bspArgs, workDir);
+		bspPeer.start();
+		try {
+			bspPeer.join();
+		} catch (InterruptedException ie) {
+			LOG.error("BSPPeer child process is interrupted.", ie);
+		} catch (ExecutionException ee) {
+			LOG.error("Failure occurs when retrieving tasks result.", ee);
+		}
+		LOG.info("Finishes executing BSPPeer child process.");
+	}
+
+	/**
+	 * Creates the tasks log directory if needed.
+	 * 
+	 * @return the top directory of the tasks logging area.
+	 */
+	private File createLogDirectory() {
+		// our log dir looks following: log/tasklogs/job_id/
+		File f = new File(System.getProperty("hama.log.dir") + File.separator
+				+ "tasklogs" + File.separator + task.jobId.id);
+		// TODO if we have attemps: + File.separator+ task.getTaskID());
+
+		if (!f.exists()) {
+			f.mkdirs();
+		}
+
+		return f;
+	}
+
+	/**
+	 * Kill bsppeer child process.
+	 */
+	public void killBsp() {
+		if (bspProcess != null) {
+			bspProcess.destroy();
+		}
+		bspKilled = true;
+	}
+
+	/**
+	 * Log process's stream.
+	 * 
+	 * @param input
+	 *            stream to be logged.
+	 * @param stdout
+	 *            type of the log
+	 */
+	private void logStream(InputStream input, LogType type) {
+		// STDOUT file can be found under LOG_DIR/task_attempt_id.log
+		// ERROR file can be found under LOG_DIR/task_attempt_id.err
+		File taskLogFile = new File(logDir, task.getTaskAttemptId()
+				+ getFileEndingForType(type));
+		BufferedWriter writer = null;
+		try {
+			writer = new BufferedWriter(new FileWriter(taskLogFile));
+			BufferedReader in = new BufferedReader(new InputStreamReader(input));
+			String line;
+			while ((line = in.readLine()) != null) {
+				writer.write(line);
+				writer.newLine();
+			}
+		} catch (IOException e) {
+			LOG.warn(task.getTaskID() + " Error reading child output", e);
+		} finally {
+			try {
+				input.close();
+			} catch (IOException e) {
+				LOG.warn(task.getTaskID() + " Error closing child output", e);
+			}
+			try {
+				writer.close();
+			} catch (IOException e) {
+				LOG.warn(task.getTaskID() + " Error closing log file", e);
+			}
+		}
+	}
+
+	/**
+	 * Returns the ending of the logfile for each LogType. e.G. ".log".
+	 * 
+	 * @param type
+	 * @return an ending, including a dot.
+	 */
+	private String getFileEndingForType(LogType type) {
+		if (type != LogType.ERROR)
+			return ".err";
+		else
+			return ".log";
+	}
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskScheduler.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskScheduler.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskScheduler.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Used by a {@link BSPMaster} to schedule {@link Task}s on {@link GroomServer}
+ * s.
+ */
+abstract class TaskScheduler implements Configurable {
+
+  protected Configuration conf;
+  protected GroomServerManager groomServerManager;
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public synchronized void setGroomServerManager(
+      GroomServerManager groomServerManager) {
+    this.groomServerManager = groomServerManager;
+  }
+
+  /**
+   * Lifecycle method to allow the scheduler to start any work in separate
+   * threads.
+   * 
+   * @throws IOException
+   */
+  public void start() throws IOException {
+    // do nothing
+  }
+
+  /**
+   * Lifecycle method to allow the scheduler to stop any work it is doing.
+   * 
+   * @throws IOException
+   */
+  public void terminate() throws IOException {
+    // do nothing
+  }
+
+  // public abstract void addJob(JobInProgress job);
+
+  /**
+   * Returns a collection of jobs in an order which is specific to the
+   * particular scheduler.
+   * 
+   * @param Queue name.
+   * @return JobInProgress corresponded to the specified queue.
+   */
+  public abstract Collection<JobInProgress> getJobs(String queue);
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskStatus.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskStatus.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskStatus.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Describes the current status of a task. This is not intended to be a
+ * comprehensive piece of data.
+ */
+public class TaskStatus implements Writable, Cloneable {
+  static final Log LOG = LogFactory.getLog(TaskStatus.class);
+
+  // enumeration for reporting current phase of a task.
+  public static enum Phase {
+    STARTING, COMPUTE, BARRIER_SYNC, CLEANUP
+  }
+
+  // what state is the task in?
+  public static enum State {
+    RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN
+  }
+
+  private BSPJobID jobId;
+  private TaskAttemptID taskId;
+  private float progress;
+  private volatile State runState;
+  private String stateString;
+  private String groomServer;
+  private long superstepCount;
+
+  private long startTime;
+  private long finishTime;
+
+  private volatile Phase phase = Phase.STARTING;
+
+  /**
+   * 
+   */
+  public TaskStatus() {
+    jobId = new BSPJobID();
+    taskId = new TaskAttemptID();
+    this.superstepCount = 0;
+  }
+
+  public TaskStatus(BSPJobID jobId, TaskAttemptID taskId, float progress,
+      State runState, String stateString, String groomServer, Phase phase) {
+    this.jobId = jobId;
+    this.taskId = taskId;
+    this.progress = progress;
+    this.runState = runState;
+    this.stateString = stateString;
+    this.groomServer = groomServer;
+    this.phase = phase;
+    this.superstepCount = 0;
+  }
+
+  // //////////////////////////////////////////////////
+  // Accessors and Modifiers
+  // //////////////////////////////////////////////////
+
+  public BSPJobID getJobId() {
+    return jobId;
+  }
+
+  public TaskAttemptID getTaskId() {
+    return taskId;
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  public void setSuperstepCount(long superstepCount) {
+    this.superstepCount = superstepCount;  
+  }
+  
+  public void setProgress(float progress) {
+    this.progress = progress;
+  }
+
+  public State getRunState() {
+    return runState;
+  }
+
+  public void setRunState(State state) {
+    this.runState = state;
+  }
+
+  public String getStateString() {
+    return stateString;
+  }
+
+  public void setStateString(String stateString) {
+    this.stateString = stateString;
+  }
+
+  public String getGroomServer() {
+    return groomServer;
+  }
+
+  public void setGroomServer(String groomServer) {
+    this.groomServer = groomServer;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  /**
+   * Get start time of the task.
+   * 
+   * @return 0 is start time is not set, else returns start time.
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Set startTime of the task.
+   * 
+   * @param startTime start time
+   */
+  void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  /**
+   * Get current phase of this task.
+   * 
+   * @return .
+   */
+  public Phase getPhase() {
+    return this.phase;
+  }
+
+  /**
+   * Set current phase of this task.
+   * 
+   * @param phase phase of this task
+   */
+  void setPhase(Phase phase) {
+    this.phase = phase;
+  }
+
+  /**
+   * Update the status of the task.
+   * 
+   * This update is done by ping thread before sending the status.
+   * 
+   * @param progress
+   * @param state
+   * @param counters
+   */
+  synchronized void statusUpdate(float progress, String state) {
+    setProgress(progress);
+    setStateString(state);
+  }
+
+  /**
+   * Update the status of the task.
+   * 
+   * @param status updated status
+   */
+  synchronized void statusUpdate(TaskStatus status) {
+    this.superstepCount = status.getSuperstepCount();
+    this.progress = status.getProgress();
+    this.runState = status.getRunState();
+    this.stateString = status.getStateString();
+
+    if (status.getStartTime() != 0) {
+      this.startTime = status.getStartTime();
+    }
+    if (status.getFinishTime() != 0) {
+      this.finishTime = status.getFinishTime();
+    }
+
+    this.phase = status.getPhase();
+  }
+
+  /**
+   * Update specific fields of task status
+   * 
+   * This update is done in BSPMaster when a cleanup attempt of task reports its
+   * status. Then update only specific fields, not all.
+   * 
+   * @param superstepCount
+   * @param runState
+   * @param progress
+   * @param state
+   * @param phase
+   * @param finishTime
+   */
+  synchronized void statusUpdate(long superstepCount, State runState, float progress, String state,
+      Phase phase, long finishTime) {
+    setSuperstepCount(superstepCount);
+    setRunState(runState);
+    setProgress(progress);
+    setStateString(state);
+    setPhase(phase);
+    if (finishTime != 0) {
+      this.finishTime = finishTime;
+    }
+  }
+
+  /**
+   * @return The number of BSP super steps executed by the task.
+   */
+  public long getSuperstepCount() {
+    return superstepCount;
+  }
+
+  /**
+   * Increments the number of BSP super steps executed by the task.
+   */
+  public void incrementSuperstepCount() {
+    superstepCount += 1;
+  }
+
+  @Override
+  public Object clone() {
+    try {
+      return super.clone();
+    } catch (CloneNotSupportedException cnse) {
+      // Shouldn't happen since we do implement Clonable
+      throw new InternalError(cnse.toString());
+    }
+  }
+
+  // ////////////////////////////////////////////
+  // Writable
+  // ////////////////////////////////////////////
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.jobId.readFields(in);
+    this.taskId.readFields(in);
+    this.progress = in.readFloat();
+    this.runState = WritableUtils.readEnum(in, State.class);
+    this.stateString = Text.readString(in);
+    this.phase = WritableUtils.readEnum(in, Phase.class);
+    this.startTime = in.readLong();
+    this.finishTime = in.readLong();
+    this.superstepCount = in.readLong();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    jobId.write(out);
+    taskId.write(out);
+    out.writeFloat(progress);
+    WritableUtils.writeEnum(out, runState);
+    Text.writeString(out, stateString);
+    WritableUtils.writeEnum(out, phase);
+    out.writeLong(startTime);
+    out.writeLong(finishTime);
+    out.writeLong(superstepCount);
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/package.html
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/package.html?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/package.html (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/package.html Tue Sep 27 09:35:21 2011
@@ -0,0 +1,23 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+BSP computing framework.
+</body>
+</html>

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/bsp/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.checkpoint;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.GroomServer.CheckpointerChild;
+import org.apache.hama.bsp.GroomServer;
+import static java.util.concurrent.TimeUnit.*;
+
+
+public final class CheckpointRunner implements Callable {
+
+  public static final Log LOG = LogFactory.getLog(CheckpointRunner.class);
+  public static final String DEFAULT_PORT = "1590";
+  
+  private final List<String> commands;
+  private final ScheduledExecutorService sched;
+  private final AtomicReference<Process> process;
+  private final AtomicBoolean isAlive = new AtomicBoolean(false);
+
+  public CheckpointRunner(List<String> commands) {
+    this.commands = commands;
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Command for executing Checkpoint runner:"+
+      Arrays.toString(this.commands.toArray()));
+    }
+    this.sched = Executors.newScheduledThreadPool(10);
+    this.process = new AtomicReference<Process>();
+  }
+
+  public static final List<String> buildCommands(final Configuration config) {
+    List<String> vargs = new ArrayList<String>();
+    File jvm =
+      new File(new File(System.getProperty("java.home"), "bin"), "java");
+    vargs.add(jvm.toString());
+
+    String javaOpts = config.get("bsp.checkpoint.child.java.opts", "-Xmx50m");
+    String[] javaOptsSplit = javaOpts.split(" ");
+    for (int i = 0; i < javaOptsSplit.length; i++) {
+      vargs.add(javaOptsSplit[i]);
+    }
+    vargs.add("-classpath");
+    vargs.add(System.getProperty("java.class.path"));
+    vargs.add(CheckpointerChild.class.getName());
+    String port = config.get("bsp.checkpoint.port", DEFAULT_PORT);
+    if(LOG.isDebugEnabled())
+      LOG.debug("Checkpointer's port:"+port);
+    vargs.add(port);
+
+    return vargs;
+  }
+
+  public void start() {
+    if(!isAlive.compareAndSet(false, true)) {
+      throw new IllegalStateException(this.getClass().getName()+
+      " is already running.");
+    }
+    this.sched.schedule(this, 0, SECONDS);
+    LOG.info("Start building checkpointer process.");
+  }
+
+  public void stop() {
+    kill();
+    this.sched.shutdown();
+    LOG.info("Stop checkpointer process.");
+  }
+
+  public Process getProcess() {
+    return this.process.get();
+  }
+
+  public void kill() {
+    if (this.process.get() != null) {
+      this.process.get().destroy();
+    }
+    isAlive.set(false);
+  }
+
+  public boolean isAlive() {
+    return isAlive.get();
+  }
+
+  public Object call() throws Exception {
+    ProcessBuilder builder = new ProcessBuilder(commands);
+    try{
+      this.process.set(builder.start());
+      new Thread() {
+        @Override
+        public void run() {
+          logStream(process.get().getErrorStream());
+        }
+      }.start();
+
+      new Thread() {
+        @Override
+        public void run() {
+          logStream(process.get().getInputStream());
+        }
+      }.start();
+
+      Runtime.getRuntime().addShutdownHook(new Thread() {
+        @Override
+        public void run() {
+          LOG.info("Destroying checkpointer process.");
+          getProcess().destroy();
+        }
+      });
+
+      int exit_code = this.process.get().waitFor();
+      if (!isAlive() && exit_code != 0) {
+        throw new IOException("Checkpointer process exit with nonzero status of "
+            + exit_code + ".");
+      }
+    } catch(InterruptedException e){
+      LOG.warn("Thread is interrupted when execeuting Checkpointer process.", e);
+    } catch(IOException ioe) {
+      LOG.error("Error when executing Checkpointer process.", ioe);
+    } finally {
+      kill();
+    }
+    return null;
+  }
+
+  private void logStream(InputStream output) {
+    try {
+      BufferedReader in = new BufferedReader(new InputStreamReader(output));
+      String line;
+      while ((line = in.readLine()) != null) {
+        LOG.info(line);
+      }
+    } catch (IOException e) {
+      LOG.warn("Error reading checkpoint process's inputstream.", e);
+    } finally {
+      try {
+        output.close();
+      } catch (IOException e) {
+        LOG.warn("Error closing checkpoint's inputstream.", e);
+      }
+    }
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.checkpoint;
+
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import static java.util.concurrent.TimeUnit.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer.BSPSerializableMessage;
+import org.apache.hama.GroomServerRunner;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This class is responsible for checkpointing messages to hdfs. 
+ */
+public final class Checkpointer implements Callable {
+
+  public static Log LOG = LogFactory.getLog(Checkpointer.class);
+
+  private final ScheduledExecutorService scheduler = 
+    Executors.newScheduledThreadPool(1);
+  private final FileSystem dfs; 
+  private final AtomicBoolean ckptState = new AtomicBoolean(false);
+  private final BSPMessageDeserializer messageDeserializer;
+  private final AtomicReference<ScheduledFuture> future =  
+    new AtomicReference<ScheduledFuture>(); 
+
+  /** 
+   * Reading from socket inputstream as DataInput.
+   */
+  public static final class BSPMessageDeserializer implements Callable {
+    final BlockingQueue<BSPSerializableMessage> messageQueue = 
+      new LinkedBlockingQueue<BSPSerializableMessage>();
+    final ScheduledExecutorService sched; 
+    final ScheduledExecutorService workers; 
+    final AtomicBoolean serializerState = new AtomicBoolean(false);
+    final ServerSocket server;
+    
+    public BSPMessageDeserializer(final int port) throws IOException { 
+      this.sched = Executors.newScheduledThreadPool(1);
+      this.workers = Executors.newScheduledThreadPool(10);
+      this.server = new ServerSocket(port); 
+      LOG.info("Deserializer's port is opened at "+port);
+    }
+
+    public int port() {
+      return this.server.getLocalPort(); 
+    }
+
+    public void start() {
+      if(!serializerState.compareAndSet(false, true)) {
+        throw new IllegalStateException("BSPMessageDeserializer has been "+
+        "started up.");
+      }
+      this.sched.schedule(this, 0, SECONDS);
+      LOG.info("BSPMessageDeserializer is started.");
+    }
+
+    public void stop() {
+      try {
+        this.server.close();
+      } catch(IOException ioe) {
+        LOG.error("Unable to close message serializer server socket.", ioe);
+      }
+      this.sched.shutdown();
+      this.workers.shutdown();
+      this.serializerState.set(false);
+      LOG.info("BSPMessageDeserializer is stopped.");
+    }
+
+    public boolean state(){
+      return this.serializerState.get();
+    }
+
+    /**
+     * Message is enqueued for communcating data sent from BSPPeer.
+     */
+    public BlockingQueue<BSPSerializableMessage> messageQueue() {
+      return this.messageQueue;
+    }
+
+    public Object call() throws Exception {
+      try {
+        while(state()) {
+          Socket connection = server.accept();
+          final DataInput in = new DataInputStream(connection.getInputStream());
+          this.workers.schedule(new Callable() {
+            public Object call() throws Exception {
+              BSPSerializableMessage tmp = new BSPSerializableMessage();
+              tmp.readFields(in);
+              messageQueue().put(tmp);
+              return null;
+            }
+          }, 0, SECONDS);
+        }
+      } catch(EOFException eofe) {
+        LOG.info("Closing checkpointer's input stream.", eofe);
+      } catch(IOException ioe) {
+        LOG.error("Error when reconstructing BSPSerializableMessage.", ioe);
+      }
+      return null;
+    }
+  }
+
+  public Checkpointer(final Configuration conf) throws IOException {
+    this.dfs = FileSystem.get(conf); 
+    if(null == this.dfs) 
+      throw new NullPointerException("HDFS instance not found.");
+    int port = conf.getInt("bsp.checkpoint.port", 
+      Integer.parseInt(CheckpointRunner.DEFAULT_PORT));
+    if(LOG.isDebugEnabled()) 
+      LOG.debug("Checkpoint port value:"+port); 
+    this.messageDeserializer = new BSPMessageDeserializer(port);
+  }
+
+  /**
+   * Activate the checkpoint thread.
+   */
+  public void start(){
+    if(!ckptState.compareAndSet(false, true)) {
+      throw new IllegalStateException("Checkpointer has been started up.");
+    }
+    this.messageDeserializer.start();
+    this.future.set(this.scheduler.schedule(this, 0, SECONDS));
+    LOG.info("Checkpointer is started.");
+  }
+
+  /**
+   * Stop checkpoint thread.
+   */
+  public void stop(){
+    this.messageDeserializer.stop();
+    this.scheduler.shutdown();
+    this.ckptState.set(false);
+    LOG.info("Checkpointer is stopped.");
+  }
+  
+  /**
+   * Check if checkpointer is running.
+   * @return true if checkpointer is runing; false otherwise.
+   */
+  public boolean isAlive(){
+    return !this.scheduler.isShutdown() && this.ckptState.get();
+  }
+
+  public void join() throws InterruptedException, ExecutionException {
+    this.future.get().get();
+  }
+
+  public Boolean call() throws Exception {
+    BlockingQueue<BSPSerializableMessage> queue = 
+      this.messageDeserializer.messageQueue();
+    while(isAlive()) {
+      BSPSerializableMessage msg = queue.take();
+      String path = msg.checkpointedPath();
+      if(null == path || path.toString().isEmpty()) 
+        throw new NullPointerException("File dest is not provided.");
+      FSDataOutputStream out = this.dfs.create(new Path(path)); 
+      msg.messageBundle().write(out);
+      try { } finally { try { out.close(); } catch(IOException e) {
+        LOG.error("Fail to close hdfs output stream.", e); } 
+      } 
+    }
+    try {  } finally { LOG.info("Stop checkpointing."); this.stop(); }
+    return new Boolean(true);
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/http/HttpServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/http/HttpServer.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/http/HttpServer.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/http/HttpServer.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,501 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.http;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.log.LogLevel;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Handler;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.handler.ContextHandlerCollection;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.security.SslSocketConnector;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.DefaultServlet;
+import org.mortbay.jetty.servlet.FilterHolder;
+import org.mortbay.jetty.servlet.FilterMapping;
+import org.mortbay.jetty.servlet.ServletHandler;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.jetty.webapp.WebAppContext;
+import org.mortbay.thread.QueuedThreadPool;
+import org.mortbay.util.MultiException;
+
+/**
+ * Create a Jetty embedded server to answer http requests. The primary goal is
+ * to serve up status information for the server. There are three contexts:
+ * "/logs/" -> points to the log directory "/static/" -> points to common static
+ * files (src/main/webapp/static) "/" -> the jsp server code from
+ * (src/main/webapp/<name>)
+ */
+public class HttpServer {
+  public static final Log LOG = LogFactory.getLog(HttpServer.class);
+
+  static final String FILTER_INITIALIZER_PROPERTY = "hama.http.filter.initializers";
+
+  protected final Server webServer;
+  protected final Connector listener;
+  protected final WebAppContext webAppContext;
+  protected final boolean findPort;
+  protected final Map<Context, Boolean> defaultContexts = new HashMap<Context, Boolean>();
+  protected final List<String> filterNames = new ArrayList<String>();
+  private static final int MAX_RETRIES = 10;
+
+  /** Same as this(name, bindAddress, port, findPort, null); */
+  public HttpServer(String name, String bindAddress, int port, boolean findPort)
+      throws IOException {
+    this(name, bindAddress, port, findPort, new Configuration());
+  }
+
+  /**
+   * Create a status server on the given port. The jsp scripts are taken from
+   * src/main/webapp/<name>.
+   * 
+   * @param name The name of the server
+   * @param port The port to use on the server
+   * @param findPort whether the server should start at the given port and
+   *          increment by 1 until it finds a free port.
+   * @param conf Configuration
+   */
+  public HttpServer(String name, String bindAddress, int port,
+      boolean findPort, Configuration conf) throws IOException {
+    webServer = new Server();
+    this.findPort = findPort;
+
+    listener = createBaseListener(conf);
+    listener.setHost(bindAddress);
+    listener.setPort(port);
+    webServer.addConnector(listener);
+
+    webServer.setThreadPool(new QueuedThreadPool());
+
+    final String appDir = getWebAppsPath();
+    ContextHandlerCollection contexts = new ContextHandlerCollection();
+    webServer.setHandler(contexts);
+    webAppContext = new WebAppContext();
+
+    System.setProperty("java.naming.factory.initial",
+        "org.mortbay.naming.InitialContextFactory");
+    System.setProperty("java.naming.factory.url.pkgs", "org.mortbay.naming");
+
+    webAppContext.setContextPath("/");
+    webAppContext.setWar(appDir + "/" + name);
+    webServer.addHandler(webAppContext);
+
+    addDefaultApps(contexts, appDir);
+
+    addDefaultServlets();
+  }
+
+  /**
+   * Create a required listener for the Jetty instance listening on the port
+   * provided. This wrapper and all subclasses must create at least one
+   * listener.
+   */
+  protected Connector createBaseListener(Configuration conf) throws IOException {
+    SelectChannelConnector ret = new SelectChannelConnector();
+    ret.setLowResourceMaxIdleTime(10000);
+    ret.setAcceptQueueSize(128);
+    ret.setResolveNames(false);
+    ret.setUseDirectBuffers(false);
+    return ret;
+  }
+
+  /**
+   * Add default apps.
+   * 
+   * @param appDir The application directory
+   * @throws IOException
+   */
+  protected void addDefaultApps(ContextHandlerCollection parent,
+      final String appDir) throws IOException {
+    // set up the context for "/logs/" if "hama.log.dir" property is defined.
+    String logDir = System.getProperty("hama.log.dir");
+    if (logDir != null) {
+      Context logContext = new Context(parent, "/logs");
+      logContext.setResourceBase(logDir);
+      logContext.addServlet(DefaultServlet.class, "/");
+      defaultContexts.put(logContext, true);
+    }
+    // set up the context for "/static/*"
+    Context staticContext = new Context(parent, "/static");
+    staticContext.setResourceBase(appDir + "/static");
+    staticContext.addServlet(DefaultServlet.class, "/*");
+    defaultContexts.put(staticContext, true);
+  }
+
+  /**
+   * Add default servlets.
+   */
+  protected void addDefaultServlets() {
+    // set up default servlets
+    addServlet("stacks", "/stacks", StackServlet.class);
+    addServlet("logLevel", "/logLevel", LogLevel.Servlet.class);
+  }
+
+  public void addContext(Context ctxt, boolean isFiltered) throws IOException {
+    webServer.addHandler(ctxt);
+    defaultContexts.put(ctxt, isFiltered);
+  }
+
+  /**
+   * Add a context
+   * 
+   * @param pathSpec The path spec for the context
+   * @param dir The directory containing the context
+   * @param isFiltered if true, the servlet is added to the filter path mapping
+   * @throws IOException
+   */
+  protected void addContext(String pathSpec, String dir, boolean isFiltered)
+      throws IOException {
+    if (0 == webServer.getHandlers().length) {
+      throw new RuntimeException("Couldn't find handler");
+    }
+    WebAppContext webAppCtx = new WebAppContext();
+    webAppCtx.setContextPath(pathSpec);
+    webAppCtx.setWar(dir);
+    addContext(webAppCtx, true);
+  }
+
+  /**
+   * Set a value in the webapp context. These values are available to the jsp
+   * pages as "application.getAttribute(name)".
+   * 
+   * @param name The name of the attribute
+   * @param value The value of the attribute
+   */
+  public void setAttribute(String name, Object value) {
+    webAppContext.setAttribute(name, value);
+  }
+
+  /**
+   * Add a servlet in the server.
+   * 
+   * @param name The name of the servlet (can be passed as null)
+   * @param pathSpec The path spec for the servlet
+   * @param clazz The servlet class
+   */
+  public void addServlet(String name, String pathSpec,
+      Class<? extends HttpServlet> clazz) {
+    addInternalServlet(name, pathSpec, clazz);
+    addFilterPathMapping(pathSpec, webAppContext);
+  }
+
+  /**
+   * Add an internal servlet in the server.
+   * 
+   * @param name The name of the servlet (can be passed as null)
+   * @param pathSpec The path spec for the servlet
+   * @param clazz The servlet class
+   * @deprecated this is a temporary method
+   */
+  @Deprecated
+  public void addInternalServlet(String name, String pathSpec,
+      Class<? extends HttpServlet> clazz) {
+    ServletHolder holder = new ServletHolder(clazz);
+    if (name != null) {
+      holder.setName(name);
+    }
+    webAppContext.addServlet(holder, pathSpec);
+  }
+
+  /**
+   * Define a filter for a context and set up default url mappings.
+   */
+  protected void defineFilter(Context ctx, String name, String classname,
+      Map<String, String> parameters, String[] urls) {
+
+    FilterHolder holder = new FilterHolder();
+    holder.setName(name);
+    holder.setClassName(classname);
+    holder.setInitParameters(parameters);
+    FilterMapping fmap = new FilterMapping();
+    fmap.setPathSpecs(urls);
+    fmap.setDispatches(Handler.ALL);
+    fmap.setFilterName(name);
+    ServletHandler handler = ctx.getServletHandler();
+    handler.addFilter(holder, fmap);
+  }
+
+  /**
+   * Add the path spec to the filter path mapping.
+   * 
+   * @param pathSpec The path spec
+   * @param webAppCtx The WebApplicationContext to add to
+   */
+  protected void addFilterPathMapping(String pathSpec, Context webAppCtx) {
+    ServletHandler handler = webAppCtx.getServletHandler();
+    for (String name : filterNames) {
+      FilterMapping fmap = new FilterMapping();
+      fmap.setPathSpec(pathSpec);
+      fmap.setFilterName(name);
+      fmap.setDispatches(Handler.ALL);
+      handler.addFilterMapping(fmap);
+    }
+  }
+
+  /**
+   * Get the value in the webapp context.
+   * 
+   * @param name The name of the attribute
+   * @return The value of the attribute
+   */
+  public Object getAttribute(String name) {
+    return webAppContext.getAttribute(name);
+  }
+
+  /**
+   * Get the pathname to the webapps files.
+   * 
+   * @return the pathname as a URL
+   * @throws IOException if 'webapps' directory cannot be found on CLASSPATH.
+   */
+  protected String getWebAppsPath() throws IOException {
+    // URL url = BSPMaster.class.getClassLoader().getResource("webapps");
+    // if (url == null)
+    // throw new IOException("webapps not found in CLASSPATH");
+    // return url.toString();
+    return "src/main/webapp";
+  }
+
+  /**
+   * Get the port that the server is on
+   * 
+   * @return the port
+   */
+  public int getPort() {
+    return webServer.getConnectors()[0].getLocalPort();
+  }
+
+  /**
+   * Set the min, max number of worker threads (simultaneous connections).
+   */
+  public void setThreads(int min, int max) {
+    QueuedThreadPool pool = (QueuedThreadPool) webServer.getThreadPool();
+    pool.setMinThreads(min);
+    pool.setMaxThreads(max);
+  }
+
+  /**
+   * Configure an ssl listener on the server.
+   * 
+   * @param addr address to listen on
+   * @param keystore location of the keystore
+   * @param storPass password for the keystore
+   * @param keyPass password for the key
+   * @deprecated Use
+   *             {@link #addSslListener(InetSocketAddress, Configuration, boolean)}
+   */
+  @Deprecated
+  public void addSslListener(InetSocketAddress addr, String keystore,
+      String storPass, String keyPass) throws IOException {
+    if (webServer.isStarted()) {
+      throw new IOException("Failed to add ssl listener");
+    }
+    SslSocketConnector sslListener = new SslSocketConnector();
+    sslListener.setHost(addr.getHostName());
+    sslListener.setPort(addr.getPort());
+    sslListener.setKeystore(keystore);
+    sslListener.setPassword(storPass);
+    sslListener.setKeyPassword(keyPass);
+    webServer.addConnector(sslListener);
+  }
+
+  /**
+   * Configure an ssl listener on the server.
+   * 
+   * @param addr address to listen on
+   * @param sslConf conf to retrieve ssl options
+   * @param needClientAuth whether client authentication is required
+   */
+  public void addSslListener(InetSocketAddress addr, Configuration sslConf,
+      boolean needClientAuth) throws IOException {
+    if (webServer.isStarted()) {
+      throw new IOException("Failed to add ssl listener");
+    }
+    if (needClientAuth) {
+      // setting up SSL truststore for authenticating clients
+      System.setProperty("javax.net.ssl.trustStore", sslConf.get(
+          "ssl.server.truststore.location", ""));
+      System.setProperty("javax.net.ssl.trustStorePassword", sslConf.get(
+          "ssl.server.truststore.password", ""));
+      System.setProperty("javax.net.ssl.trustStoreType", sslConf.get(
+          "ssl.server.truststore.type", "jks"));
+    }
+    SslSocketConnector sslListener = new SslSocketConnector();
+    sslListener.setHost(addr.getHostName());
+    sslListener.setPort(addr.getPort());
+    sslListener.setKeystore(sslConf.get("ssl.server.keystore.location"));
+    sslListener.setPassword(sslConf.get("ssl.server.keystore.password", ""));
+    sslListener.setKeyPassword(sslConf.get("ssl.server.keystore.keypassword",
+        ""));
+    sslListener.setKeystoreType(sslConf.get("ssl.server.keystore.type", "jks"));
+    sslListener.setNeedClientAuth(needClientAuth);
+    webServer.addConnector(sslListener);
+  }
+
+  /**
+   * Start the server. Does not wait for the server to start.
+   */
+  public void start() throws IOException {
+    try {
+      int port = 0;
+      int oriPort = listener.getPort(); // The original requested port
+      while (true) {
+        try {
+          port = webServer.getConnectors()[0].getLocalPort();
+          LOG.info("Port returned by webServer.getConnectors()[0]."
+              + "getLocalPort() before open() is " + port
+              + ". Opening the listener on " + oriPort);
+          listener.open();
+          port = listener.getLocalPort();
+          LOG.info("listener.getLocalPort() returned "
+              + listener.getLocalPort()
+              + " webServer.getConnectors()[0].getLocalPort() returned "
+              + webServer.getConnectors()[0].getLocalPort());
+          // Workaround to handle the problem reported in HADOOP-4744
+          if (port < 0) {
+            Thread.sleep(100);
+            int numRetries = 1;
+            while (port < 0) {
+              LOG.warn("listener.getLocalPort returned " + port);
+              if (numRetries++ > MAX_RETRIES) {
+                throw new Exception(" listener.getLocalPort is returning "
+                    + "less than 0 even after " + numRetries + " resets");
+              }
+              for (int i = 0; i < 2; i++) {
+                LOG.info("Retrying listener.getLocalPort()");
+                port = listener.getLocalPort();
+                if (port > 0) {
+                  break;
+                }
+                Thread.sleep(200);
+              }
+              if (port > 0) {
+                break;
+              }
+              LOG.info("Bouncing the listener");
+              listener.close();
+              Thread.sleep(1000);
+              listener.setPort(oriPort == 0 ? 0 : (oriPort += 1));
+              listener.open();
+              Thread.sleep(100);
+              port = listener.getLocalPort();
+            }
+          } // Workaround end
+          LOG.info("Jetty bound to port " + port);
+          webServer.start();
+          // Workaround for HADOOP-6386
+          port = listener.getLocalPort();
+          if (port < 0) {
+            LOG.warn("Bounds port is " + port + " after webserver start");
+            for (int i = 0; i < MAX_RETRIES / 2; i++) {
+              try {
+                webServer.stop();
+              } catch (Exception e) {
+                LOG.warn("Can't stop  web-server", e);
+              }
+              Thread.sleep(1000);
+
+              listener.setPort(oriPort == 0 ? 0 : (oriPort += 1));
+              listener.open();
+              Thread.sleep(100);
+              webServer.start();
+              LOG.info(i + "attempts to restart webserver");
+              port = listener.getLocalPort();
+              if (port > 0)
+                break;
+            }
+            if (port < 0)
+              throw new Exception("listener.getLocalPort() is returning "
+                  + "less than 0 even after " + MAX_RETRIES + " resets");
+          }
+          // End of HADOOP-6386 workaround
+          break;
+        } catch (IOException ex) {
+          // if this is a bind exception,
+          // then try the next port number.
+          if (ex instanceof BindException) {
+            if (!findPort) {
+              throw (BindException) ex;
+            }
+          } else {
+            LOG.info("HttpServer.start() threw a non Bind IOException");
+            throw ex;
+          }
+        } catch (MultiException ex) {
+          LOG.info("HttpServer.start() threw a MultiException");
+          throw ex;
+        }
+        listener.setPort((oriPort += 1));
+      }
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Problem starting http server", e);
+    }
+  }
+
+  /**
+   * stop the server
+   */
+  public void stop() throws Exception {
+    listener.close();
+    webServer.stop();
+  }
+
+  public void join() throws InterruptedException {
+    webServer.join();
+  }
+
+  /**
+   * A very simple servlet to serve up a text representation of the current
+   * stack traces. It both returns the stacks to the caller and logs them.
+   * Currently the stack traces are done sequentially rather than exactly the
+   * same data.
+   */
+  public static class StackServlet extends HttpServlet {
+    private static final long serialVersionUID = -6284183679759467039L;
+
+    @Override
+    public void doGet(HttpServletRequest request, HttpServletResponse response)
+        throws ServletException, IOException {
+
+      PrintWriter out = new PrintWriter(response.getOutputStream());
+      ReflectionUtils.printThreadInfo(out, "");
+      out.close();
+      ReflectionUtils.logThreadInfo(LOG, "jsp requested", 1);
+    }
+  }
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/http/HttpServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/http/package.html
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/http/package.html?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/http/package.html (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/http/package.html Tue Sep 27 09:35:21 2011
@@ -0,0 +1,23 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+Contains the administrative web interfaces. 
+</body>
+</html>

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/http/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.Task;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskStatus;
+
+/**
+ * Protocol that task child process uses to contact its parent process.
+ */
+public interface BSPPeerProtocol extends HamaRPCProtocolVersion, Closeable,
+    Constants {
+
+  /** Called when a child task process starts, to get its task. */
+  Task getTask(TaskAttemptID taskid) throws IOException;
+
+  /**
+   * Periodically called by child to check if parent is still alive.
+   * 
+   * @return True if the task is known
+   */
+  boolean ping(TaskAttemptID taskid) throws IOException;
+
+  /**
+   * Report that the task is successfully completed. Failure is assumed if the
+   * task process exits without calling this.
+   * 
+   * @param taskid task's id
+   * @param shouldBePromoted whether to promote the task's output or not
+   */
+  void done(TaskAttemptID taskid, boolean shouldBePromoted) throws IOException;
+
+  /** Report that the task encounted a local filesystem error. */
+  void fsError(TaskAttemptID taskId, String message) throws IOException;
+
+  /** Report that the task encounted a fatal error. */
+  void fatalError(TaskAttemptID taskId, String message) throws IOException;
+  
+  /**
+   * Report child's progress to parent.
+   * 
+   * @param taskId task-id of the child
+   * @param taskStatus status of the child
+   * @throws IOException
+   * @throws InterruptedException
+   * @return True if the task is known
+   */
+  boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+      throws IOException, InterruptedException;
+
+  /**
+   * @param taskid
+   * @return assigned port number
+   */
+  int getAssignedPortNum(TaskAttemptID taskid);
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+
+import org.apache.hama.bsp.Directive;
+
+/**
+ * A protocol for BSPMaster talks to GroomServer. 
+ */
+public interface GroomProtocol extends HamaRPCProtocolVersion {
+
+  /**
+   * Instruct GroomServer performaning tasks.
+   * 
+   * @param directive instructs a GroomServer performing necessary
+   *        execution.
+   * @throws IOException
+   */
+  void dispatch(Directive directive) throws IOException;
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/**
+ * There is one version id for all the RPC interfaces. If any interface is
+ * changed, the versionID must be changed here.
+ */
+public interface HamaRPCProtocolVersion extends VersionedProtocol {
+  public static final long versionID = 1L;
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.JobProfile;
+import org.apache.hama.bsp.JobStatus;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Protocol that a groom server and the central BSP Master use to communicate.
+ * This interface will contains several methods: submitJob, killJob, and
+ * killTask.
+ */
+public interface JobSubmissionProtocol extends HamaRPCProtocolVersion {
+
+  /**
+   * Allocate a new id for the job.
+   * 
+   * @return job id
+   * @throws IOException
+   */
+  public BSPJobID getNewJobId() throws IOException;
+
+  /**
+   * Submit a Job for execution. Returns the latest profile for that job. The
+   * job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
+   * 
+   * @param jobID
+   * @param jobFile
+   * @return jobStatus
+   * @throws IOException
+   */
+  // public JobStatus submitJob(BSPJobID jobName) throws IOException;
+
+  public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
+
+  /**
+   * Get the current status of the cluster
+   * 
+   * @param detailed if true then report groom names as well
+   * @return summary of the state of the cluster
+   */
+  public ClusterStatus getClusterStatus(boolean detailed) throws IOException;
+
+  /**
+   * Grab a handle to a job that is already known to the BSPMaster.
+   * 
+   * @return Profile of the job, or null if not found.
+   */
+  public JobProfile getJobProfile(BSPJobID jobid) throws IOException;
+
+  /**
+   * Grab a handle to a job that is already known to the BSPMaster.
+   * 
+   * @return Status of the job, or null if not found.
+   */
+  public JobStatus getJobStatus(BSPJobID jobid) throws IOException;
+
+  /**
+   * A BSP system always operates on a single filesystem. This function returns
+   * the fs name. ('local' if the localfs; 'addr:port' if dfs). The client can
+   * then copy files into the right locations prior to submitting the job.
+   */
+  public String getFilesystemName() throws IOException;
+
+  /**
+   * Get the jobs that are not completed and not failed
+   * 
+   * @return array of JobStatus for the running/to-be-run jobs.
+   */
+  public JobStatus[] jobsToComplete() throws IOException;
+
+  /**
+   * Get all the jobs submitted.
+   * 
+   * @return array of JobStatus for the submitted jobs
+   */
+  public JobStatus[] getAllJobs() throws IOException;
+
+  /**
+   * Grab the bspmaster system directory path where job-specific files are to be
+   * placed.
+   * 
+   * @return the system directory where job-specific files are to be placed.
+   */
+  public String getSystemDir();
+
+  /**
+   * Kill the indicated job
+   */
+  public void killJob(BSPJobID jobid) throws IOException;
+
+  /**
+   * Kill indicated task attempt.
+   * 
+   * @param taskId the id of the task to kill.
+   * @param shouldFail if true the task is failed and added to failed tasks
+   *          list, otherwise it is just killed, w/o affecting job failure
+   *          status.
+   */
+  public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
+      throws IOException;
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+
+import org.apache.hama.bsp.Directive;
+import org.apache.hama.bsp.GroomServerStatus;
+
+/**
+ * A new protocol for GroomServers communicate with BSPMaster. This
+ * protocol paired with WorkerProtocl, let GroomServers enrol with 
+ * BSPMaster, so that BSPMaster can dispatch tasks to GroomServers.
+ */
+public interface MasterProtocol extends HamaRPCProtocolVersion {
+
+  /**
+   * A GroomServer register with its status to BSPMaster, which will update
+   * GroomServers cache.
+   *
+   * @param status to be updated in cache.
+   * @return true if successfully register with BSPMaster; false if fail.
+   */
+  boolean register(GroomServerStatus status) throws IOException;
+
+  /**
+   * A GroomServer (periodically) reports task statuses back to the BSPMaster.
+   * @param directive 
+   */
+  boolean report(Directive directive) throws IOException;
+
+  public String getSystemDir();
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/package.html
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/package.html?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/package.html (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/package.html Tue Sep 27 09:35:21 2011
@@ -0,0 +1,23 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+Tools to help define network clients and servers. 
+</body>
+</html>

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/ipc/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/package.html
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/package.html?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/package.html (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/package.html Tue Sep 27 09:35:21 2011
@@ -0,0 +1,23 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+Hama base package. 
+</body>
+</html>

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/util/BSPServletUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/util/BSPServletUtil.java?rev=1176297&view=auto
==============================================================================
--- incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/util/BSPServletUtil.java (added)
+++ incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/util/BSPServletUtil.java Tue Sep 27 09:35:21 2011
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.util.ServletUtil;
+import org.apache.hama.bsp.BSPMaster;
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.JobStatus;
+
+public class BSPServletUtil extends ServletUtil {
+
+  public static final String HTML_TAIL = "<hr />\n"
+      + "<a href='http://incubator.apache.org/hama/'>Hama</a>, "
+      + Calendar.getInstance().get(Calendar.YEAR) + ".\n" + "</body></html>";
+
+  /**
+   * HTML footer to be added in the jsps.
+   * 
+   * @return the HTML footer.
+   */
+  public static String htmlFooter() {
+    return HTML_TAIL;
+  }
+
+  /**
+   * Method used to generate the Job table for Job pages.
+   * 
+   * @param label display heading to be used in the job table.
+   * @param jobs vector of jobs to be displayed in table.
+   * @param refresh refresh interval to be used in jobdetails page.
+   * @param rowId beginning row id to be used in the table.
+   * @return generated HTML
+   * @throws IOException
+   */
+  public static String generateJobTable(String label, JobStatus[] jobs,
+      int refresh, int rowId) throws IOException {
+
+    StringBuffer sb = new StringBuffer();
+
+    if (jobs.length > 0) {
+      sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n");
+      sb.append("<tr><th>Jobid</th>" + "<th>User</th>" + "<th>Name</th>"
+          + "<th>SuperStep</th>" + "<th>Starttime</th>" + "</tr>\n");
+      for (JobStatus status : jobs) {
+        sb.append("<tr><td><a href=\"bspjob.jsp?jobid=" + status.getJobID()
+            + "\">");
+        sb.append(status.getJobID());
+        sb.append("</a></td><td>");
+        sb.append(status.getUsername());
+        sb.append("</td><td>");
+        sb.append(status.getName());
+        sb.append("</td><td>");
+        sb.append(status.progress());
+        sb.append("</td><td>");
+        sb.append(new Date(status.getStartTime()));
+        sb.append("</td></tr>\n");
+      }
+      sb.append("</table>");
+    } else {
+      sb.append("No jobs found!");
+    }
+
+    return sb.toString();
+  }
+
+  public static String generateGroomsTable(String type, ClusterStatus status,
+      BSPMaster master) throws IOException {
+    StringBuilder sb = new StringBuilder();
+    sb.append("<center>\n");
+    sb.append("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
+    sb.append("<tr><td align=\"center\" colspan=\"6\"><b>Groom Servers</b></td></tr>\n");
+    sb.append("<tr><td><b>Name</b></td>"
+        + "<td><b>Host</b></td>"
+        + "<td><b># maximum tasks</b></td><td><b># current running tasks</b></td>" +
+        		"<td><b># current failures</b></td>" +
+        		"<td><b>Last seen</b></td>" +
+        		"</tr>\n");
+    for (Entry<String, GroomServerStatus> entry : status
+        .getActiveGroomServerStatus().entrySet()) {
+      sb.append("<tr><td>");
+      sb.append(entry.getKey() + "</td><td>");
+      sb.append(entry.getValue().getGroomHostName() + "</td>" + "<td>"
+          + entry.getValue().getMaxTasks() + "</td><td>");
+      sb.append(entry.getValue().countTasks() + "</td><td>");
+      sb.append(entry.getValue().getFailures() + "</td><td>");
+      sb.append(entry.getValue().getLastSeen() + "</td>");
+      sb.append("</tr>\n");
+    }
+    sb.append("</table>\n");
+    sb.append("</center>\n");
+    return sb.toString();
+  }
+
+}

Propchange: incubator/hama/branches/HamaV2/core/src/main/java/org/apache/hama/util/BSPServletUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message