incubator-hama-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Filipe David Manana <fdman...@apache.org>
Subject Re: svn commit: r1024485 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/
Date Wed, 20 Oct 2010 09:31:19 GMT
A big +1 on this one :)

On Wed, Oct 20, 2010 at 2:48 AM,  <edwardyoon@apache.org> wrote:
> Author: edwardyoon
> Date: Wed Oct 20 01:48:36 2010
> New Revision: 1024485
>
> URL: http://svn.apache.org/viewvc?rev=1024485&view=rev
> Log:
> Refactoring launchTask() method in GroomServer
>
> Added:
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
> Modified:
>    incubator/hama/trunk/CHANGES.txt
>    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
>    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
>
> Modified: incubator/hama/trunk/CHANGES.txt
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/CHANGES.txt (original)
> +++ incubator/hama/trunk/CHANGES.txt Wed Oct 20 01:48:36 2010
> @@ -50,6 +50,7 @@ Trunk (unreleased changes)
>
>   IMPROVEMENTS
>
> +    HAMA-300: Refactoring launchTask() method in GroomServer (edwardyoon)
>     HAMA-311: Add unit tests for IPC package (edwardyoon)
>     HAMA-312: Add serialize printing to ExampleDriver (edwardyoon)
>     HAMA-309: Add unit tests for Bytes utilities (edwardyoon)
>
> Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java (original)
> +++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java Wed Oct 20 01:48:36 2010
> @@ -54,7 +54,7 @@ public class PiEstimator {
>         }
>       }
>
> -      byte[] tagName = Bytes.toBytes(getName().toString());
> +      byte[] tagName = Bytes.toBytes(bspPeer.getHostName());
>       byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
>       BSPMessage estimate = new BSPMessage(tagName, myData);
>
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Wed Oct 20 01:48:36 2010
> @@ -20,20 +20,5 @@ package org.apache.hama.bsp;
>  /**
>  * This class provides an abstract implementation of the BSP interface
>  */
> -public abstract class BSP extends Thread implements BSPInterface {
> -  private BSPPeer bspPeer;
> -
> -  /**
> -   * A thread's run method.
> -   *
> -   * The run method performs the
> -   * {@link org.apache.hama.bsp.BSPInterface#bsp(BSPPeer)}
> -   */
> -  public void runBSP() throws Exception {
> -    bsp(bspPeer);
> -  }
> -
> -  public void setPeer(BSPPeer bspServer) {
> -    this.bspPeer = bspServer;
> -  }
> +public abstract class BSP implements BSPInterface {
>  }
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Wed Oct 20 01:48:36 2010
> @@ -30,6 +30,12 @@ public class BSPMessage implements Writa
>   public BSPMessage() {
>   }
>
> +  /**
> +   * Constructor
> +   *
> +   * @param tag of data
> +   * @param data of message
> +   */
>   public BSPMessage(byte[] tag, byte[] data) {
>     this.tag = new byte[tag.length];
>     this.data = new byte[data.length];
> @@ -37,11 +43,20 @@ public class BSPMessage implements Writa
>     System.arraycopy(data, 0, this.data, 0, data.length);
>   }
>
> +  /**
> +   * BSP messages are typically identified with tags. This allows to get the tag
> +   * of data.
> +   *
> +   * @return tag of data of BSP message
> +   */
>   public byte[] getTag() {
>     byte[] result = this.tag;
>     return result;
>   }
>
> +  /**
> +   * @return data of BSP message
> +   */
>   public byte[] getData() {
>     byte[] result = this.data;
>     return result;
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Wed Oct 20 01:48:36 2010
> @@ -17,26 +17,21 @@
>  */
>  package org.apache.hama.bsp;
>
> -import org.apache.hadoop.conf.Configuration;
> -import org.apache.hadoop.util.ReflectionUtils;
> -
>  public class BSPTask extends Task {
> -  private BSP bsp;
> -  private Configuration conf;
> -
> -  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition, Configuration conf) {
> +
> +  public BSPTask() {
> +  }
> +
> +  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition) {
>     this.jobId = jobId;
>     this.jobFile = jobFile;
>     this.taskId = taskid;
>     this.partition = partition;
> -    this.conf = conf;
>   }
>
> -  public BSP getBSPClass() {
> -    bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
> -        BSP.class), conf);
> -
> -    return bsp;
> +  @Override
> +  public BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob conf) {
> +    return new BSPTaskRunner(this, bspPeer, conf);
>   }
>
>  }
>
> Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1024485&view=auto
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java (added)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java Wed Oct 20 01:48:36 2010
> @@ -0,0 +1,62 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.hama.bsp;
> +
> +import java.io.IOException;
> +
> +import org.apache.commons.logging.Log;
> +import org.apache.commons.logging.LogFactory;
> +import org.apache.hadoop.util.ReflectionUtils;
> +import org.apache.zookeeper.KeeperException;
> +
> +public class BSPTaskRunner extends Thread {
> +
> +  public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class);
> +  private Task task;
> +  private BSPJob conf;
> +  private BSPPeer bspPeer;
> +
> +  public BSPTaskRunner(BSPTask bspTask, BSPPeer bspPeer, BSPJob conf) {
> +    this.task = bspTask;
> +    this.conf = conf;
> +    this.bspPeer = bspPeer;
> +  }
> +
> +  public Task getTask() {
> +    return task;
> +  }
> +
> +  public void run() {
> +    BSP bsp = (BSP) ReflectionUtils.newInstance(conf.getConf().getClass(
> +        "bsp.work.class", BSP.class), conf.getConf());
> +
> +    try {
> +      bsp.bsp(bspPeer);
> +    } catch (IOException e) {
> +      // TODO Auto-generated catch block
> +      e.printStackTrace();
> +    } catch (KeeperException e) {
> +      // TODO Auto-generated catch block
> +      e.printStackTrace();
> +    } catch (InterruptedException e) {
> +      // TODO Auto-generated catch block
> +      e.printStackTrace();
> +    }
> +  }
> +
> +}
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Oct 20 01:48:36 2010
> @@ -23,9 +23,11 @@ import java.lang.reflect.Constructor;
>  import java.net.InetSocketAddress;
>  import java.util.ArrayList;
>  import java.util.HashMap;
> +import java.util.HashSet;
>  import java.util.LinkedHashMap;
>  import java.util.List;
>  import java.util.Map;
> +import java.util.Set;
>  import java.util.TreeMap;
>  import java.util.concurrent.BlockingQueue;
>  import java.util.concurrent.LinkedBlockingQueue;
> @@ -40,7 +42,7 @@ import org.apache.hadoop.ipc.RPC;
>  import org.apache.hadoop.ipc.RemoteException;
>  import org.apache.hadoop.net.DNS;
>  import org.apache.hadoop.util.DiskChecker;
> -import org.apache.hadoop.util.ReflectionUtils;
> +import org.apache.hadoop.util.RunJar;
>  import org.apache.hadoop.util.StringUtils;
>  import org.apache.hadoop.util.DiskChecker.DiskErrorException;
>  import org.apache.hama.Constants;
> @@ -50,6 +52,7 @@ import org.apache.hama.ipc.InterTrackerP
>  public class GroomServer implements Runnable {
>   public static final Log LOG = LogFactory.getLog(GroomServer.class);
>   private static BSPPeer bspPeer;
> +  static final String SUBDIR = "groomServer";
>
>   Configuration conf;
>
> @@ -281,9 +284,111 @@ public class GroomServer implements Runn
>     }
>
>     try {
> +      localizeJob(tip);
> +    } catch (Throwable e) {
> +      String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
> +          .stringifyException(e));
> +      LOG.warn(msg);
> +    }
> +  }
> +
> +  private void localizeJob(TaskInProgress tip) throws IOException {
> +    Task task = tip.getTask();
> +    conf.addResource(task.getJobFile());
> +    BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
> +
> +    Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
> +        + task.getTaskID() + "/" + "job.xml");
> +
> +    RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip);
> +    BSPJob jobConf = null;
> +
> +    synchronized (rjob) {
> +      if (!rjob.localized) {
> +        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
> +            + task.getTaskID() + "/" + "job.jar");
> +        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
> +        Path jarFile = new Path(task.getJobFile().replace(".xml", ".jar"));
> +
> +        HamaConfiguration conf = new HamaConfiguration();
> +        conf.addResource(localJobFile);
> +        jobConf = new BSPJob(conf, task.getJobID().toString());
> +        jobConf.setJar(localJarFile.toString());
> +
> +        if (jarFile != null) {
> +          systemFS.copyToLocalFile(jarFile, localJarFile);
> +
> +          // also unjar the job.jar files in workdir
> +          File workDir = new File(
> +              new File(localJobFile.toString()).getParent(), "work");
> +          if (!workDir.mkdirs()) {
> +            if (!workDir.isDirectory()) {
> +              throw new IOException("Mkdirs failed to create "
> +                  + workDir.toString());
> +            }
> +          }
> +          RunJar.unJar(new File(localJarFile.toString()), workDir);
> +        }
> +        rjob.localized = true;
> +      }
> +    }
> +    launchTaskForJob(tip, jobConf);
> +  }
> +
> +  private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
> +    try {
> +      tip.setJobConf(jobConf);
>       tip.launchTask();
>     } catch (Throwable ie) {
> -      // TODO: when job failed.
> +      tip.taskStatus.setRunState(TaskStatus.State.FAILED);
> +      String error = StringUtils.stringifyException(ie);
> +      LOG.info(error);
> +    }
> +  }
> +
> +  private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile,
> +      TaskInProgress tip) {
> +    synchronized (runningJobs) {
> +      RunningJob rJob = null;
> +      if (!runningJobs.containsKey(jobId)) {
> +        rJob = new RunningJob(jobId, localJobFile);
> +        rJob.localized = false;
> +        rJob.tasks = new HashSet<TaskInProgress>();
> +        rJob.jobFile = localJobFile;
> +        runningJobs.put(jobId, rJob);
> +      } else {
> +        rJob = runningJobs.get(jobId);
> +      }
> +      rJob.tasks.add(tip);
> +      return rJob;
> +    }
> +  }
> +
> +  /**
> +   * The datastructure for initializing a job
> +   */
> +  static class RunningJob {
> +    private BSPJobID jobid;
> +    private Path jobFile;
> +    // keep this for later use
> +    Set<TaskInProgress> tasks;
> +    boolean localized;
> +    boolean keepJobFiles;
> +
> +    RunningJob(BSPJobID jobid, Path jobFile) {
> +      this.jobid = jobid;
> +      localized = false;
> +      tasks = new HashSet<TaskInProgress>();
> +      this.jobFile = jobFile;
> +      keepJobFiles = false;
> +    }
> +
> +    Path getJobFile() {
> +      return jobFile;
> +    }
> +
> +    BSPJobID getJobId() {
> +      return jobid;
>     }
>   }
>
> @@ -410,6 +515,8 @@ public class GroomServer implements Runn
>   // /////////////////////////////////////////////////////
>   class TaskInProgress {
>     Task task;
> +    BSPJob jobConf;
> +    private BSPTaskRunner runner;
>     volatile boolean done = false;
>     volatile boolean wasKilled = false;
>     private TaskStatus taskStatus;
> @@ -421,61 +528,29 @@ public class GroomServer implements Runn
>           TaskStatus.Phase.STARTING);
>     }
>
> -    static final String SUBDIR = "groomServer";
> +    public void setJobConf(BSPJob jobConf) {
> +      this.jobConf = jobConf;
> +    }
>
> -    public void launchTask() {
> +    public void launchTask() throws IOException {
>       taskStatus.setRunState(TaskStatus.State.RUNNING);
> +      this.runner = task.createRunner(bspPeer, this.jobConf);
> +      this.runner.start();
>
> -      try {
> -        // TODO: need to move this code to TaskRunner
> -
> -        task.getJobFile();
> -        conf.addResource(task.getJobFile());
> -        BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
> -
> -        Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
> -            + task.getTaskID() + "/" + "job.xml");
> -        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
> -            + task.getTaskID() + "/" + "job.jar");
> -
> -        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
> -        systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml",
> -            ".jar")), localJarFile);
> -
> -        HamaConfiguration conf = new HamaConfiguration();
> -        conf.addResource(localJobFile);
> -        BSPJob jobConf = new BSPJob(conf, task.getJobID().toString());
> -        jobConf.setJar(localJarFile.toString());
> -
> -        BSP bsp = (BSP) ReflectionUtils
> -            .newInstance(jobConf.getBspClass(), conf);
> -        bsp.setPeer(bspPeer);
> +      // Check state of Task
> +      while (true) {
>         try {
> -          bsp.runBSP();
> -        } catch (Exception e) {
> +          Thread.sleep(1000);
> +        } catch (InterruptedException e) {
>           e.printStackTrace();
> -          taskStatus.setRunState(TaskStatus.State.FAILED);
>         }
>
> -      } catch (IOException e) {
> -        // TODO Auto-generated catch block
> -        e.printStackTrace();
> -      } finally {
> -
> -        while (true) {
> -          try {
> -            Thread.sleep(1000);
> -          } catch (InterruptedException e) {
> -            e.printStackTrace();
> -          }
> -
> -          // If local/outgoing queues are empty, task is done.
> -          if (bspPeer.localQueue.size() == 0
> -              && bspPeer.outgoingQueues.size() == 0) {
> -            taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
> -            acceptNewTasks = true;
> -            break;
> -          }
> +        // If local/outgoing queues are empty, task is done.
> +        if (bspPeer.localQueue.size() == 0
> +            && bspPeer.outgoingQueues.size() == 0) {
> +          taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
> +          acceptNewTasks = true;
> +          break;
>         }
>       }
>
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java Wed Oct 20 01:48:36 2010
> @@ -47,7 +47,7 @@ class LaunchTaskAction extends GroomServ
>   }
>
>   public void readFields(DataInput in) throws IOException {
> -    task = new Task();
> +    task = new BSPTask();
>     task.readFields(in);
>   }
>
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Wed Oct 20 01:48:36 2010
> @@ -172,7 +172,7 @@ public class LocalJobRunner implements J
>
>           try {
>             GroomServer servers = new GroomServer(conf);
> -            Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i, this.conf);
> +            Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(), i);
>
>             // TODO not yet implemented
>
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Oct 20 01:48:36 2010
> @@ -23,14 +23,12 @@ import java.io.IOException;
>
>  import org.apache.commons.logging.Log;
>  import org.apache.commons.logging.LogFactory;
> +import org.apache.hadoop.conf.Configuration;
>  import org.apache.hadoop.fs.LocalDirAllocator;
>  import org.apache.hadoop.io.Text;
>  import org.apache.hadoop.io.Writable;
>
> -/**
> - *
> - */
> -public class Task implements Writable {
> +public abstract class Task implements Writable {
>   public static final Log LOG = LogFactory.getLog(Task.class);
>   ////////////////////////////////////////////
>   // Fields
> @@ -109,5 +107,7 @@ public class Task implements Writable {
>     taskId = Text.readString(in);
>     partition = in.readInt();
>   }
> +
> +  public abstract BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob jobConf);
>
>  }
>
> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1024485&r1=1024484&r2=1024485&view=diff
> ==============================================================================
> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Wed Oct 20 01:48:36 2010
> @@ -101,7 +101,7 @@ class TaskInProgress {
>         return null;
>       }
>
> -      t = new BSPTask(jobId, jobFile, taskid, partition, this.conf);
> +      t = new BSPTask(jobId, jobFile, taskid, partition);
>       activeTasks.put(taskid, status.getGroomName());
>
>       // Ask JobTracker to note that the task exists
>
>
>



-- 
Filipe David Manana,
fdmanana@gmail.com, fdmanana@apache.org

"Reasonable men adapt themselves to the world.
 Unreasonable men adapt the world to themselves.
 That's why all progress depends on unreasonable men."

Mime
View raw message