incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1024485 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/
Date Wed, 20 Oct 2010 01:48:36 GMT
Author: edwardyoon
Date: Wed Oct 20 01:48:36 2010
New Revision: 1024485

URL: http://svn.apache.org/viewvc?rev=1024485&view=rev
Log:
Refactoring launchTask() method in GroomServer

Added:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Oct 20 01:48:36 2010
@@ -50,6 +50,7 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
     
+    HAMA-300: Refactoring launchTask() method in GroomServer (edwardyoon)
     HAMA-311: Add unit tests for IPC package (edwardyoon)
     HAMA-312: Add serialize printing to ExampleDriver (edwardyoon)
     HAMA-309: Add unit tests for Bytes utilities (edwardyoon)

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Wed Oct 20 01:48:36 2010
@@ -54,7 +54,7 @@ public class PiEstimator {
         }
       }
 
-      byte[] tagName = Bytes.toBytes(getName().toString());
+      byte[] tagName = Bytes.toBytes(bspPeer.getHostName());
       byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
       BSPMessage estimate = new BSPMessage(tagName, myData);
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Wed Oct 20 01:48:36 2010
@@ -20,20 +20,5 @@ package org.apache.hama.bsp;
 /**
  * This class provides an abstract implementation of the BSP interface
  */
-public abstract class BSP extends Thread implements BSPInterface {
-  private BSPPeer bspPeer;
-  
-  /**
-   * A thread's run method.
-   * 
-   * The run method performs the
-   * {@link org.apache.hama.bsp.BSPInterface#bsp(BSPPeer)}
-   */
-  public void runBSP() throws Exception {
-    bsp(bspPeer);
-  }
-  
-  public void setPeer(BSPPeer bspServer) {
-    this.bspPeer = bspServer;
-  }
+public abstract class BSP implements BSPInterface {
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Wed Oct 20 01:48:36 2010
@@ -30,6 +30,12 @@ public class BSPMessage implements Writa
   public BSPMessage() {
   }
 
+  /**
+   * Constructor 
+   * 
+   * @param tag of data
+   * @param data of message
+   */
   public BSPMessage(byte[] tag, byte[] data) {
     this.tag = new byte[tag.length];
     this.data = new byte[data.length];
@@ -37,11 +43,20 @@ public class BSPMessage implements Writa
     System.arraycopy(data, 0, this.data, 0, data.length);
   }
 
+  /**
+   * BSP messages are typically identified with tags. This allows to get the tag
+   * of data.
+   * 
+   * @return tag of data of BSP message
+   */
   public byte[] getTag() {
     byte[] result = this.tag;
     return result;
   }
 
+  /**
+   * @return data of BSP message
+   */
   public byte[] getData() {
     byte[] result = this.data;
     return result;

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Wed Oct 20 01:48:36 2010
@@ -17,26 +17,21 @@
  */
 package org.apache.hama.bsp;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ReflectionUtils;
-
 public class BSPTask extends Task {
-  private BSP bsp;
-  private Configuration conf;
-  
-  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition, Configuration conf) {
+
+  public BSPTask() {
+  }
+
+  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition) {
     this.jobId = jobId;
     this.jobFile = jobFile;
     this.taskId = taskid;
     this.partition = partition;
-    this.conf = conf;
   }
 
-  public BSP getBSPClass() {
-    bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
-        BSP.class), conf);
-    
-    return bsp;
+  @Override
+  public BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob conf) {
+    return new BSPTaskRunner(this, bspPeer, conf);
   }
 
 }

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1024485&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java Wed Oct 20 01:48:36 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.zookeeper.KeeperException;
+
+public class BSPTaskRunner extends Thread {
+
+  public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class);
+  private Task task;
+  private BSPJob conf;
+  private BSPPeer bspPeer;
+
+  public BSPTaskRunner(BSPTask bspTask, BSPPeer bspPeer, BSPJob conf) {
+    this.task = bspTask;
+    this.conf = conf;
+    this.bspPeer = bspPeer;
+  }
+
+  public Task getTask() {
+    return task;
+  }
+
+  public void run() {
+    BSP bsp = (BSP) ReflectionUtils.newInstance(conf.getConf().getClass(
+        "bsp.work.class", BSP.class), conf.getConf());
+
+    try {
+      bsp.bsp(bspPeer);
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } catch (KeeperException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Oct 20 01:48:36 2010
@@ -23,9 +23,11 @@ import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -40,7 +42,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hama.Constants;
@@ -50,6 +52,7 @@ import org.apache.hama.ipc.InterTrackerP
 public class GroomServer implements Runnable {
   public static final Log LOG = LogFactory.getLog(GroomServer.class);
   private static BSPPeer bspPeer;
+  static final String SUBDIR = "groomServer";
 
   Configuration conf;
 
@@ -281,9 +284,111 @@ public class GroomServer implements Runn
     }
 
     try {
+      localizeJob(tip);
+    } catch (Throwable e) {
+      String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
+          .stringifyException(e));
+      LOG.warn(msg);
+    }
+  }
+
+  private void localizeJob(TaskInProgress tip) throws IOException {
+    Task task = tip.getTask();
+    conf.addResource(task.getJobFile());
+    BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
+
+    Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+        + task.getTaskID() + "/" + "job.xml");
+
+    RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip);
+    BSPJob jobConf = null;
+
+    synchronized (rjob) {
+      if (!rjob.localized) {
+        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
+            + task.getTaskID() + "/" + "job.jar");
+        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
+        Path jarFile = new Path(task.getJobFile().replace(".xml", ".jar"));
+
+        HamaConfiguration conf = new HamaConfiguration();
+        conf.addResource(localJobFile);
+        jobConf = new BSPJob(conf, task.getJobID().toString());
+        jobConf.setJar(localJarFile.toString());
+
+        if (jarFile != null) {
+          systemFS.copyToLocalFile(jarFile, localJarFile);
+
+          // also unjar the job.jar files in workdir
+          File workDir = new File(
+              new File(localJobFile.toString()).getParent(), "work");
+          if (!workDir.mkdirs()) {
+            if (!workDir.isDirectory()) {
+              throw new IOException("Mkdirs failed to create "
+                  + workDir.toString());
+            }
+          }
+          RunJar.unJar(new File(localJarFile.toString()), workDir);
+        }
+        rjob.localized = true;
+      }
+    }
+    launchTaskForJob(tip, jobConf);
+  }
+
+  private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
+    try {
+      tip.setJobConf(jobConf);
       tip.launchTask();
     } catch (Throwable ie) {
-      // TODO: when job failed.
+      tip.taskStatus.setRunState(TaskStatus.State.FAILED);
+      String error = StringUtils.stringifyException(ie);
+      LOG.info(error);
+    }
+  }
+
+  private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile,
+      TaskInProgress tip) {
+    synchronized (runningJobs) {
+      RunningJob rJob = null;
+      if (!runningJobs.containsKey(jobId)) {
+        rJob = new RunningJob(jobId, localJobFile);
+        rJob.localized = false;
+        rJob.tasks = new HashSet<TaskInProgress>();
+        rJob.jobFile = localJobFile;
+        runningJobs.put(jobId, rJob);
+      } else {
+        rJob = runningJobs.get(jobId);
+      }
+      rJob.tasks.add(tip);
+      return rJob;
+    }
+  }
+
+  /**
+   * The datastructure for initializing a job
+   */
+  static class RunningJob {
+    private BSPJobID jobid;
+    private Path jobFile;
+    // keep this for later use
+    Set<TaskInProgress> tasks;
+    boolean localized;
+    boolean keepJobFiles;
+
+    RunningJob(BSPJobID jobid, Path jobFile) {
+      this.jobid = jobid;
+      localized = false;
+      tasks = new HashSet<TaskInProgress>();
+      this.jobFile = jobFile;
+      keepJobFiles = false;
+    }
+
+    Path getJobFile() {
+      return jobFile;
+    }
+
+    BSPJobID getJobId() {
+      return jobid;
     }
   }
 
