incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1058109 [2/2] - in /incubator/hama/trunk: ./ conf/ src/java/org/apache/hama/ src/java/org/apache/hama/bsp/ src/java/org/apache/hama/ipc/ src/test/org/apache/hama/ src/test/org/apache/hama/bsp/
Date Wed, 12 Jan 2011 12:32:42 GMT
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskAttemptID.java Wed Jan 12 12:32:39
2011
@@ -48,6 +48,10 @@ public class TaskAttemptID extends ID {
   public BSPJobID getJobID() {
     return taskId.getJobID();
   }
+ 
+  public TaskID getTaskId(){
+    return taskId;
+  }
 
   public TaskID getTaskID() {
     return taskId;

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Wed Jan 12 12:32:39
2011
@@ -39,7 +39,7 @@ class TaskInProgress {
   int maxTaskAttempts = 4;
   private boolean failed = false;
   private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
-  
+
   // Job Meta
   private String jobFile = null;
   private int partition;
@@ -74,6 +74,21 @@ class TaskInProgress {
 
   private BSPJobID jobId;
 
+  /**
+   * Constructor for new nexus between BSPMaster and GroomServer.
+   * 
+   * @param jobId is identification of JobInProgress.
+   * @param jobFile the path of job file
+   * @param partition which partition this TaskInProgress owns.
+   */
+  public TaskInProgress(BSPJobID jobId, String jobFile, int partition) {
+    this.jobId = jobId;
+    this.jobFile = jobFile;
+    this.partition = partition;
+
+    this.id = new TaskID(jobId, true, partition);
+  }
+
   public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master,
       Configuration conf, JobInProgress job, int partition) {
     this.jobId = jobId;
@@ -94,9 +109,9 @@ class TaskInProgress {
 
     TaskAttemptID taskid = null;
     if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts)) {
-      int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;
-      taskid = new TaskAttemptID( id, attemptId);
-      
+      int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART
+          + nextTaskId;
+      taskid = new TaskAttemptID(id, attemptId);
       ++nextTaskId;
     } else {
       LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts)
@@ -107,8 +122,6 @@ class TaskInProgress {
     t = new BSPTask(jobId, jobFile, taskid, partition);
     activeTasks.put(taskid, status.getGroomName());
 
-    // Ask BSPMaster to note that the task exists
-    bspMaster.createTaskEntry(taskid, status.getGroomName(), this);
     return t;
   }
 
@@ -133,6 +146,10 @@ class TaskInProgress {
     return id;
   }
 
+  public TaskID getTaskId() {
+    return this.id;
+  }
+
   public TreeMap<TaskAttemptID, String> getTasks() {
     return activeTasks;
   }
@@ -164,7 +181,7 @@ class TaskInProgress {
   public synchronized boolean isComplete() {
     return (completes > 0);
   }
-  
+
   /**
    * Is the given taskid the one that took this tip to completion?
    * 
@@ -190,14 +207,14 @@ class TaskInProgress {
 
   public void completed(TaskAttemptID taskid) {
     LOG.info("Task '" + taskid.getTaskID().toString() + "' has completed.");
-    
+
     TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
     status.setRunState(TaskStatus.State.SUCCEEDED);
     activeTasks.remove(taskid);
 
     // Note the successful taskid
     setSuccessfulTaskid(taskid);
-    
+
     //
     // Now that the TIP is complete, the other speculative
     // subtasks will be closed when the owning groom server
@@ -208,13 +225,13 @@ class TaskInProgress {
   }
 
   private void setSuccessfulTaskid(TaskAttemptID taskid) {
-    this.successfulTaskId = taskid; 
+    this.successfulTaskId = taskid;
   }
 
   private TaskAttemptID getSuccessfulTaskid() {
     return successfulTaskId;
   }
-  
+
   public void updateStatus(TaskStatus status) {
     taskStatuses.put(status.getTaskId(), status);
   }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskScheduler.java Wed Jan 12 12:32:39
2011
@@ -19,7 +19,6 @@ package org.apache.hama.bsp;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.List;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -65,23 +64,14 @@ abstract class TaskScheduler implements 
     // do nothing
   }
 
-  public abstract void addJob(JobInProgress job);
+  // public abstract void addJob(JobInProgress job);
 
   /**
    * Returns a collection of jobs in an order which is specific to the
    * particular scheduler.
    * 
-   * @param queueName
-   * @return
+   * @param queue the name of the queue.
+   * @return the jobs in progress that correspond to the specified queue.
    */
-  public abstract Collection<JobInProgress> getJobs();
-
-  /**
-   * Returns the tasks we'd like the GroomServer to execute right now.
-   * 
-   * @param groomServer The GroomServer for which we're looking for tasks.
-   * @return A list of tasks to run on that GroomServer, possibly empty.
-   */
-  public abstract List<Task> assignTasks(GroomServerStatus groomStatus)
-      throws IOException;
+  public abstract Collection<JobInProgress> getJobs(String queue);
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskStatus.java Wed Jan 12 12:32:39
2011
@@ -40,13 +40,14 @@ class TaskStatus implements Writable, Cl
     RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN
   }
 
+  private BSPJobID jobId;
   private TaskAttemptID taskId;
   private float progress;
   private volatile State runState;
   private String stateString;
   private String groomServer;
   private long superstepCount;
-  
+
   private long startTime;
   private long finishTime;
 
@@ -56,12 +57,14 @@ class TaskStatus implements Writable, Cl
    * 
    */
   public TaskStatus() {
+    jobId = new BSPJobID();
     taskId = new TaskAttemptID();
     this.superstepCount = 0;
   }
 
-  public TaskStatus(TaskAttemptID taskId, float progress, State runState,
-      String stateString, String groomServer, Phase phase) {
+  public TaskStatus(BSPJobID jobId, TaskAttemptID taskId, float progress,
+      State runState, String stateString, String groomServer, Phase phase) {
+    this.jobId = jobId;
     this.taskId = taskId;
     this.progress = progress;
     this.runState = runState;
@@ -75,6 +78,10 @@ class TaskStatus implements Writable, Cl
   // Accessors and Modifiers
   // //////////////////////////////////////////////////
 
+  public BSPJobID getJobId() {
+    return jobId;
+  }
+
   public TaskAttemptID getTaskId() {
     return taskId;
   }
@@ -211,14 +218,14 @@ class TaskStatus implements Writable, Cl
       this.finishTime = finishTime;
     }
   }
-  
+
   /**
    * @return The number of BSP super steps executed by the task.
    */
   public long getSuperstepCount() {
     return superstepCount;
   }
-  
+
   /**
    * Increments the number of BSP super steps executed by the task.
    */
@@ -242,6 +249,7 @@ class TaskStatus implements Writable, Cl
 
   @Override
   public void readFields(DataInput in) throws IOException {
+    this.jobId.readFields(in);
     this.taskId.readFields(in);
     this.progress = in.readFloat();
     this.runState = WritableUtils.readEnum(in, State.class);
@@ -254,6 +262,7 @@ class TaskStatus implements Writable, Cl
 
   @Override
   public void write(DataOutput out) throws IOException {
+    jobId.write(out);
     taskId.write(out);
     out.writeFloat(progress);
     WritableUtils.writeEnum(out, runState);

Added: incubator/hama/trunk/src/java/org/apache/hama/ipc/MasterProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/MasterProtocol.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/MasterProtocol.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/MasterProtocol.java Wed Jan 12 12:32:39
2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.Directive;
+
+/**
+ * A new protocol for GroomServers to communicate with BSPMaster. This
+ * protocol, paired with WorkerProtocol, lets GroomServers enrol with
+ * BSPMaster, so that BSPMaster can dispatch tasks to GroomServers.
+ */
+public interface MasterProtocol extends HamaRPCProtocolVersion {
+
+  /**
+   * A GroomServer registers its status with BSPMaster, which will update
+   * the GroomServers cache.
+   *
+   * @param status to be updated in cache.
+   * @return true if registration with BSPMaster succeeds; false if it fails.
+   */
+  boolean register(GroomServerStatus status) throws IOException;
+
+  /**
+   * A GroomServer (periodically) reports task statuses back to the BSPMaster.
+   * @param directive 
+   */
+  boolean report(Directive directive) throws IOException;
+
+  public String getSystemDir();  
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/ipc/WorkerProtocol.java Wed Jan 12 12:32:39
2011
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+
+import org.apache.hama.bsp.Directive;
+
+/**
+ * A protocol for BSPMaster to talk to GroomServer. This protocol
+ * allows BSPMaster to dispatch tasks to a GroomServer.
+ */
+public interface WorkerProtocol extends HamaRPCProtocolVersion {
+
+  /**
+   * Instructs a GroomServer to perform tasks.
+   * 
+   * @param directive instructs a GroomServer to perform the necessary
+   *        execution.
+   * @throws IOException
+   */
+  void dispatch(Directive directive) throws IOException;
+
+}

Modified: incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/HamaCluster.java Wed Jan 12 12:32:39 2011
@@ -29,6 +29,22 @@ public abstract class HamaCluster extend
   public static final Log LOG = LogFactory.getLog(HamaCluster.class);
   protected final static HamaConfiguration conf = new HamaConfiguration();
 
+  public HamaCluster(){
+    super();
+  }
+
+  public HamaCluster(int groomServers) {
+    this(groomServers, true, 10);
+  }
+
+  public HamaCluster(int groomServers, int threadpool) {
+    this(groomServers, true, threadpool);
+  }
+
+  public HamaCluster(int groomServers, boolean startDfs, int threadpool) {
+    super(groomServers, startDfs, threadpool);
+  }
+
   protected void setUp() throws Exception {
     super.setUp();
   }

Modified: incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/HamaClusterTestCase.java Wed Jan 12 12:32:39
2011
@@ -20,6 +20,9 @@ package org.apache.hama;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,6 +30,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.util.ClusterUtil;
+import org.apache.hama.bsp.BSPMaster;
+import org.apache.hama.bsp.GroomServer;
 
 public abstract class HamaClusterTestCase extends HamaTestCase {
   public static final Log LOG = LogFactory.getLog(HamaClusterTestCase.class);
@@ -42,13 +48,18 @@ public abstract class HamaClusterTestCas
   }
 
   public HamaClusterTestCase(int groomServers) {
-    this(groomServers, true);
+    this(groomServers, true, 10);
   }
 
-  public HamaClusterTestCase(int groomServers, boolean startDfs) {
+  public HamaClusterTestCase(int groomServers, boolean startDfs, int threadpool) {
     super();
     this.startDfs = startDfs;
     this.groomServers = groomServers;
+    conf.setInt("bsp.test.threadpool", threadpool);
+  }
+
+  public MiniBSPCluster getCluster(){
+    return this.cluster;
   }
 
   /**
@@ -90,9 +101,7 @@ public abstract class HamaClusterTestCas
 
       // start the instance
       hamaClusterSetup();
-
     } catch (Exception e) {
-      LOG.error("Exception in setup!", e);
       if (cluster != null) {
         cluster.shutdown();
       }
@@ -112,6 +121,7 @@ public abstract class HamaClusterTestCas
     try {
       if (this.cluster != null) {
         try {
+          
           this.cluster.shutdown();
         } catch (Exception e) {
           LOG.warn("Closing mini dfs", e);

Modified: incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/MiniBSPCluster.java Wed Jan 12 12:32:39
2011
@@ -17,29 +17,222 @@
  */
 package org.apache.hama;
 
-import java.util.List;
+import java.io.IOException;
 
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import static java.util.concurrent.TimeUnit.*;
+
+import static junit.framework.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.bsp.BSPMaster;
+import org.apache.hama.bsp.GroomServer;
 import org.apache.hama.HamaConfiguration;
 
+
 public class MiniBSPCluster {
 
+  public static final Log LOG = LogFactory.getLog(MiniBSPCluster.class);
+
+  private ScheduledExecutorService scheduler;
+
+  private HamaConfiguration configuration;
+  private BSPMasterRunner master;
+  private List<GroomServerRunner> groomServerList = 
+    new CopyOnWriteArrayList<GroomServerRunner>();
+  private int grooms;
+
+  public class BSPMasterRunner implements Runnable{
+    BSPMaster bspm;
+    HamaConfiguration conf;
+
+    public BSPMasterRunner(HamaConfiguration conf){
+      this.conf = conf;
+      if(null == this.conf) 
+        throw new NullPointerException("No Configuration for BSPMaster.");
+    }  
+
+    public void run(){
+      try{
+        LOG.info("Starting BSP Master.");
+        this.bspm = BSPMaster.startMaster(this.conf); 
+        this.bspm.offerService();
+      }catch(IOException ioe){
+        LOG.error("Fail to startup BSP Master.", ioe);
+      }catch(InterruptedException ie){
+        LOG.error("BSP Master fails in offerService().", ie);
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    public void shutdown(){
+      if(null != this.bspm) this.bspm.shutdown();
+    }
+
+    public boolean isRunning(){
+      if(null == this.bspm) return false;
+
+      if(this.bspm.currentState().equals(BSPMaster.State.RUNNING)){
+        return true;
+      } 
+      return false;
+    }
+
+    public BSPMaster getMaster(){
+      return this.bspm;
+    }
+  }
+
+  public class GroomServerRunner implements Runnable{
+    GroomServer gs;
+    HamaConfiguration conf;
+
+    public GroomServerRunner(HamaConfiguration conf){
+      this.conf = conf;
+    }
+ 
+    public void run(){
+      try{
+        this.gs = GroomServer.constructGroomServer(GroomServer.class, conf);
+        GroomServer.startGroomServer(this.gs).join();
+      }catch(InterruptedException ie){
+        LOG.error("Fail to start GroomServer. ", ie);
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    public void shutdown(){
+      try{
+        if(null != this.gs) this.gs.shutdown();
+      }catch(IOException ioe){
+        LOG.info("Fail to shutdown GroomServer.", ioe);
+      }
+    }
+    
+    public boolean isRunning(){
+      if(null == this.gs) return false;
+      return this.gs.isRunning(); 
+    }
+
+    public GroomServer getGroomServer(){
+      return this.gs;
+    }
+  }
+
   public MiniBSPCluster(HamaConfiguration conf, int groomServers) {
-    // TODO Auto-generated constructor stub
+    this.configuration = conf;
+    this.grooms = groomServers;
+    if(1 > this.grooms) {
+      this.grooms = 2;  
+    }
+    LOG.info("Groom server number "+this.grooms);
+    int threadpool = conf.getInt("bsp.test.threadpool", 10);
+    LOG.info("Thread pool value "+threadpool);
+    scheduler = Executors.newScheduledThreadPool(threadpool);
+
+    startMaster();
+    startGroomServers();
+  }
+
+  public void startMaster(){
+    if(null == this.scheduler) 
+      throw new NullPointerException("No ScheduledExecutorService exists.");
+    this.master = new BSPMasterRunner(this.configuration);
+    scheduler.schedule(this.master, 0, SECONDS);
+  }
+
+  public void startGroomServers(){
+    if(null == this.scheduler) 
+      throw new NullPointerException("No ScheduledExecutorService exists.");
+    if(null == this.master) 
+      throw new NullPointerException("No BSPMaster exists.");
+    int cnt=0;
+    while(!this.master.isRunning()){
+      LOG.info("Waiting BSPMaster up.");
+      try{
+        Thread.sleep(1000);
+        cnt++;
+        if(10 < cnt){
+          fail("Fail to launch BSPMaster.");
+        }
+      }catch(InterruptedException ie){
+        LOG.error("Fail to check BSP Master's state.", ie);
+        Thread.currentThread().interrupt();
+      }
+    }
+    for(int i=0; i < this.grooms; i++){
+      HamaConfiguration c = new HamaConfiguration(this.configuration);
+      randomPort(c);
+      GroomServerRunner gsr = new GroomServerRunner(c);
+      groomServerList.add(gsr);
+      scheduler.schedule(gsr, 0, SECONDS);
+      cnt = 0;
+      while(!gsr.isRunning()){
+        LOG.info("Waitin for GroomServer up.");
+        try{
+          Thread.sleep(1000);
+          cnt++;
+          if(10 < cnt){
+            fail("Fail to launch groom server.");
+          }
+        }catch(InterruptedException ie){
+          LOG.error("Fail to check Groom Server's state.", ie);
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+  }
+
+  private void randomPort(HamaConfiguration conf){
+    try{
+      ServerSocket skt = new ServerSocket(0);
+      int p = skt.getLocalPort(); 
+      skt.close();
+      conf.set(Constants.PEER_PORT, new Integer(p).toString());
+      conf.setInt(Constants.GROOM_RPC_PORT, p+100);
+    }catch(IOException ioe){
+      LOG.error("Can not find a free port for BSPPeer.", ioe);
+    }
   }
 
   public void shutdown() {
-    // TODO Auto-generated method stub
-    
+    scheduler.shutdown();
   }
 
   public List<Thread> getGroomServerThreads() {
-    // TODO Auto-generated method stub
-    return null;
+    List<Thread> list = new ArrayList<Thread>();
+    for(GroomServerRunner gsr: groomServerList){
+      list.add(new Thread(gsr));
+    }
+    return list;
   }
 
   public Thread getMaster() {
-    // TODO Auto-generated method stub
+    return new Thread(this.master);
+  }
+
+  public List<GroomServer> getGroomServers(){
+    List<GroomServer> list = new ArrayList<GroomServer>();
+    for(GroomServerRunner gsr: groomServerList){
+      list.add(gsr.getGroomServer());
+    }
+    return list;
+  }
+
+  public BSPMaster getBSPMaster(){
+    if(null != this.master)
+      return this.master.getMaster();
     return null;
   }
 
+  public ScheduledExecutorService getScheduler(){
+    return this.scheduler;
+  }
 }

Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/Client.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/Client.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/Client.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/Client.java Wed Jan 12 12:32:39 2011
@@ -0,0 +1,148 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.HamaConfiguration;
+import org.apache.zookeeper.KeeperException;
+
+class Client implements Runnable{
+  public static final Log LOG = LogFactory.getLog(Client.class);
+  private final static List<Result> collector = Collections.synchronizedList(new ArrayList<Result>());

+ 
+  public static final class Result{
+    private final int num;
+    private final int order;
+    private final String peer;
+    private final String content;
+
+    public Result(final int num, final int order, final String peer, 
+                  final String content){
+      this.num = num;
+      this.order = order;
+      this.peer = peer;
+      this.content = content;
+    }
+
+    public String getContent(){
+      return this.content;
+    }
+  
+    public int getOrder(){
+      return this.order; 
+    }
+  
+    public int getNumber(){
+      return this.num;
+    }
+  
+    public String getPeer(){
+      return this.peer;
+    }
+
+    public String toString(){
+      return " number:"+getNumber() + " order:"+getOrder()+
+             " peer:"+getPeer()+" content:"+getContent();
+    }
+    
+  }
+
+  public static class HelloBSP extends BSP {
+    private Configuration conf;
+
+    @Override
+    public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
+        InterruptedException {
+      int cnt = 0;
+      Result r = null;
+      LOG.info("BSPPeer all peer names -> "+bspPeer.getAllPeerNames());
+      LOG.info("Current peer -> "+bspPeer.getPeerName());
+      for (String otherPeer : bspPeer.getAllPeerNames()) {
+        if (bspPeer.getPeerName().equals(otherPeer)) {
+          int num = Integer.parseInt(conf.get("bsp.peers.num"));
+          String result = "Hello BSP from " + (cnt + 1) + " of " + num + ": "
+              + bspPeer.getPeerName();
+          r = new Result(num, cnt, bspPeer.getPeerName(), result);
+          LOG.info("(Targeted server) result object -> "+r);
+          collector.add(r);
+        }
+        cnt++;
+      }// for
+      bspPeer.sync();
+    }  
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+  }
+
+  public List<Result> getResults(){
+    return collector;
+  }
+
+  @Override
+  public void run(){
+    try{
+      HamaConfiguration c1 = new HamaConfiguration();
+      BSPJob bsp = new BSPJob(c1, Client.class);
+      bsp.setJobName("Hello BSP");
+      bsp.setBspClass(HelloBSP.class);
+      String hamahome = System.getenv("HAMA_HOME");
+      if(null == hamahome || "".equals(hamahome)){
+        hamahome = System.getProperty("user.dir");
+      }
+      LOG.info("HAMA_HOME:"+hamahome);
+      // e.g. hama-0.2.0-dev-test.jar
+      File dir = new File(hamahome+"/build/");
+      for(String file: dir.list()){
+        boolean flag = file.matches("hama.*test.jar");
+        if(flag){ 
+          LOG.info("Jar file "+file);
+          bsp.setJar(hamahome+"/build/"+file);  // TODO: should not hardcoded!!!
+        }
+      }
+      BSPJobClient client = new BSPJobClient(c1);
+      ClusterStatus cluster = client.getClusterStatus(false);
+      int groomsize = cluster.getGroomServers();
+      assertEquals("Check if GroomServer number matches.", groomsize, TestBSPMaster.GROOM_SIZE);
+      bsp.setNumBspTask(groomsize);
+      BSPJobClient.runJob(bsp);
+    }catch(IOException ioe){
+      LOG.info("Error submitting job.", ioe);
+    }
+  } 
+}
+

Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMaster.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMaster.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMaster.java Wed Jan 12 12:32:39
2011
@@ -0,0 +1,128 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+
+public class TestBSPMaster extends HamaCluster {
+
+  private Log LOG = LogFactory.getLog(TestBSPMaster.class);
+
+  private HamaConfiguration conf;
+  private BSPMaster bspm;
+  private List<GroomServer> groomServers;
+  public static final int GROOM_SIZE = 2;
+  public static final int THREAD_POOL = 20;
+  //public static final int DFS_PORT = 9000;
+
+  public TestBSPMaster() throws Exception{
+    super(GROOM_SIZE, THREAD_POOL); 
+    this.conf = getConf();
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+    this.bspm = getCluster().getBSPMaster();
+    this.groomServers = getCluster().getGroomServers();
+    assertNotNull("Ensure BSP Master instance exists.", bspm);
+    assertNotNull("Ensure GroomServers exist.", this.groomServers);
+    int c=0;
+    while(GROOM_SIZE > this.bspm.groomServerStatusKeySet().size()){
+      LOG.info("Waiting for GroomServer registering to BSPMaster."); 
+      try{
+        Thread.sleep(1000); 
+        c++;
+        if(10<c){
+          fail("Waiting too long for GroomServers' registeration. ");
+        }
+      }catch(InterruptedException e){
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private static String currentAddress() throws UnknownHostException{
+    InetAddress addr = InetAddress.getLocalHost();
+    return addr.getHostAddress();
+  }
+
+  private boolean contains(String nic){
+    for(GroomServerStatus sts: this.bspm.groomServerStatusKeySet()){
+      if(sts.getPeerName().equals(nic)){
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public void testBSPMasterGroomServerNexus() throws Exception{
+    // registration
+    assertTrue("Assert GroomServer exists.", 
+               this.bspm.groomServerStatusKeySet().size() > 0);
+    for(GroomServer groom: this.groomServers){
+      String nic = groom.getBspPeerName();
+      assertTrue("Check if registered groom server exists.", contains(nic));
+    }
+
+    final ScheduledExecutorService sched = getCluster().getScheduler();
+    LOG.info("Start submiting job ...");
+
+    // client submit job 
+    Client c = new Client();
+    sched.schedule(c, 0, SECONDS);
+    int cnt = 0;
+    while(c.getResults().isEmpty()) {
+      try{
+        Thread.sleep(1000);
+        cnt++;
+        if(10 < cnt){
+          fail("Can not get client submitted job result.");
+        }
+      }catch(InterruptedException ie){
+        LOG.error("Fail to check client result.", ie);
+        Thread.currentThread().interrupt();
+      }
+    }
+    List<Client.Result> results = c.getResults();
+    LOG.info("Task results size:"+results.size());
+    assertEquals("Ensure return collection size is 2.", results.size(), GROOM_SIZE);
+    for(Client.Result r: results){
+      LOG.info("Collected result => "+r);
+      assertEquals("Check result.", r.getContent(), 
+                   "Hello BSP from " + (r.getOrder() + 1) + " of " +  
+                   r.getNumber() + ": " + r.getPeer());
+    }
+    LOG.info("Finish executing test nexus method.");
+  }
+
+  public void tearDown() throws Exception{
+    super.tearDown();
+  }
+}

Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java?rev=1058109&r1=1058108&r2=1058109&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPPeer.java Wed Jan 12 12:32:39
2011
@@ -47,7 +47,7 @@ import org.apache.zookeeper.data.Stat;
 public class TestBSPPeer extends HamaCluster implements Watcher {
   private Log LOG = LogFactory.getLog(TestBSPPeer.class);
 
-  private static final int NUM_PEER = 20;
+  private static final int NUM_PEER = 10;
   private static final int ROUND = 3;
   private static final int PAYLOAD = 1024; // 1kb in default
   List<BSPPeerThread> list = new ArrayList<BSPPeerThread>(NUM_PEER);
@@ -96,7 +96,8 @@ public class TestBSPPeer extends HamaClu
         peerNames.add("localhost:" + (30000 + i));
       }
       peer.setAllPeerNames(peerNames);
-      TaskStatus currentTaskStatus = new TaskStatus(new TaskAttemptID(), 0, null, null, null,
null);
+      TaskStatus currentTaskStatus = new TaskStatus(new BSPJobID(), 
+          new TaskAttemptID(), 0, null, null, null, null);
       peer.setCurrentTaskStatus(currentTaskStatus);
       BSPJob jobConf = new BSPJob(conf, NUM_PEER);
       peer.setJobConf((BSPJob) jobConf);
@@ -150,7 +151,7 @@ public class TestBSPPeer extends HamaClu
           + " messages at " + round + " round");
 
       if (lastTwoDigitsOfPort < 10) {
-        assertEquals(20, numMessages);
+        assertEquals(10, numMessages);
       } else {
         assertEquals(0, numMessages);
       }

Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestDirective.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestDirective.java?rev=1058109&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestDirective.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestDirective.java Wed Jan 12 12:32:39
2011
@@ -0,0 +1,172 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * Round-trip serialization tests for {@link Directive}: a directive is
+ * written to an in-memory stream, read back into a fresh instance, and the
+ * restored fields are compared with the values that were written.
+ */
+public class TestDirective extends HamaCluster {
+  private static final Log LOG = LogFactory.getLog(TestDirective.class);
+
+  HamaConfiguration conf;
+
+  public TestDirective() {
+    this.conf = getConf();
+  }
+
+  /**
+   * Intentionally empty.
+   * NOTE(review): presumably this replaces the parent set-up so that no
+   * cluster is started for these in-memory tests -- confirm against
+   * HamaCluster.
+   */
+  @Override
+  public void setUp() throws Exception {
+  }
+
+  /**
+   * Writes the given directive to a byte buffer and reads it back into a
+   * new {@link Directive}.
+   *
+   * @param written the directive to serialize.
+   * @return a fresh directive populated from the serialized bytes.
+   * @throws Exception if writing or reading the directive fails.
+   */
+  private Directive roundTrip(Directive written) throws Exception {
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    assertEquals("Check output stream is empty before serialization.",
+                 0, bout.size());
+
+    DataOutput out = new DataOutputStream(bout);
+    written.write(out);
+    assertTrue("Check if data is serialized to output stream.",
+               bout.size() > 0);
+
+    DataInput in =
+      new DataInputStream(new ByteArrayInputStream(bout.toByteArray()));
+    Directive restored = new Directive();
+    restored.readFields(in);
+    return restored;
+  }
+
+  /**
+   * A request directive (groom server peers plus actions) must keep its
+   * type, its peer map entries and its launch-task action after a
+   * write/readFields round trip.
+   */
+  public void testRequest() throws Exception {
+    BSPJobID jobId = new BSPJobID();
+    TaskID taskId = new TaskID(jobId, true, 0);
+    TaskAttemptID attemptId = new TaskAttemptID(taskId, 0);
+    GroomServerAction[] actions = new GroomServerAction[] {
+      new LaunchTaskAction(
+        new BSPTask(jobId, "/path/to/jobFile", attemptId, 0))};
+    Map<String, String> groomServerPeers = new HashMap<String, String>();
+    groomServerPeers.put("groomServer1", "192.168.0.22:8080");
+    groomServerPeers.put("groomServer2", "192.168.0.23:8081");
+    groomServerPeers.put("groomServer3", "192.168.0.24:8082");
+    groomServerPeers.put("groomServer4", "192.168.0.25:8083");
+    groomServerPeers.put("groomServer5", "192.168.0.26:8084");
+
+    Directive w = new Directive(groomServerPeers, actions);
+    Directive r = roundTrip(w);
+
+    // JUnit expects (message, expected, actual).
+    assertEquals("Check the request type.", Directive.Type.Request.value(),
+                 r.getType().value());
+
+    // Every peer that was written must come back with the same address.
+    Map<String, String> peers = r.getGroomServerPeers();
+    for (Map.Entry<String, String> expected : groomServerPeers.entrySet()) {
+      assertEquals("Check groom server peers content.",
+                   expected.getValue(), peers.get(expected.getKey()));
+    }
+
+    GroomServerAction[] as = r.getActions();
+    assertEquals("Check GroomServerAction size.", actions.length, as.length);
+
+    assertTrue("Check GroomServerAction type.",
+               as[0] instanceof LaunchTaskAction);
+
+    Task t = ((LaunchTaskAction) as[0]).getTask();
+
+    assertEquals("Check action's bsp job id.", jobId, t.getJobID());
+
+    assertEquals("Check action's job file.",
+                 "/path/to/jobFile", t.getJobFile());
+
+    assertEquals("Check action's partition.", 0, t.getPartition());
+  }
+
+  /**
+   * A response directive (built from a groom server status) must keep its
+   * type, the groom/peer names and the first task report after a
+   * write/readFields round trip.
+   */
+  public void testResponse() throws Exception {
+    BSPJobID jobId = new BSPJobID();
+    TaskID taskId = new TaskID(jobId, true, 0);
+    TaskAttemptID attemptId = new TaskAttemptID(taskId, 0);
+
+    List<TaskStatus> tasks = new ArrayList<TaskStatus>();
+    tasks.add(new TaskStatus(jobId, attemptId, 1f,
+      TaskStatus.State.SUCCEEDED, "", "groomServer1",
+      TaskStatus.Phase.CLEANUP));
+    GroomServerStatus status = new GroomServerStatus("groomServer1",
+      "192.168.1.111:2123", tasks, 1, 4);
+    Directive w = new Directive(status);
+    assertEquals("Check directive type is correct.",
+                 Directive.Type.Response.value(), w.getType().value());
+
+    Directive r = roundTrip(w);
+
+    assertEquals("Check directive type is correct.",
+                 w.getType().value(), r.getType().value());
+
+    assertEquals("Check groom server status' groom name.",
+                 "groomServer1", r.getStatus().getGroomName());
+
+    assertEquals("Check groom server status' peer name.",
+                 "192.168.1.111:2123", r.getStatus().getPeerName());
+    TaskStatus t = r.getStatus().getTaskReports().get(0);
+
+    assertEquals("Check tasks status' job id.", jobId, t.getJobId());
+
+    assertEquals("Check task status' run state.",
+                 TaskStatus.State.SUCCEEDED, t.getRunState());
+
+    assertEquals("Check task status' phase.",
+                 TaskStatus.Phase.CLEANUP, t.getPhase());
+  }
+
+  /**
+   * Intentionally empty; nothing is allocated by the empty set-up above.
+   * NOTE(review): presumably this also skips the parent tear-down on
+   * purpose -- confirm against HamaCluster.
+   */
+  @Override
+  public void tearDown() throws Exception {
+  }
+}



Mime
View raw message