incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1130510 - in /incubator/hama/trunk: ./ conf/ src/java/org/apache/hama/bsp/ src/java/org/apache/hama/http/ src/java/org/apache/hama/util/ src/test/org/apache/hama/ src/webapps/ src/webapps/bspmaster/
Date Thu, 02 Jun 2011 12:13:43 GMT
Author: edwardyoon
Date: Thu Jun  2 12:13:42 2011
New Revision: 1130510

URL: http://svn.apache.org/viewvc?rev=1130510&view=rev
Log:
Add Web UI for Job monitoring

Added:
    incubator/hama/trunk/src/java/org/apache/hama/http/
    incubator/hama/trunk/src/java/org/apache/hama/http/HttpServer.java
    incubator/hama/trunk/src/java/org/apache/hama/util/BSPServletUtil.java
    incubator/hama/trunk/src/webapps/
    incubator/hama/trunk/src/webapps/bspmaster/
    incubator/hama/trunk/src/webapps/bspmaster/bspjob.jsp
    incubator/hama/trunk/src/webapps/bspmaster/bspmaster.jsp
    incubator/hama/trunk/src/webapps/bspmaster/index.html
    incubator/hama/trunk/src/webapps/bspmaster/machines.jsp
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/conf/hama-default.xml
    incubator/hama/trunk/pom.xml
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobID.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
    incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1130510&r1=1130509&r2=1130510&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Jun  2 12:13:42 2011
@@ -4,6 +4,7 @@ Release 0.3 - Unreleased
 
   NEW FEATURES
 
+    HAMA-372: Add Web UI for Job monitoring (Thomas Jungblut via edwardyoon)
     HAMA-359: Add shortest path finding example (Thomas Jungblut via edwardyoon)
     HAMA-374: Add LocalBSPRunner (Thomas Jungblut via edwardyoon)
     HAMA-380: Add BSPMessageBundle to reduce RPC overhead (Miklos Erdelyi via edwardyoon)

Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1130510&r1=1130509&r2=1130510&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Thu Jun  2 12:13:42 2011
@@ -51,6 +51,12 @@
     <description>local directory for temporal store</description> 
   </property>
   <property>
+    <name>bsp.http.infoserver.port</name>
+    <value>50013</value>
+    <description>The port where the web-interface can be seen.
+    </description>
+  </property>
+  <property>
   <name>bsp.groom.report.address</name>
   <value>127.0.0.1:0</value>
   <description>The interface and port that groom server listens on. 
@@ -122,7 +128,7 @@
     </description>
   </property>
   <property>
-    <name>hama.zookeeper.property.clientPort</name>
+    <name>zookeeper.client.port</name>
     <value>21810</value>
     <description>Property from ZooKeeper's config zoo.cfg.
     The port at which the clients will connect.

Modified: incubator/hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/pom.xml?rev=1130510&r1=1130509&r2=1130510&view=diff
==============================================================================
--- incubator/hama/trunk/pom.xml (original)
+++ incubator/hama/trunk/pom.xml Thu Jun  2 12:13:42 2011
@@ -1,30 +1,29 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
 
-  <!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
 
-    http://www.apache.org/licenses/LICENSE-2.0
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
 
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  
   <parent>
     <groupId>org.apache</groupId>
     <artifactId>apache</artifactId>
     <version>8</version>
   </parent>
+  
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hama</groupId>
   <artifactId>hama-core</artifactId>
@@ -102,6 +101,66 @@
       <version>1.0.4</version>
     </dependency>
     <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty-jsp-2.1</artifactId>
+      <version>7.0.2.v20100331</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jsp-api-2.1</artifactId>
+      <version>6.1H.14.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty</artifactId>
+      <version>6.1H.22</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+      <version>6.1H.22</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty-plus</artifactId>
+      <version>6.1H.22</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty-naming</artifactId>
+      <version>6.1H.22</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty-annotations</artifactId>
+      <version>6.1H.22</version>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jdt.core.compiler</groupId>
+      <artifactId>ecj</artifactId>
+      <version>3.5.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tomcat</groupId>
+      <artifactId>jasper</artifactId>
+      <version>6.0.32</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tomcat</groupId>
+      <artifactId>jasper-el</artifactId>
+      <version>6.0.32</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tomcat</groupId>
+      <artifactId>juli</artifactId>
+      <version>6.0.32</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>1.5.2</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
       <version>0.20.2</version>
@@ -245,7 +304,8 @@
                 </goals>
                 <configuration>
                   <transformers>
-                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                    <transformer
+                      implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                       <mainClass>org.apache.hama.examples.ExampleDriver</mainClass>
                     </transformer>
                   </transformers>
@@ -264,32 +324,33 @@
       </dependencies>
     </profile>
     <profile>
-    <id>forrest</id>
-    <build> 
-   <plugins>
-     <plugin>
-       <artifactId>maven-antrun-plugin</artifactId>
-         <executions>
-           <execution>
-             <phase>package</phase>
-               <configuration>
-                 <tasks>
-                   <exec dir="${basedir}/src/docs" executable="${forrest.home}/bin/forrest" failonerror="true">
-                   </exec>
-                   <copy todir="${project.build.directory}/site">
-                   <fileset dir="${basedir}/src/docs/build/site/" />
-                   </copy>
-                   <delete dir="${basedir}/src/docs/build/" />
-                 </tasks>
-               </configuration>
-               <goals>
-                 <goal>run</goal>
-               </goals>
-             </execution>
-           </executions>
-         </plugin>
-       </plugins>
-     </build> 
+      <id>forrest</id>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+              <execution>
+                <phase>package</phase>
+                <configuration>
+                  <tasks>
+                    <exec dir="${basedir}/src/docs" executable="${forrest.home}/bin/forrest"
+                      failonerror="true">
+                    </exec>
+                    <copy todir="${project.build.directory}/site">
+                      <fileset dir="${basedir}/src/docs/build/site/" />
+                    </copy>
+                    <delete dir="${basedir}/src/docs/build/" />
+                  </tasks>
+                </configuration>
+                <goals>
+                  <goal>run</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
   </profiles>
 </project>

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java?rev=1130510&r1=1130509&r2=1130510&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobClient.java Thu Jun  2 12:13:42 2011
@@ -299,7 +299,7 @@ public class BSPJobClient extends Config
     // check the number of BSP tasks
     int tasks = job.getNumBspTask();
     int maxTasks = clusterStatus.getMaxTasks();