@@ -410,6 +515,8 @@ public class GroomServer implements Runn
   // /////////////////////////////////////////////////////
   class TaskInProgress {
     Task task;
+    BSPJob jobConf;
+    private BSPTaskRunner runner;
     volatile boolean done = false;
     volatile boolean wasKilled = false;
     private TaskStatus taskStatus;
@@ -421,61 +528,29 @@ public class GroomServer implements Runn
           TaskStatus.Phase.STARTING);
     }
 
-    static final String SUBDIR = "groomServer";
+    public void setJobConf(BSPJob jobConf) {
+      this.jobConf = jobConf;
+    }
 
-    public void launchTask() {
+    public void launchTask() throws IOException {
       taskStatus.setRunState(TaskStatus.State.RUNNING);
+      this.runner = task.createRunner(bspPeer, this.jobConf);
+      this.runner.start();
 
-      try {
-        // TODO: need to move this code to TaskRunner
-
-        task.getJobFile();
-        conf.addResource(task.getJobFile());
-        BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
-
-        Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
-            + task.getTaskID() + "/" + "job.xml");
-        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
-            + task.getTaskID() + "/" + "job.jar");
-
-        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
-        systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml",
-            ".jar")), localJarFile);
-
-        HamaConfiguration conf = new HamaConfiguration();
-        conf.addResource(localJobFile);
-        BSPJob jobConf = new BSPJob(conf, task.getJobID().toString());
-        jobConf.setJar(localJarFile.toString());
-
-        BSP bsp = (BSP) ReflectionUtils
-            .newInstance(jobConf.getBspClass(), conf);
-        bsp.setPeer(bspPeer);
+      // Check state of Task
+      while (true) {
         try {
-          bsp.runBSP();
-        } catch (Exception e) {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
           e.printStackTrace();
-          taskStatus.setRunState(TaskStatus.State.FAILED);
         }
 
-      } catch (IOException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      } finally {
-
-        while (true) {
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-          }
-
-          // If local/outgoing queues are empty, task is done.
-          if (bspPeer.localQueue.size() == 0
-              && bspPeer.outgoingQueues.size() == 0) {
-            taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-            acceptNewTasks = true;
-            break;
-          }
+        // If local/outgoing queues are empty, task is done.
+        if (bspPeer.localQueue.size() == 0
+            && bspPeer.outgoingQueues.size() == 0) {
+          taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+          acceptNewTasks = true;
+          break;
         }
       }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java Wed Oct 20 01:48:36 2010