-    
+
     if (tasks <= 0 || tasks > maxTasks) {
       LOG.warn("The number of tasks you've entered was invalid. Using default value of "
           + maxTasks + "!");
@@ -566,7 +566,7 @@ public class BSPJobClient extends Config
       RunningJob job = jc.submitJob(new BSPJob(tConf));
       System.out.println("Created job " + job.getID().toString());
     } else if (killJob) {
-      RunningJob job = jc.getJob(new BSPJobID().forName(jobid));
+      RunningJob job = jc.getJob(BSPJobID.forName(jobid));
       if (job == null) {
         System.out.println("Could not find job " + jobid);
       } else {
@@ -575,7 +575,7 @@ public class BSPJobClient extends Config
       }
       exitCode = 0;
     } else if (getStatus) {
-      RunningJob job = jc.getJob(new BSPJobID().forName(jobid));
+      RunningJob job = jc.getJob(BSPJobID.forName(jobid));
       if (job == null) {
         System.out.println("Could not find job " + jobid);
       } else {

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobID.java?rev=1130510&r1=1130509&r2=1130510&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPJobID.java Thu Jun  2 12:13:42 2011
@@ -100,7 +100,7 @@ public class BSPJobID extends ID impleme
     jtIdentifier.write(out);
   }
 
-  public BSPJobID forName(String str) throws IllegalArgumentException {
+  public static BSPJobID forName(String str) throws IllegalArgumentException {
     if (str == null)
       return null;
     try {

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java?rev=1130510&r1=1130509&r2=1130510&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMaster.java Thu Jun  2 12:13:42 2011
@@ -29,10 +29,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,6 +46,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.http.HttpServer;
 import org.apache.hama.ipc.JobSubmissionProtocol;
 import org.apache.hama.ipc.MasterProtocol;
 import org.apache.hama.ipc.WorkerProtocol;
@@ -54,7 +55,7 @@ import org.apache.hama.ipc.WorkerProtoco
  * BSPMaster is responsible to control all the groom servers and to manage bsp
  * jobs.
  */
-public class BSPMaster implements JobSubmissionProtocol, MasterProtocol, 
+public class BSPMaster implements JobSubmissionProtocol, MasterProtocol,
     GroomServerManager {
   public static final Log LOG = LogFactory.getLog(BSPMaster.class);
 
@@ -77,6 +78,17 @@ public class BSPMaster implements JobSub
   // private Server interServer;
   private Server masterServer;
 
+  // host and port
+  private String host;
+  private int port;
+
+  // startTime
+  private long startTime;
+
+  // HTTP server
+  private HttpServer infoServer;
+  private int infoPort;
+
   // Filesystem
   static final String SUBDIR = "bspMaster";
   FileSystem fs = null;
@@ -90,7 +102,6 @@ public class BSPMaster implements JobSub
 
   // Jobs' Meta Data
   private Integer nextJobId = Integer.valueOf(1);
-  // private long startTime;
   private int totalSubmissions = 0; // how many jobs has been submitted by
   // clients
   private int totalTasks = 0; // currently running tasks
@@ -100,20 +111,18 @@ public class BSPMaster implements JobSub
   private TaskScheduler taskScheduler;
 
   // GroomServers cache
-  protected ConcurrentMap<GroomServerStatus, WorkerProtocol> groomServers =
-    new ConcurrentHashMap<GroomServerStatus, WorkerProtocol>();
+  protected ConcurrentMap<GroomServerStatus, WorkerProtocol> groomServers = new ConcurrentHashMap<GroomServerStatus, WorkerProtocol>();
 
   private Instructor instructor;
 
-  private final List<JobInProgressListener> jobInProgressListeners = 
-    new CopyOnWriteArrayList<JobInProgressListener>();
+  private final List<JobInProgressListener> jobInProgressListeners = new CopyOnWriteArrayList<JobInProgressListener>();
 
-  private class ReportGroomStatusHandler implements DirectiveHandler{
+  private class ReportGroomStatusHandler implements DirectiveHandler {
 
-    public void handle(Directive directive) throws DirectiveException{
+    public void handle(Directive directive) throws DirectiveException {
       // update GroomServerStatus held in the groomServers cache.
-      GroomServerStatus groomStatus = 
-        ((ReportGroomStatusDirective)directive).getStatus();
+      GroomServerStatus groomStatus = ((ReportGroomStatusDirective) directive)
+          .getStatus();
       // groomServers cache contains groom server status reported back
       if (groomServers.containsKey(groomStatus)) {
         GroomServerStatus tmpStatus = null;
@@ -131,7 +140,7 @@ public class BSPMaster implements JobSub
             TaskInProgress tip = jip.findTaskInProgress(((TaskAttemptID) ts
                 .getTaskId()).getTaskID());
 
-            if(ts.getRunState() == TaskStatus.State.SUCCEEDED) {
+            if (ts.getRunState() == TaskStatus.State.SUCCEEDED) {
               jip.completedTask(tip, ts);
             } else if (ts.getRunState() == TaskStatus.State.RUNNING) {
               // do nothing
@@ -140,7 +149,7 @@ public class BSPMaster implements JobSub
               jip.failedTask(tip, ts);
             }
             if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
-              for(JobInProgressListener listener: jobInProgressListeners){
+              for (JobInProgressListener listener : jobInProgressListeners) {
                 try {
                   listener.jobRemoved(jip);
                 } catch (IOException ioe) {
@@ -151,13 +160,14 @@ public class BSPMaster implements JobSub
               jip.getStatus().setprogress(ts.getSuperstepCount());
             } else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
               WorkerProtocol worker = findGroomServer(tmpStatus);
-              Directive d1 = new DispatchTasksDirective(currentGroomServerPeers(), 
-                  new GroomServerAction[] {new KillTaskAction(ts.getTaskId()) });
-              try{
+              Directive d1 = new DispatchTasksDirective(
+                  currentGroomServerPeers(),
+                  new GroomServerAction[] { new KillTaskAction(ts.getTaskId()) });
+              try {
                 worker.dispatch(d1);
-              }catch(IOException ioe){
-                throw new DirectiveException("Error when dispatching kill task"+
-                " action.", ioe);
+              } catch (IOException ioe) {
+                throw new DirectiveException("Error when dispatching kill task"
+                    + " action.", ioe);
               }
             }
           }
@@ -166,43 +176,44 @@ public class BSPMaster implements JobSub
               + "but fail to retrieve it.");
         }
       } else {
-        throw new RuntimeException("GroomServer not found." + 
-        groomStatus.getGroomName());
+        throw new RuntimeException("GroomServer not found."
+            + groomStatus.getGroomName());
       }
     }
   }
 
-  private class Instructor extends Thread{
+  private class Instructor extends Thread {
     private final BlockingQueue<Directive> buffer = new LinkedBlockingQueue<Directive>();
-    private final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers = 
-      new ConcurrentHashMap<Class<? extends Directive>, DirectiveHandler>();
+    private final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers = new ConcurrentHashMap<Class<? extends Directive>, DirectiveHandler>();
 
-    public void bind(Class<? extends Directive> instruction, 
-        DirectiveHandler handler){
+    public void bind(Class<? extends Directive> instruction,
+        DirectiveHandler handler) {
       handlers.putIfAbsent(instruction, handler);
     }
 
-    public void put(Directive directive){
-      try{
+    public void put(Directive directive) {
+      try {
         buffer.put(directive);
-      }catch(InterruptedException ie){
-        LOG.error("Fail to put directive into queue.", ie); 
+      } catch (InterruptedException ie) {
+        LOG.error("Fail to put directive into queue.", ie);
       }
     }
 
-    public void run(){
-      while(true){
-        try{
-          Directive directive = this.buffer.take(); 
-          if(directive instanceof ReportGroomStatusDirective){
-            ((DirectiveHandler)handlers.get(ReportGroomStatusDirective.class)).handle(directive);
-          }else{
-            throw new RuntimeException("Directive is not supported."+directive);
+    public void run() {
+      while (true) {
+        try {
+          Directive directive = this.buffer.take();
+          if (directive instanceof ReportGroomStatusDirective) {
+            ((DirectiveHandler) handlers.get(ReportGroomStatusDirective.class))
+                .handle(directive);
+          } else {
+            throw new RuntimeException("Directive is not supported."
+                + directive);
           }
-        }catch(InterruptedException ie){
+        } catch (InterruptedException ie) {
           LOG.error("Unable to retrieve directive from the queue.", ie);
           Thread.currentThread().interrupt();
-        }catch(Exception e){
+        } catch (Exception e) {
           LOG.error("Fail to execute directive command.", e);
         }
       }
@@ -229,11 +240,21 @@ public class BSPMaster implements JobSub
     this.taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(
         schedulerClass, conf);
 
-    String host = getAddress(conf).getHostName();
-    int port = getAddress(conf).getPort();
+    host = getAddress(conf).getHostName();
+    port = getAddress(conf).getPort();
     LOG.info("RPC BSPMaster: host " + host + " port " + port);
+
+    startTime = System.currentTimeMillis();
     this.masterServer = RPC.getServer(this, host, port, conf);
 
+    infoPort = conf.getInt("bsp.http.infoserver.port", 50013);
+
+    infoServer = new HttpServer("bspmaster", host, infoPort, true, conf);
+    infoServer.setAttribute("bsp.master", this);
+
+    // starting webserver
+    infoServer.start();
+
     while (!Thread.currentThread().isInterrupted()) {
       try {
         if (fs == null) {
@@ -334,7 +355,7 @@ public class BSPMaster implements JobSub
 
   @Override
   public boolean report(Directive directive) throws IOException {
-    instructor.put(directive); 
+    instructor.put(directive);
     return true;
   }
 
@@ -425,12 +446,12 @@ public class BSPMaster implements JobSub
     synchronized (this) {
       state = State.RUNNING;
     }
-    
+
     instructor = new Instructor();
-    instructor.bind(ReportGroomStatusDirective.class, 
-      new ReportGroomStatusHandler());
+    instructor.bind(ReportGroomStatusDirective.class,
+        new ReportGroomStatusHandler());
     instructor.start();
-    
+
     LOG.info("Starting RUNNING");
 
     this.masterServer.join();
@@ -500,10 +521,10 @@ public class BSPMaster implements JobSub
         groomPeersMap.put(s.getGroomName(), s.getPeerName());
       }
     }
-    
+
     // TODO currently we only have one task slot per groom server
     this.totalTaskCapacity = numGroomServers;
-    
+
     if (detailed) {
       return new ClusterStatus(groomPeersMap, totalTasks, totalTaskCapacity,
           state);
@@ -547,6 +568,22 @@ public class BSPMaster implements JobSub
     return tmp;
   }
 
+  public String getBSPMasterName() {
+    return host + ":" + port;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public String getBSPMasterIdentifier() {
+    return masterIdentifier;
+  }
+
+  public int getHttpPort() {
+    return infoPort;
+  }
+
   /**
    * Adds a job to the bsp master. Make sure that the checks are in place before
    * adding a job. This is the core job submission logic
@@ -591,6 +628,7 @@ public class BSPMaster implements JobSub
       status.setStartTime(jip.getStartTime());
       // Sets the user name
       status.setUsername(jip.getProfile().getUser());
+      status.setName(jip.getJobName());
 
       if (toComplete) {
         if (status.getRunState() == JobStatus.RUNNING

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1130510&r1=1130509&r2=1130510&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Thu Jun  2 12:13:42 2011
@@ -121,7 +121,7 @@ public class GroomServer implements Runn
   InetSocketAddress taskReportAddress;
   Server taskReportServer = null;
 
-  private BlockingQueue<GroomServerAction> tasksToCleanup = new LinkedBlockingQueue<GroomServerAction>();
+//  private BlockingQueue<GroomServerAction> tasksToCleanup = new LinkedBlockingQueue<GroomServerAction>();
 
   private class DispatchTasksHandler implements DirectiveHandler {
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java?rev=1130510&r1=1130509&r2=1130510&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobInProgress.java Thu Jun  2 12:13:42 2011
@@ -32,7 +32,7 @@ import org.apache.hadoop.fs.Path;
  * tables for doing bookkeeping of its Tasks.
  */
 class JobInProgress {
-  
+
   /**
    * Used when a kill is issued to a job which is initializing.
    */
@@ -62,6 +62,8 @@ class JobInProgress {
   long launchTime;
   long finishTime;
 
+  private String jobName;
+
   // private LocalFileSystem localFs;
   private BSPJobID jobId;
   final BSPMaster master;
@@ -71,14 +73,15 @@ class JobInProgress {
   int numBSPTasks = 0;
   int clusterSize;
 
-  public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master, Configuration conf)
-      throws IOException {
+  public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master,
+      Configuration conf) throws IOException {
     this.conf = conf;
     this.jobId = jobId;
     this.localFs = FileSystem.getLocal(conf);
     this.jobFile = jobFile;
     this.master = master;
-    this.status = new JobStatus(jobId, null, 0L, 0L, JobStatus.State.PREP.value());
+    this.status = new JobStatus(jobId, null, 0L, 0L,
+        JobStatus.State.PREP.value());
     this.startTime = System.currentTimeMillis();
     this.superstepCounter = 0;
     this.restartCount = 0;
@@ -94,8 +97,10 @@ class JobInProgress {
     BSPJob job = new BSPJob(jobId, localJobFile.toString());
     this.numBSPTasks = job.getNumBspTask();
 
-    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
-        .getJobName());
+    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
+        job.getJobName());
+
+    this.setJobName(job.getJobName());
 
     status.setUsername(job.getUser());
     status.setStartTime(startTime);
@@ -141,10 +146,10 @@ class JobInProgress {
     return jobId;
   }
 
-  public synchronized TaskInProgress findTaskInProgress(TaskID id){
-    if(areTasksInited()){
-      for(TaskInProgress tip: tasks){
-        if(tip.getTaskId().equals(id)){
+  public synchronized TaskInProgress findTaskInProgress(TaskID id) {
+    if (areTasksInited()) {
+      for (TaskInProgress tip : tasks) {
+        if (tip.getTaskId().equals(id)) {
           return tip;
         }
       }
@@ -152,7 +157,7 @@ class JobInProgress {
     return null;
   }
 
-  public synchronized boolean areTasksInited(){
+  public synchronized boolean areTasksInited() {
     return this.tasksInited;
   }
 
@@ -171,10 +176,10 @@ class JobInProgress {
       return;
     }
 
-    if(LOG.isDebugEnabled()){
+    if (LOG.isDebugEnabled()) {
       LOG.debug("numBSPTasks: " + numBSPTasks);
     }
-    
+
     // adjust number of map tasks to actual number of splits
     this.tasks = new TaskInProgress[numBSPTasks];
     for (int i = 0; i < numBSPTasks; i++) {
@@ -233,13 +238,14 @@ class JobInProgress {
     }
 
     if (allDone) {
-      this.status = new JobStatus(this.status.getJobID(), this.profile
-          .getUser(), superstepCounter, superstepCounter, superstepCounter, JobStatus.SUCCEEDED, superstepCounter);
+      this.status = new JobStatus(this.status.getJobID(),
+          this.profile.getUser(), superstepCounter, superstepCounter,
+          superstepCounter, JobStatus.SUCCEEDED, superstepCounter);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
-      
+
       LOG.debug("Job successfully done.");
-      
+
       garbageCollect();
     }
   }
@@ -263,17 +269,18 @@ class JobInProgress {
     }
 
     if (allDone) {
-      this.status = new JobStatus(this.status.getJobID(), this.profile
-          .getUser(), superstepCounter, superstepCounter, superstepCounter, JobStatus.FAILED, superstepCounter);
+      this.status = new JobStatus(this.status.getJobID(),
+          this.profile.getUser(), superstepCounter, superstepCounter,
+          superstepCounter, JobStatus.FAILED, superstepCounter);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
-      
+
       LOG.debug("Job failed.");
-      
+
       garbageCollect();
     }
   }
-  
+
   public synchronized void updateTaskStatus(TaskInProgress tip,
       TaskStatus taskStatus) {
     tip.updateStatus(taskStatus); // update tip
@@ -336,4 +343,18 @@ class JobInProgress {
     return restartCount;
   }
 
+  /**
+   * @param jobName the jobName to set
+   */
+  public void setJobName(String jobName) {
+    this.jobName = jobName;
+  }
+
+  /**
+   * @return the jobName
+   */
+  public String getJobName() {
+    return jobName;
+  }
+
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java?rev=1130510&r1=1130509&r2=1130510&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/JobStatus.java Thu Jun  2 12:13:42 2011
@@ -33,7 +33,7 @@ import org.apache.hadoop.io.WritableFact
  */
 public class JobStatus implements Writable, Cloneable {
   public static final Log LOG = LogFactory.getLog(JobStatus.class);
-  
+
   static {
     WritableFactories.setFactory(JobStatus.class, new WritableFactory() {
       public Writable newInstance() {
@@ -42,19 +42,42 @@ public class JobStatus implements Writab
     });
   }
 
-  public static enum State{
-    RUNNING(1),
-    SUCCEEDED(2),
-    FAILED(3),
-    PREP(4),
-    KILLED(5);
+  public static enum State {
+    RUNNING(1), SUCCEEDED(2), FAILED(3), PREP(4), KILLED(5);
     int s;
-    State(int s){
+
+    State(int s) {
       this.s = s;
     }
-    public int value(){
+
+    public int value() {
       return this.s;
     }
+
+    @Override
+    public String toString() {
+      String name = null;
+      switch (this) {
+        case RUNNING:
+          name = "RUNNING";
+          break;
+        case SUCCEEDED:
+          name = "SUCCEEDED";
+          break;
+        case FAILED:
+          name = "FAILED";
+          break;
+        case PREP:
+          name = "SETUP";
+          break;
+        case KILLED:
+          name = "KILLED";
+          break;
+      }
+
+      return name;
+    }
+
   }
 
   public static final int RUNNING = 1;
@@ -73,9 +96,10 @@ public class JobStatus implements Writab
   private String schedulingInfo = "NA";
   private String user;
   private long superstepCount;
+  private String name;
 
   private long finishTime;
-  
+
   public JobStatus() {
   }
 
@@ -83,28 +107,28 @@ public class JobStatus implements Writab
     this(jobid, user, progress, 0, runState);
   }
 
-  public JobStatus(BSPJobID jobid, String user, long progress, long cleanupProgress,
-      int runState) {
+  public JobStatus(BSPJobID jobid, String user, long progress,
+      long cleanupProgress, int runState) {
     this(jobid, user, 0, progress, cleanupProgress, runState);
   }
 
-  public JobStatus(BSPJobID jobid, String user, long setupProgress, long progress,
-      long cleanupProgress, int runState) {
+  /**
+   * Creates a job status with explicit setup, run and cleanup progress and a
+   * superstep count of zero.
+   *
+   * @param jobid the job this status describes
+   * @param user the user name recorded for the job
+   * @param setupProgress progress of the setup phase
+   * @param progress progress of the job itself
+   * @param cleanupProgress progress of the cleanup phase
+   * @param runState one of the integer run-state constants of this class
+   */
+  public JobStatus(BSPJobID jobid, String user, long setupProgress,
+      long progress, long cleanupProgress, int runState) {
+    // Forward the caller's setupProgress; a literal 0 used to be passed here,
+    // which silently discarded the argument.
+    this(jobid, user, setupProgress, progress, cleanupProgress, runState, 0);
   }
 
-  public JobStatus(BSPJobID jobid, String user, long setupProgress, long progress,
-      long cleanupProgress, int runState, long superstepCount) {
+  public JobStatus(BSPJobID jobid, String user, long setupProgress,
+      long progress, long cleanupProgress, int runState, long superstepCount) {
     this.jobid = jobid;
     this.setupProgress = setupProgress;
     this.progress = progress;
     this.cleanupProgress = cleanupProgress;
     this.runState = runState;
-    this.state = State.values()[runState-1];
+    this.state = State.values()[runState - 1];
     this.superstepCount = superstepCount;
     this.user = user;
   }
-  
+
   public BSPJobID getJobID() {
     return jobid;
   }
@@ -133,11 +157,11 @@ public class JobStatus implements Writab
     this.setupProgress = p;
   }
 
-  public JobStatus.State getState(){
+  public JobStatus.State getState() {
     return this.state;
   }
 
-  public void setState(JobStatus.State state){
+  public void setState(JobStatus.State state) {
     this.state = state;
   }
 
@@ -152,11 +176,11 @@ public class JobStatus implements Writab
   public synchronized long getSuperstepCount() {
     return superstepCount;
   }
-  
+
   public synchronized void setSuperstepCount(long superstepCount) {
     this.superstepCount = superstepCount;
   }
-  
+
   public synchronized void setStartTime(long startTime) {
     this.startTime = startTime;
   }
@@ -168,14 +192,14 @@ public class JobStatus implements Writab
   public synchronized void setFinishTime(long finishTime) {
     this.finishTime = finishTime;
   }
-  
+
   /**
    * Get the finish time of the job.
    */
-  public synchronized long getFinishTime() { 
+  public synchronized long getFinishTime() {
     return finishTime;
   }
-  
+
   /**
    * @param user The username of the job
    */
@@ -238,4 +262,18 @@ public class JobStatus implements Writab
     this.superstepCount = in.readLong();
   }
 
+  /**
+   * Sets the human-readable name of the job.
+   *
+   * NOTE(review): the deserialization code directly above this method stops
+   * after reading superstepCount, so the name does not appear to travel with
+   * the Writable form of this status - confirm against write()/readFields().
+   *
+   * @param name the name to set
+   */
+  public synchronized void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Returns the human-readable name of the job.
+   *
+   * NOTE(review): the field has no initial value, so this returns null until
+   * setName has been called; it also looks like the name is not restored by
+   * the deserialization code directly above - confirm.
+   *
+   * @return the name, possibly null
+   */
+  public synchronized String getName() {
+    return name;
+  }
+
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1130510&r1=1130509&r2=1130510&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Thu Jun  2 12:13:42 2011
@@ -94,9 +94,9 @@ class TaskInProgress {
       Configuration conf, JobInProgress job, int partition) {
     this.jobId = jobId;
     this.jobFile = jobFile;
-    this.bspMaster = master;
+    this.setBspMaster(master);
     this.job = job;
-    this.conf = conf;
+    this.setConf(conf);
     this.partition = partition;
 
     this.id = new TaskID(jobId, partition);
@@ -257,4 +257,32 @@ class TaskInProgress {
     return failed;
   }
 
+  /**
+   * @param conf the conf to set
+   */
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * @return the conf
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * @param bspMaster the bspMaster to set
+   */
+  public void setBspMaster(BSPMaster bspMaster) {
+    this.bspMaster = bspMaster;
+  }
+
+  /**
+   * @return the bspMaster
+   */
+  public BSPMaster getBspMaster() {
+    return bspMaster;
+  }
+
 }

Added: incubator/hama/trunk/src/java/org/apache/hama/http/HttpServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/http/HttpServer.java?rev=1130510&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/http/HttpServer.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/http/HttpServer.java Thu Jun  2 12:13:42 2011
@@ -0,0 +1,501 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.http;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.log.LogLevel;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Handler;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.handler.ContextHandlerCollection;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.security.SslSocketConnector;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.DefaultServlet;
+import org.mortbay.jetty.servlet.FilterHolder;
+import org.mortbay.jetty.servlet.FilterMapping;
+import org.mortbay.jetty.servlet.ServletHandler;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.jetty.webapp.WebAppContext;
+import org.mortbay.thread.QueuedThreadPool;
+import org.mortbay.util.MultiException;
+
+/**
+ * Create a Jetty embedded server to answer http requests. The primary goal is
+ * to serve up status information for the server. There are three contexts:
+ * "/logs/" -> points to the log directory "/static/" -> points to common static
+ * files (src/webapps/static) "/" -> the jsp server code from
+ * (src/webapps/<name>)
+ */
+public class HttpServer {
+  public static final Log LOG = LogFactory.getLog(HttpServer.class);
+
+  static final String FILTER_INITIALIZER_PROPERTY = "hadoop.http.filter.initializers";
+
+  protected final Server webServer;
+  protected final Connector listener;
+  protected final WebAppContext webAppContext;
+  protected final boolean findPort;
+  protected final Map<Context, Boolean> defaultContexts = new HashMap<Context, Boolean>();
+  protected final List<String> filterNames = new ArrayList<String>();
+  private static final int MAX_RETRIES = 10;
+
+  /** Same as this(name, bindAddress, port, findPort, new Configuration()). */
+  public HttpServer(String name, String bindAddress, int port, boolean findPort)
+      throws IOException {
+    this(name, bindAddress, port, findPort, new Configuration());
+  }
+
+  /**
+   * Create a status server on the given port. The jsp scripts are taken from
+   * the directory returned by getWebAppsPath() plus "/" plus the server name.
+   * The server is only assembled here; it is not started until start() is
+   * called.
+   *
+   * As a side effect this sets the JVM-wide system properties
+   * "java.naming.factory.initial" and "java.naming.factory.url.pkgs" to the
+   * org.mortbay.naming implementation.
+   *
+   * @param name The name of the server
+   * @param bindAddress The host the listener is bound to
+   * @param port The port to use on the server
+   * @param findPort whether the server should start at the given port and
+   *          increment by 1 until it finds a free port.
+   * @param conf Configuration handed to createBaseListener
+   * @throws IOException if creating the listener, resolving the webapps path
+   *           or adding the default apps fails
+   */
+  public HttpServer(String name, String bindAddress, int port,
+      boolean findPort, Configuration conf) throws IOException {
+    webServer = new Server();
+    this.findPort = findPort;
+
+    listener = createBaseListener(conf);
+    listener.setHost(bindAddress);
+    listener.setPort(port);
+    webServer.addConnector(listener);
+
+    webServer.setThreadPool(new QueuedThreadPool());
+
+    final String appDir = getWebAppsPath();
+    ContextHandlerCollection contexts = new ContextHandlerCollection();
+    webServer.setHandler(contexts);
+    webAppContext = new WebAppContext();
+
+    System.setProperty("java.naming.factory.initial",
+        "org.mortbay.naming.InitialContextFactory");
+    System.setProperty("java.naming.factory.url.pkgs", "org.mortbay.naming");
+
+    webAppContext.setContextPath("/");
+    webAppContext.setWar(appDir + "/" + name);
+    webServer.addHandler(webAppContext);
+
+    addDefaultApps(contexts, appDir);
+
+    addDefaultServlets();
+  }
+
+  /**
+   * Create a required listener for the Jetty instance listening on the port
+   * provided. This wrapper and all subclasses must create at least one
+   * listener.
+   */
+  protected Connector createBaseListener(Configuration conf) throws IOException {
+    SelectChannelConnector ret = new SelectChannelConnector();
+    ret.setLowResourceMaxIdleTime(10000);
+    ret.setAcceptQueueSize(128);
+    ret.setResolveNames(false);
+    ret.setUseDirectBuffers(false);
+    return ret;
+  }
+
+  /**
+   * Add default apps: a "/logs" context serving the directory named by the
+   * "hadoop.log.dir" system property (only when that property is set) and a
+   * "/static" context serving appDir + "/static". Both contexts are recorded
+   * in defaultContexts with the value true.
+   *
+   * @param parent The handler collection the new contexts are attached to
+   * @param appDir The application directory
+   * @throws IOException
+   */
+  protected void addDefaultApps(ContextHandlerCollection parent,
+      final String appDir) throws IOException {
+    // set up the context for "/logs/" if "hadoop.log.dir" property is defined.
+    String logDir = System.getProperty("hadoop.log.dir");
+    if (logDir != null) {
+      Context logContext = new Context(parent, "/logs");
+      logContext.setResourceBase(logDir);
+      logContext.addServlet(DefaultServlet.class, "/");
+      defaultContexts.put(logContext, true);
+    }
+    // set up the context for "/static/*"
+    Context staticContext = new Context(parent, "/static");
+    staticContext.setResourceBase(appDir + "/static");
+    staticContext.addServlet(DefaultServlet.class, "/*");
+    defaultContexts.put(staticContext, true);
+  }
+
+  /**
+   * Add default servlets.
+   */
+  protected void addDefaultServlets() {
+    // set up default servlets
+    addServlet("stacks", "/stacks", StackServlet.class);
+    addServlet("logLevel", "/logLevel", LogLevel.Servlet.class);
+  }
+
+  public void addContext(Context ctxt, boolean isFiltered) throws IOException {
+    webServer.addHandler(ctxt);
+    defaultContexts.put(ctxt, isFiltered);
+  }
+
+  /**
+   * Add a web application context served from the given directory.
+   * 
+   * @param pathSpec The path spec for the context
+   * @param dir The directory containing the context
+   * @param isFiltered if true, the servlet is added to the filter path mapping
+   * @throws IOException
+   */
+  protected void addContext(String pathSpec, String dir, boolean isFiltered)
+      throws IOException {
+    if (0 == webServer.getHandlers().length) {
+      throw new RuntimeException("Couldn't find handler");
+    }
+    WebAppContext webAppCtx = new WebAppContext();
+    webAppCtx.setContextPath(pathSpec);
+    webAppCtx.setWar(dir);
+    // Record the caller's choice; "true" used to be hard-coded here, which
+    // made the isFiltered parameter meaningless.
+    addContext(webAppCtx, isFiltered);
+  }
+
+  /**
+   * Set a value in the webapp context. These values are available to the jsp
+   * pages as "application.getAttribute(name)".
+   * 
+   * @param name The name of the attribute
+   * @param value The value of the attribute
+   */
+  public void setAttribute(String name, Object value) {
+    webAppContext.setAttribute(name, value);
+  }
+
+  /**
+   * Add a servlet in the server.
+   * 
+   * @param name The name of the servlet (can be passed as null)
+   * @param pathSpec The path spec for the servlet
+   * @param clazz The servlet class
+   */
+  public void addServlet(String name, String pathSpec,
+      Class<? extends HttpServlet> clazz) {
+    addInternalServlet(name, pathSpec, clazz);
+    addFilterPathMapping(pathSpec, webAppContext);
+  }
+
+  /**
+   * Add an internal servlet in the server.
+   * 
+   * @param name The name of the servlet (can be passed as null)
+   * @param pathSpec The path spec for the servlet
+   * @param clazz The servlet class
+   * @deprecated this is a temporary method
+   */
+  @Deprecated
+  public void addInternalServlet(String name, String pathSpec,
+      Class<? extends HttpServlet> clazz) {
+    ServletHolder holder = new ServletHolder(clazz);
+    if (name != null) {
+      holder.setName(name);
+    }
+    webAppContext.addServlet(holder, pathSpec);
+  }
+
+  /**
+   * Define a filter for a context and set up default url mappings.
+   */
+  protected void defineFilter(Context ctx, String name, String classname,
+      Map<String, String> parameters, String[] urls) {
+
+    FilterHolder holder = new FilterHolder();
+    holder.setName(name);
+    holder.setClassName(classname);
+    holder.setInitParameters(parameters);
+    FilterMapping fmap = new FilterMapping();
+    fmap.setPathSpecs(urls);
+    fmap.setDispatches(Handler.ALL);
+    fmap.setFilterName(name);
+    ServletHandler handler = ctx.getServletHandler();
+    handler.addFilter(holder, fmap);
+  }
+
+  /**
+   * Add the path spec to the filter path mapping.
+   * 
+   * @param pathSpec The path spec
+   * @param webAppCtx The WebApplicationContext to add to
+   */
+  protected void addFilterPathMapping(String pathSpec, Context webAppCtx) {
+    ServletHandler handler = webAppCtx.getServletHandler();
+    for (String name : filterNames) {
+      FilterMapping fmap = new FilterMapping();
+      fmap.setPathSpec(pathSpec);
+      fmap.setFilterName(name);
+      fmap.setDispatches(Handler.ALL);
+      handler.addFilterMapping(fmap);
+    }
+  }
+
+  /**
+   * Get the value in the webapp context.
+   * 
+   * @param name The name of the attribute
+   * @return The value of the attribute
+   */
+  public Object getAttribute(String name) {
+    return webAppContext.getAttribute(name);
+  }
+
+  /**
+   * Get the pathname to the webapps files.
+   *
+   * The classpath lookup is currently commented out, so this implementation
+   * always returns the hard-coded relative path "src/webapps" and never
+   * throws. NOTE(review): a relative path is presumably resolved against the
+   * working directory of the process - confirm this works outside a source
+   * checkout.
+   *
+   * @return the relative path "src/webapps"
+   * @throws IOException not thrown by this implementation; the commented-out
+   *           lookup would throw it when 'webapps' is missing from CLASSPATH.
+   */
+  protected String getWebAppsPath() throws IOException {
+    // URL url = BSPMaster.class.getClassLoader().getResource("webapps");
+    // if (url == null)
+    // throw new IOException("webapps not found in CLASSPATH");
+    // return url.toString();
+    return "src/webapps";
+  }
+
+  /**
+   * Get the port that the server is on
+   * 
+   * @return the port
+   */
+  public int getPort() {
+    return webServer.getConnectors()[0].getLocalPort();
+  }
+
+  /**
+   * Set the min, max number of worker threads (simultaneous connections).
+   */
+  public void setThreads(int min, int max) {
+    QueuedThreadPool pool = (QueuedThreadPool) webServer.getThreadPool();
+    pool.setMinThreads(min);
+    pool.setMaxThreads(max);
+  }
+
+  /**
+   * Configure an ssl listener on the server. Must be called before the
+   * server is started.
+   * 
+   * @param addr address to listen on
+   * @param keystore location of the keystore
+   * @param storPass password for the keystore
+   * @param keyPass password for the key
+   * @throws IOException if the web server has already been started
+   * @deprecated Use
+   *             {@link #addSslListener(InetSocketAddress, Configuration, boolean)}
+   */
+  @Deprecated
+  public void addSslListener(InetSocketAddress addr, String keystore,
+      String storPass, String keyPass) throws IOException {
+    if (webServer.isStarted()) {
+      throw new IOException("Failed to add ssl listener");
+    }
+    SslSocketConnector sslListener = new SslSocketConnector();
+    sslListener.setHost(addr.getHostName());
+    sslListener.setPort(addr.getPort());
+    sslListener.setKeystore(keystore);
+    sslListener.setPassword(storPass);
+    sslListener.setKeyPassword(keyPass);
+    webServer.addConnector(sslListener);
+  }
+
+  /**
+   * Configure an ssl listener on the server. Must be called before the
+   * server is started.
+   *
+   * Keystore settings are read from the "ssl.server.keystore.*" keys of
+   * sslConf (location, password, keypassword, type; type defaults to "jks").
+   * When needClientAuth is true the "ssl.server.truststore.*" keys are copied
+   * into the JVM-wide "javax.net.ssl.trustStore*" system properties, which
+   * affects the whole process, not only this listener.
+   * 
+   * @param addr address to listen on
+   * @param sslConf conf to retrieve ssl options
+   * @param needClientAuth whether client authentication is required
+   * @throws IOException if the web server has already been started
+   */
+  public void addSslListener(InetSocketAddress addr, Configuration sslConf,
+      boolean needClientAuth) throws IOException {
+    if (webServer.isStarted()) {
+      throw new IOException("Failed to add ssl listener");
+    }
+    if (needClientAuth) {
+      // setting up SSL truststore for authenticating clients
+      System.setProperty("javax.net.ssl.trustStore",
+          sslConf.get("ssl.server.truststore.location", ""));
+      System.setProperty("javax.net.ssl.trustStorePassword",
+          sslConf.get("ssl.server.truststore.password", ""));
+      System.setProperty("javax.net.ssl.trustStoreType",
+          sslConf.get("ssl.server.truststore.type", "jks"));
+    }
+    SslSocketConnector sslListener = new SslSocketConnector();
+    sslListener.setHost(addr.getHostName());
+    sslListener.setPort(addr.getPort());
+    sslListener.setKeystore(sslConf.get("ssl.server.keystore.location"));
+    sslListener.setPassword(sslConf.get("ssl.server.keystore.password", ""));
+    sslListener.setKeyPassword(sslConf.get("ssl.server.keystore.keypassword",
+        ""));
+    sslListener.setKeystoreType(sslConf.get("ssl.server.keystore.type", "jks"));
+    sslListener.setNeedClientAuth(needClientAuth);
+    webServer.addConnector(sslListener);
+  }
+
+  /**
+   * Start the server. Does not wait for the server to start.
+   *
+   * The method loops until the listener is open and the web server has been
+   * started:
+   * - The listener is opened and its local port is read. If the port is
+   *   negative (HADOOP-4744 workaround) the port is re-read and the listener
+   *   is closed and reopened ("bounced"), giving up with an exception once
+   *   the retry counter exceeds MAX_RETRIES. Each bounce moves to the next
+   *   port number unless the originally requested port was 0.
+   * - The web server is started and the port is checked again (HADOOP-6386
+   *   workaround); while it is negative the server is stopped and restarted,
+   *   at most MAX_RETRIES / 2 times.
+   * - A BindException makes the loop retry on the next port number when
+   *   findPort is true and is rethrown otherwise. Any other IOException is
+   *   rethrown unchanged.
+   *
+   * Every non-IOException failure, including a MultiException from Jetty and
+   * an InterruptedException from the sleeps, is wrapped in an IOException
+   * with the message "Problem starting http server"; the thread's interrupt
+   * status is not restored in that case.
+   *
+   * @throws IOException if the listener cannot be bound or the server cannot
+   *           be started
+   */
+  public void start() throws IOException {
+    try {
+      int port = 0;
+      int oriPort = listener.getPort(); // The original requested port
+      while (true) {
+        try {
+          port = webServer.getConnectors()[0].getLocalPort();
+          LOG.info("Port returned by webServer.getConnectors()[0]."
+              + "getLocalPort() before open() is " + port
+              + ". Opening the listener on " + oriPort);
+          listener.open();
+          port = listener.getLocalPort();
+          LOG.info("listener.getLocalPort() returned "
+              + listener.getLocalPort()
+              + " webServer.getConnectors()[0].getLocalPort() returned "
+              + webServer.getConnectors()[0].getLocalPort());
+          // Workaround to handle the problem reported in HADOOP-4744
+          if (port < 0) {
+            Thread.sleep(100);
+            int numRetries = 1;
+            while (port < 0) {
+              LOG.warn("listener.getLocalPort returned " + port);
+              if (numRetries++ > MAX_RETRIES) {
+                throw new Exception(" listener.getLocalPort is returning "
+                    + "less than 0 even after " + numRetries + " resets");
+              }
+              for (int i = 0; i < 2; i++) {
+                LOG.info("Retrying listener.getLocalPort()");
+                port = listener.getLocalPort();
+                if (port > 0) {
+                  break;
+                }
+                Thread.sleep(200);
+              }
+              if (port > 0) {
+                break;
+              }
+              LOG.info("Bouncing the listener");
+              listener.close();
+              Thread.sleep(1000);
+              listener.setPort(oriPort == 0 ? 0 : (oriPort += 1));
+              listener.open();
+              Thread.sleep(100);
+              port = listener.getLocalPort();
+            }
+          } // Workaround end
+          LOG.info("Jetty bound to port " + port);
+          webServer.start();
+          // Workaround for HADOOP-6386
+          port = listener.getLocalPort();
+          if (port < 0) {
+            LOG.warn("Bounds port is " + port + " after webserver start");
+            for (int i = 0; i < MAX_RETRIES / 2; i++) {
+              try {
+                webServer.stop();
+              } catch (Exception e) {
+                LOG.warn("Can't stop  web-server", e);
+              }
+              Thread.sleep(1000);
+
+              listener.setPort(oriPort == 0 ? 0 : (oriPort += 1));
+              listener.open();
+              Thread.sleep(100);
+              webServer.start();
+              LOG.info(i + "attempts to restart webserver");
+              port = listener.getLocalPort();
+              if (port > 0)
+                break;
+            }
+            if (port < 0)
+              throw new Exception("listener.getLocalPort() is returning "
+                  + "less than 0 even after " + MAX_RETRIES + " resets");
+          }
+          // End of HADOOP-6386 workaround
+          break;
+        } catch (IOException ex) {
+          // if this is a bind exception,
+          // then try the next port number.
+          if (ex instanceof BindException) {
+            if (!findPort) {
+              throw (BindException) ex;
+            }
+          } else {
+            LOG.info("HttpServer.start() threw a non Bind IOException");
+            throw ex;
+          }
+        } catch (MultiException ex) {
+          LOG.info("HttpServer.start() threw a MultiException");
+          throw ex;
+        }
+        listener.setPort((oriPort += 1));
+      }
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Problem starting http server", e);
+    }
+  }
+
+  /**
+   * Stop the server: close the listener, then stop the Jetty server. The
+   * server is stopped even when closing the listener fails, so a failing
+   * close cannot leave the web server running.
+   *
+   * @throws Exception if closing the listener or stopping the server fails
+   */
+  public void stop() throws Exception {
+    try {
+      listener.close();
+    } finally {
+      webServer.stop();
+    }
+  }
+
+  public void join() throws InterruptedException {
+    webServer.join();
+  }
+
+  /**
+   * A very simple servlet to serve up a text representation of the current
+   * stack traces. It both returns the stacks to the caller and logs them.
+   * The two dumps are taken one after the other (first for the response,
+   * then for the log), so they are not guaranteed to contain exactly the
+   * same data.
+   */
+  public static class StackServlet extends HttpServlet {
+    private static final long serialVersionUID = -6284183679759467039L;
+
+    @Override
+    public void doGet(HttpServletRequest request, HttpServletResponse response)
+        throws ServletException, IOException {
+
+      PrintWriter out = new PrintWriter(response.getOutputStream());
+      ReflectionUtils.printThreadInfo(out, "");
+      out.close();
+      ReflectionUtils.logThreadInfo(LOG, "jsp requested", 1);
+    }
+  }
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/util/BSPServletUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/util/BSPServletUtil.java?rev=1130510&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/util/BSPServletUtil.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/util/BSPServletUtil.java Thu Jun  2 12:13:42 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Calendar;
+
+import org.apache.hadoop.util.ServletUtil;
+import org.apache.hama.bsp.JobStatus;
+
+public class BSPServletUtil extends ServletUtil {
+
+  public static final String HTML_TAIL = "<hr />\n"
+      + "<a href='http://incubator.apache.org/hama/'>Hama</a>, "
+      + Calendar.getInstance().get(Calendar.YEAR) + ".\n" + "</body></html>";
+
+  /**
+   * HTML footer to be added in the jsps.
+   * 
+   * @return the HTML footer.
+   */
+  public static String htmlFooter() {
+    return HTML_TAIL;
+  }
+
+  /**
+   * Method used to generate the Job table for Job pages. Text taken from the
+   * job status (job id, user name, job name) is HTML-escaped before it is
+   * written into the markup, so a job name containing markup cannot inject
+   * content into the page. A null user or job name is rendered as an empty
+   * cell.
+   * 
+   * @param label display heading to be used in the job table (currently
+   *          unused by this implementation).
+   * @param jobs jobs to be displayed in table; null is treated like an empty
+   *          array.
+   * @param refresh refresh interval to be used in jobdetails page (currently
+   *          unused by this implementation).
+   * @param rowId beginning row id to be used in the table (currently unused
+   *          by this implementation).
+   * @return an HTML table with one row per job, or the text "No jobs found!"
+   *         when there are no jobs.
+   * @throws IOException not thrown by this implementation; kept so existing
+   *           callers keep compiling.
+   */
+  public static String generateJobTable(String label, JobStatus[] jobs,
+      int refresh, int rowId) throws IOException {
+
+    // The buffer never leaves this method, so the unsynchronized
+    // StringBuilder is sufficient.
+    StringBuilder sb = new StringBuilder();
+
+    if (jobs != null && jobs.length > 0) {
+      sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n");
+      sb.append("<tr><th>Jobid</th>" + "<th>User</th>" + "<th>Name</th>"
+          + "<th>SuperStep</th>" + "<th>Starttime</th>" + "</tr>\n");
+      for (JobStatus status : jobs) {
+        String jobId = escapeHtmlText(String.valueOf(status.getJobID()));
+        sb.append("<tr><td><a href=\"bspjob.jsp?jobid=").append(jobId)
+            .append("\">");
+        sb.append(jobId);
+        sb.append("</a></td><td>");
+        sb.append(escapeHtmlText(status.getUsername()));
+        sb.append("</td><td>");
+        sb.append(escapeHtmlText(status.getName()));
+        sb.append("</td><td>");
+        sb.append(status.getSuperstepCount());
+        sb.append("</td><td>");
+        sb.append(new Date(status.getStartTime()));
+        sb.append("</td></tr>\n");
+      }
+      sb.append("</table>");
+    } else {
+      sb.append("No jobs found!");
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Replaces the characters that are significant in HTML text and attribute
+   * values with their entity references.
+   * 
+   * @param text the raw text, may be null
+   * @return the escaped text, or an empty string when text is null
+   */
+  private static String escapeHtmlText(String text) {
+    if (text == null) {
+      return "";
+    }
+    StringBuilder out = new StringBuilder(text.length() + 16);
+    for (int i = 0; i < text.length(); i++) {
+      char c = text.charAt(i);
+      switch (c) {
+        case '&':
+          out.append("&amp;");
+          break;
+        case '<':
+          out.append("&lt;");
+          break;
+        case '>':
+          out.append("&gt;");
+          break;
+        case '"':
+          out.append("&quot;");
+          break;
+        case '\'':
+          out.append("&#39;");
+          break;
+        default:
+          out.append(c);
+      }
+    }
+    return out.toString();
+  }
+
+}

Modified: incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java?rev=1130510&r1=1130509&r2=1130510&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java Thu Jun  2 12:13:42 2011
@@ -171,7 +171,7 @@ public class MiniBSPCluster {
       try{
         Thread.sleep(1000);
         cnt++;
-        if(10 < cnt){
+        if(100 < cnt){
           fail("Fail to launch BSPMaster.");
         }
       }catch(InterruptedException ie){

Added: incubator/hama/trunk/src/webapps/bspmaster/bspjob.jsp
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/webapps/bspmaster/bspjob.jsp?rev=1130510&view=auto
==============================================================================
--- incubator/hama/trunk/src/webapps/bspmaster/bspjob.jsp (added)
+++ incubator/hama/trunk/src/webapps/bspmaster/bspjob.jsp Thu Jun  2 12:13:42 2011
@@ -0,0 +1,66 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<%@ page contentType="text/html; charset=UTF-8" import="javax.servlet.*"
+	import="javax.servlet.http.*" import="java.io.*" import="java.util.*"
+	import="java.text.DecimalFormat" import="org.apache.hama.bsp.*"
+	import="org.apache.hama.util.*"%>
+<%!private static final long serialVersionUID = 1L;%>
+<%
+  BSPMaster tracker = (BSPMaster) application
+      .getAttribute("bsp.master");
+  String idString = request.getParameter("jobid");
+  JobStatus status = tracker.getJobStatus(BSPJobID.forName(idString));
+  JobStatus.State state = status.getState();
+%>
+
+<html>
+
+<title>Hama BSP Job Summary</title>
+
+<body>
+  <h1><%=status.getName()%></h1>
+
+  <b>State: </b>  <%=state.toString() %> 
+  
+  <br/> <br/>
+  <table border="1" cellpadding="5" cellspacing="0">
+    <tr>
+      <th>Name</th>
+      <th>User</th>
+      <th>SuperStep</th>
+      <th>StartTime</th>
+      <th>FinishTime</th>
+    </tr>
+
+    <tr>
+      <td><%=status.getName() %></td>
+      <td><%=status.getUsername() %></td>
+      <td><%=status.getSuperstepCount() %></td>
+      <td><%=new Date(status.getStartTime()).toString() %></td>
+      <td>
+        <% if(status.getFinishTime() != 0L) {out.write(new Date(status.getFinishTime()).toString());} %>
+      </td>
+    </tr>
+
+  </table>
+  
+  <hr>
+  <a href="bspmaster.jsp">Back to BSPMaster</a>
+
+  <%
+    out.println(BSPServletUtil.htmlFooter());
+  %>
\ No newline at end of file

Added: incubator/hama/trunk/src/webapps/bspmaster/bspmaster.jsp
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/webapps/bspmaster/bspmaster.jsp?rev=1130510&view=auto
==============================================================================
--- incubator/hama/trunk/src/webapps/bspmaster/bspmaster.jsp (added)
+++ incubator/hama/trunk/src/webapps/bspmaster/bspmaster.jsp Thu Jun  2 12:13:42 2011
@@ -0,0 +1,80 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<%@ page contentType="text/html; charset=UTF-8" import="javax.servlet.*"
+	import="javax.servlet.http.*" import="java.io.*" import="java.util.*"
+	import="java.text.DecimalFormat" import="org.apache.hama.bsp.*"
+	import="org.apache.hama.util.*"%>
+<%!private static final long serialVersionUID = 1L;%>
+<% // Gather the master handle, cluster status, master name and job list used by the rest of this page.
+  BSPMaster tracker = (BSPMaster) application
+      .getAttribute("bsp.master");
+  ClusterStatus status = tracker.getClusterStatus(true); // NOTE(review): 'true' presumably requests detailed status — confirm against BSPMaster
+  String trackerName = tracker.getBSPMasterName(); // used in the page title and heading
+  JobStatus[] runningJobs = tracker.jobsToComplete(); // rendered under the "Running Jobs" heading
+%>
+<%!// Pattern for the "Avg. Tasks/Node" cell; only the pattern is shared because DecimalFormat is not thread-safe.
+  private static final String PERCENT_PATTERN = "##0.00";
+  /** Writes the one-row cluster summary table (grooms, task capacity, avg. tasks per node, blacklisted) to out. */
+  public void generateSummaryTable(JspWriter out, ClusterStatus status,
+      BSPMaster tracker) throws IOException {
+    // A per-call formatter avoids sharing one DecimalFormat across concurrent request threads.
+    String tasksPerNode = status.getGroomServers() > 0
+        ? new DecimalFormat(PERCENT_PATTERN).format((double) status.getMaxTasks() / status.getGroomServers())
+        : "-"; // no groom servers reported: show a dash instead of dividing by zero
+    out.print("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n"
+        + "<tr><th>Grooms</th><th>BSP Task Capacity</th><th>Avg. Tasks/Node</th>"
+        + "<th>Blacklisted Nodes</th></tr>\n");
+    // The blacklisted count is a literal 0; the tracker parameter is not read by this method.
+    out.print("<tr><td><a href=\"machines.jsp?type=active\">"
+        + status.getActiveGroomNames().size() + "</a></td><td>"
+        + status.getMaxTasks() + "</td><td>" + tasksPerNode
+        + "</td><td><a href=\"machines.jsp?type=blacklisted\">" + 0 + "</a></td></tr></table>\n");
+    out.print("<br>");
+  }%>
+
+
+<html>
+<head>
+<title><%=trackerName%> Hama BSP Administration</title>
+<!--  <link rel="stylesheet" type="text/css" href="/static/hadoop.css">-->
+</head>
+<body>
+
+<h1><%=trackerName%> Hama BSP Administration</h1>
+
+<b>State:</b>
+<%=status.getBSPMasterState()%><br>
+<b>Started:</b>
+<%=new Date(tracker.getStartTime())%><br>
+<b>Identifier:</b>
+<%=tracker.getBSPMasterIdentifier()%><br>
+
+<hr>
+<%
+  generateSummaryTable(out, status, tracker);
+%>
+<hr />
+<hr>
+
+<h2 id="running_jobs">Running Jobs</h2>
+<%=BSPServletUtil.generateJobTable("Running", runningJobs,
+          30, 0)%>
+<hr> 
+
+<%
+  out.println(BSPServletUtil.htmlFooter());
+%>
\ No newline at end of file

Added: incubator/hama/trunk/src/webapps/bspmaster/index.html
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/webapps/bspmaster/index.html?rev=1130510&view=auto
==============================================================================
--- incubator/hama/trunk/src/webapps/bspmaster/index.html (added)
+++ incubator/hama/trunk/src/webapps/bspmaster/index.html Thu Jun  2 12:13:42 2011
@@ -0,0 +1,36 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<html>
+
+<head>
+<meta HTTP-EQUIV="REFRESH" content="0;url=bspmaster.jsp"/>
+<title>Hama Administration</title>
+</head>
+
+<body>
+
+<h1>Hama Administration</h1>
+
+<ul>
+
+<li><a href="bspmaster.jsp">BSPMaster</a></li>
+
+</ul>
+
+</body>
+
+</html>

Added: incubator/hama/trunk/src/webapps/bspmaster/machines.jsp
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/webapps/bspmaster/machines.jsp?rev=1130510&view=auto
==============================================================================
--- incubator/hama/trunk/src/webapps/bspmaster/machines.jsp (added)
+++ incubator/hama/trunk/src/webapps/bspmaster/machines.jsp Thu Jun  2 12:13:42 2011
@@ -0,0 +1,61 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<%@ page contentType="text/html; charset=UTF-8" import="javax.servlet.*"
+	import="javax.servlet.http.*" import="java.io.*" import="java.util.*"
+	import="java.text.DecimalFormat" import="org.apache.hama.bsp.*"
+	import="org.apache.hama.util.*"%>
+<%!private static final long serialVersionUID = 1L;%>
+<% // Fetch the master, its cluster status and name, and the requested machine-list type.
+	BSPMaster tracker = (BSPMaster) application
+			.getAttribute("bsp.master");
+	ClusterStatus status = tracker.getClusterStatus(true); // NOTE(review): 'true' presumably requests detailed status — confirm against BSPMaster
+	String trackerName = tracker.getBSPMasterName(); // used in the page title and heading
+	String type = request.getParameter("type"); // passed to generateGroomsTable, which does not read it
+%>
+<%!/** Writes a centered table with one row per active groom: a linked name cell, a host cell and a running-task cell. */
+  public void generateGroomsTable(JspWriter out, String type,
+      ClusterStatus status, BSPMaster master) throws IOException {
+    // The type parameter is accepted but not read; only the active grooms from status are listed.
+    out.print("<center>\n");
+    out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
+    // The title cell spans the three columns declared by the header row that follows.
+    out.print("<tr><td align=\"center\" colspan=\"3\"><b>Task Trackers</b></td></tr>\n");
+    out.print("<tr><td><b>Name</b></td><td><b>Host</b></td><td><b># running tasks</b></td></tr>\n");
+    for (Map.Entry<String, String> groom : status.getActiveGroomNames().entrySet()) {
+      // NOTE(review): the link host is the map key; Name and Host cells both show the value — confirm the map's key/value meaning.
+      out.print("<tr><td><a href=\"http://" + groom.getKey() + ":" + master.getHttpPort() + "/\">");
+      out.print(groom.getValue() + "</a></td><td>" + groom.getValue() + "</td>");
+      out.print("<td>" + 1 + "</td></tr>\n"); // the running-task count is a literal 1
+    }
+    out.print("</table>\n</center>\n");
+  }%>
+
+<html>
+
+<title><%=trackerName%> Hama Machine List</title>
+
+<body>
+<h1><a href="bspmaster.jsp"><%=trackerName%></a> Hama Machine List</h1>
+
+<h2>Grooms</h2>
+<%
+  generateGroomsTable(out, type, status, tracker);
+%>
+
+<%
+  out.println(BSPServletUtil.htmlFooter());
+%>
\ No newline at end of file



Mime
View raw message