@@ -47,7 +47,7 @@ class LaunchTaskAction extends GroomServ
   }
 
   public void readFields(DataInput in) throws IOException {
-    task = new Task();
+    task = new BSPTask();
     task.readFields(in);
   }
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Wed Oct 20 01:48:36 2010
@@ -172,7 +172,7 @@ public class LocalJobRunner implements J
 
           try {
             GroomServer servers = new GroomServer(conf);
-            Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i, this.conf);
+            Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i);
             
             // TODO not yet implemented
             

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Oct 20 01:48:36 2010
@@ -23,14 +23,12 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
-/**
- *
- */
-public class Task implements Writable {
+public abstract class Task implements Writable {
   public static final Log LOG = LogFactory.getLog(Task.class);
   ////////////////////////////////////////////
   // Fields
@@ -109,5 +107,7 @@ public class Task implements Writable {
     taskId = Text.readString(in);
     partition = in.readInt();
   }
+
+  public abstract BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob jobConf);
   
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1024485&r1=1024484&r2=1024485&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Wed Oct 20 01:48:36 2010
@@ -101,7 +101,7 @@ class TaskInProgress {
         return null;
       }
 
-      t = new BSPTask(jobId, jobFile, taskid, partition, this.conf);
+      t = new BSPTask(jobId, jobFile, taskid, partition);
       activeTasks.put(taskid, status.getGroomName());
 
       // Ask JobTracker to note that the task exists



Mime
View raw message