hama-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Filipe David Manana <fdman...@apache.org>
Subject Re: svn commit: r1024485 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/java/org/apache/hama/bsp/
Date Wed, 20 Oct 2010 10:07:52 GMT
On Wed, Oct 20, 2010 at 10:41 AM, Edward J. Yoon <edwardyoon@apache.org> wrote:
> Thanks.
>
> BTW, getAddress() and getHostName() methods are somewhat duplicated.
> If we remove one of the two, we have to change all code, related with
> it.
>
> https://issues.apache.org/jira/browse/HAMA-316
>
> Could you please comment here?

Done :)

>
> On Wed, Oct 20, 2010 at 6:31 PM, Filipe David Manana
> <fdmanana@apache.org> wrote:
>> A big +1 on this one :)
>>
>> On Wed, Oct 20, 2010 at 2:48 AM,  <edwardyoon@apache.org> wrote:
>>> Author: edwardyoon
>>> Date: Wed Oct 20 01:48:36 2010
>>> New Revision: 1024485
>>>
>>> URL: http://svn.apache.org/viewvc?rev=1024485&view=rev
>>> Log:
>>> Refactoring launchTask() method in GroomServer
>>>
>>> Added:
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
>>> Modified:
>>>    incubator/hama/trunk/CHANGES.txt
>>>    incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
>>>    incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
>>>
>>> Modified: incubator/hama/trunk/CHANGES.txt
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/CHANGES.txt (original)
>>> +++ incubator/hama/trunk/CHANGES.txt Wed Oct 20 01:48:36 2010
>>> @@ -50,6 +50,7 @@ Trunk (unreleased changes)
>>>
>>>   IMPROVEMENTS
>>>
>>> +    HAMA-300: Refactoring launchTask() method in GroomServer (edwardyoon)
>>>     HAMA-311: Add unit tests for IPC package (edwardyoon)
>>>     HAMA-312: Add serialize printing to ExampleDriver (edwardyoon)
>>>     HAMA-309: Add unit tests for Bytes utilities (edwardyoon)
>>>
>>> Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
(original)
>>> +++ incubator/hama/trunk/src/examples/org/apache/hama/examples/PiEstimator.java
Wed Oct 20 01:48:36 2010
>>> @@ -54,7 +54,7 @@ public class PiEstimator {
>>>         }
>>>       }
>>>
>>> -      byte[] tagName = Bytes.toBytes(getName().toString());
>>> +      byte[] tagName = Bytes.toBytes(bspPeer.getHostName());
>>>       byte[] myData = Bytes.toBytes(4.0 * (double) in / (double) iterations);
>>>       BSPMessage estimate = new BSPMessage(tagName, myData);
>>>
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSP.java Wed Oct 20 01:48:36
2010
>>> @@ -20,20 +20,5 @@ package org.apache.hama.bsp;
>>>  /**
>>>  * This class provides an abstract implementation of the BSP interface
>>>  */
>>> -public abstract class BSP extends Thread implements BSPInterface {
>>> -  private BSPPeer bspPeer;
>>> -
>>> -  /**
>>> -   * A thread's run method.
>>> -   *
>>> -   * The run method performs the
>>> -   * {@link org.apache.hama.bsp.BSPInterface#bsp(BSPPeer)}
>>> -   */
>>> -  public void runBSP() throws Exception {
>>> -    bsp(bspPeer);
>>> -  }
>>> -
>>> -  public void setPeer(BSPPeer bspServer) {
>>> -    this.bspPeer = bspServer;
>>> -  }
>>> +public abstract class BSP implements BSPInterface {
>>>  }
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessage.java Wed Oct
20 01:48:36 2010
>>> @@ -30,6 +30,12 @@ public class BSPMessage implements Writa
>>>   public BSPMessage() {
>>>   }
>>>
>>> +  /**
>>> +   * Constructor
>>> +   *
>>> +   * @param tag of data
>>> +   * @param data of message
>>> +   */
>>>   public BSPMessage(byte[] tag, byte[] data) {
>>>     this.tag = new byte[tag.length];
>>>     this.data = new byte[data.length];
>>> @@ -37,11 +43,20 @@ public class BSPMessage implements Writa
>>>     System.arraycopy(data, 0, this.data, 0, data.length);
>>>   }
>>>
>>> +  /**
>>> +   * BSP messages are typically identified with tags. This allows to get the
tag
>>> +   * of data.
>>> +   *
>>> +   * @return tag of data of BSP message
>>> +   */
>>>   public byte[] getTag() {
>>>     byte[] result = this.tag;
>>>     return result;
>>>   }
>>>
>>> +  /**
>>> +   * @return data of BSP message
>>> +   */
>>>   public byte[] getData() {
>>>     byte[] result = this.data;
>>>     return result;
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTask.java Wed Oct 20
01:48:36 2010
>>> @@ -17,26 +17,21 @@
>>>  */
>>>  package org.apache.hama.bsp;
>>>
>>> -import org.apache.hadoop.conf.Configuration;
>>> -import org.apache.hadoop.util.ReflectionUtils;
>>> -
>>>  public class BSPTask extends Task {
>>> -  private BSP bsp;
>>> -  private Configuration conf;
>>> -
>>> -  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition,
Configuration conf) {
>>> +
>>> +  public BSPTask() {
>>> +  }
>>> +
>>> +  public BSPTask(BSPJobID jobId, String jobFile, String taskid, int partition)
{
>>>     this.jobId = jobId;
>>>     this.jobFile = jobFile;
>>>     this.taskId = taskid;
>>>     this.partition = partition;
>>> -    this.conf = conf;
>>>   }
>>>
>>> -  public BSP getBSPClass() {
>>> -    bsp = (BSP) ReflectionUtils.newInstance(conf.getClass("bsp.work.class",
>>> -        BSP.class), conf);
>>> -
>>> -    return bsp;
>>> +  @Override
>>> +  public BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob conf) {
>>> +    return new BSPTaskRunner(this, bspPeer, conf);
>>>   }
>>>
>>>  }
>>>
>>> Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java?rev=1024485&view=auto
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java (added)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPTaskRunner.java Wed
Oct 20 01:48:36 2010
>>> @@ -0,0 +1,62 @@
>>> +/**
>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>> + * or more contributor license agreements.  See the NOTICE file
>>> + * distributed with this work for additional information
>>> + * regarding copyright ownership.  The ASF licenses this file
>>> + * to you under the Apache License, Version 2.0 (the
>>> + * "License"); you may not use this file except in compliance
>>> + * with the License.  You may obtain a copy of the License at
>>> + *
>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing, software
>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> + * See the License for the specific language governing permissions and
>>> + * limitations under the License.
>>> + */
>>> +package org.apache.hama.bsp;
>>> +
>>> +import java.io.IOException;
>>> +
>>> +import org.apache.commons.logging.Log;
>>> +import org.apache.commons.logging.LogFactory;
>>> +import org.apache.hadoop.util.ReflectionUtils;
>>> +import org.apache.zookeeper.KeeperException;
>>> +
>>> +public class BSPTaskRunner extends Thread {
>>> +
>>> +  public static final Log LOG = LogFactory.getLog(BSPTaskRunner.class);
>>> +  private Task task;
>>> +  private BSPJob conf;
>>> +  private BSPPeer bspPeer;
>>> +
>>> +  public BSPTaskRunner(BSPTask bspTask, BSPPeer bspPeer, BSPJob conf) {
>>> +    this.task = bspTask;
>>> +    this.conf = conf;
>>> +    this.bspPeer = bspPeer;
>>> +  }
>>> +
>>> +  public Task getTask() {
>>> +    return task;
>>> +  }
>>> +
>>> +  public void run() {
>>> +    BSP bsp = (BSP) ReflectionUtils.newInstance(conf.getConf().getClass(
>>> +        "bsp.work.class", BSP.class), conf.getConf());
>>> +
>>> +    try {
>>> +      bsp.bsp(bspPeer);
>>> +    } catch (IOException e) {
>>> +      // TODO Auto-generated catch block
>>> +      e.printStackTrace();
>>> +    } catch (KeeperException e) {
>>> +      // TODO Auto-generated catch block
>>> +      e.printStackTrace();
>>> +    } catch (InterruptedException e) {
>>> +      // TODO Auto-generated catch block
>>> +      e.printStackTrace();
>>> +    }
>>> +  }
>>> +
>>> +}
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Wed Oct
20 01:48:36 2010
>>> @@ -23,9 +23,11 @@ import java.lang.reflect.Constructor;
>>>  import java.net.InetSocketAddress;
>>>  import java.util.ArrayList;
>>>  import java.util.HashMap;
>>> +import java.util.HashSet;
>>>  import java.util.LinkedHashMap;
>>>  import java.util.List;
>>>  import java.util.Map;
>>> +import java.util.Set;
>>>  import java.util.TreeMap;
>>>  import java.util.concurrent.BlockingQueue;
>>>  import java.util.concurrent.LinkedBlockingQueue;
>>> @@ -40,7 +42,7 @@ import org.apache.hadoop.ipc.RPC;
>>>  import org.apache.hadoop.ipc.RemoteException;
>>>  import org.apache.hadoop.net.DNS;
>>>  import org.apache.hadoop.util.DiskChecker;
>>> -import org.apache.hadoop.util.ReflectionUtils;
>>> +import org.apache.hadoop.util.RunJar;
>>>  import org.apache.hadoop.util.StringUtils;
>>>  import org.apache.hadoop.util.DiskChecker.DiskErrorException;
>>>  import org.apache.hama.Constants;
>>> @@ -50,6 +52,7 @@ import org.apache.hama.ipc.InterTrackerP
>>>  public class GroomServer implements Runnable {
>>>   public static final Log LOG = LogFactory.getLog(GroomServer.class);
>>>   private static BSPPeer bspPeer;
>>> +  static final String SUBDIR = "groomServer";
>>>
>>>   Configuration conf;
>>>
>>> @@ -281,9 +284,111 @@ public class GroomServer implements Runn
>>>     }
>>>
>>>     try {
>>> +      localizeJob(tip);
>>> +    } catch (Throwable e) {
>>> +      String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n"
+ StringUtils
>>> +          .stringifyException(e));
>>> +      LOG.warn(msg);
>>> +    }
>>> +  }
>>> +
>>> +  private void localizeJob(TaskInProgress tip) throws IOException {
>>> +    Task task = tip.getTask();
>>> +    conf.addResource(task.getJobFile());
>>> +    BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
>>> +
>>> +    Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>>> +        + task.getTaskID() + "/" + "job.xml");
>>> +
>>> +    RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip);
>>> +    BSPJob jobConf = null;
>>> +
>>> +    synchronized (rjob) {
>>> +      if (!rjob.localized) {
>>> +        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>>> +            + task.getTaskID() + "/" + "job.jar");
>>> +        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
>>> +        Path jarFile = new Path(task.getJobFile().replace(".xml", ".jar"));
>>> +
>>> +        HamaConfiguration conf = new HamaConfiguration();
>>> +        conf.addResource(localJobFile);
>>> +        jobConf = new BSPJob(conf, task.getJobID().toString());
>>> +        jobConf.setJar(localJarFile.toString());
>>> +
>>> +        if (jarFile != null) {
>>> +          systemFS.copyToLocalFile(jarFile, localJarFile);
>>> +
>>> +          // also unjar the job.jar files in workdir
>>> +          File workDir = new File(
>>> +              new File(localJobFile.toString()).getParent(), "work");
>>> +          if (!workDir.mkdirs()) {
>>> +            if (!workDir.isDirectory()) {
>>> +              throw new IOException("Mkdirs failed to create "
>>> +                  + workDir.toString());
>>> +            }
>>> +          }
>>> +          RunJar.unJar(new File(localJarFile.toString()), workDir);
>>> +        }
>>> +        rjob.localized = true;
>>> +      }
>>> +    }
>>> +    launchTaskForJob(tip, jobConf);
>>> +  }
>>> +
>>> +  private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
>>> +    try {
>>> +      tip.setJobConf(jobConf);
>>>       tip.launchTask();
>>>     } catch (Throwable ie) {
>>> -      // TODO: when job failed.
>>> +      tip.taskStatus.setRunState(TaskStatus.State.FAILED);
>>> +      String error = StringUtils.stringifyException(ie);
>>> +      LOG.info(error);
>>> +    }
>>> +  }
>>> +
>>> +  private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile,
>>> +      TaskInProgress tip) {
>>> +    synchronized (runningJobs) {
>>> +      RunningJob rJob = null;
>>> +      if (!runningJobs.containsKey(jobId)) {
>>> +        rJob = new RunningJob(jobId, localJobFile);
>>> +        rJob.localized = false;
>>> +        rJob.tasks = new HashSet<TaskInProgress>();
>>> +        rJob.jobFile = localJobFile;
>>> +        runningJobs.put(jobId, rJob);
>>> +      } else {
>>> +        rJob = runningJobs.get(jobId);
>>> +      }
>>> +      rJob.tasks.add(tip);
>>> +      return rJob;
>>> +    }
>>> +  }
>>> +
>>> +  /**
>>> +   * The datastructure for initializing a job
>>> +   */
>>> +  static class RunningJob {
>>> +    private BSPJobID jobid;
>>> +    private Path jobFile;
>>> +    // keep this for later use
>>> +    Set<TaskInProgress> tasks;
>>> +    boolean localized;
>>> +    boolean keepJobFiles;
>>> +
>>> +    RunningJob(BSPJobID jobid, Path jobFile) {
>>> +      this.jobid = jobid;
>>> +      localized = false;
>>> +      tasks = new HashSet<TaskInProgress>();
>>> +      this.jobFile = jobFile;
>>> +      keepJobFiles = false;
>>> +    }
>>> +
>>> +    Path getJobFile() {
>>> +      return jobFile;
>>> +    }
>>> +
>>> +    BSPJobID getJobId() {
>>> +      return jobid;
>>>     }
>>>   }
>>>
>>> @@ -410,6 +515,8 @@ public class GroomServer implements Runn
>>>   // /////////////////////////////////////////////////////
>>>   class TaskInProgress {
>>>     Task task;
>>> +    BSPJob jobConf;
>>> +    private BSPTaskRunner runner;
>>>     volatile boolean done = false;
>>>     volatile boolean wasKilled = false;
>>>     private TaskStatus taskStatus;
>>> @@ -421,61 +528,29 @@ public class GroomServer implements Runn
>>>           TaskStatus.Phase.STARTING);
>>>     }
>>>
>>> -    static final String SUBDIR = "groomServer";
>>> +    public void setJobConf(BSPJob jobConf) {
>>> +      this.jobConf = jobConf;
>>> +    }
>>>
>>> -    public void launchTask() {
>>> +    public void launchTask() throws IOException {
>>>       taskStatus.setRunState(TaskStatus.State.RUNNING);
>>> +      this.runner = task.createRunner(bspPeer, this.jobConf);
>>> +      this.runner.start();
>>>
>>> -      try {
>>> -        // TODO: need to move this code to TaskRunner
>>> -
>>> -        task.getJobFile();
>>> -        conf.addResource(task.getJobFile());
>>> -        BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
>>> -
>>> -        Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>>> -            + task.getTaskID() + "/" + "job.xml");
>>> -        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
>>> -            + task.getTaskID() + "/" + "job.jar");
>>> -
>>> -        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);
>>> -        systemFS.copyToLocalFile(new Path(task.getJobFile().replace(".xml",
>>> -            ".jar")), localJarFile);
>>> -
>>> -        HamaConfiguration conf = new HamaConfiguration();
>>> -        conf.addResource(localJobFile);
>>> -        BSPJob jobConf = new BSPJob(conf, task.getJobID().toString());
>>> -        jobConf.setJar(localJarFile.toString());
>>> -
>>> -        BSP bsp = (BSP) ReflectionUtils
>>> -            .newInstance(jobConf.getBspClass(), conf);
>>> -        bsp.setPeer(bspPeer);
>>> +      // Check state of Task
>>> +      while (true) {
>>>         try {
>>> -          bsp.runBSP();
>>> -        } catch (Exception e) {
>>> +          Thread.sleep(1000);
>>> +        } catch (InterruptedException e) {
>>>           e.printStackTrace();
>>> -          taskStatus.setRunState(TaskStatus.State.FAILED);
>>>         }
>>>
>>> -      } catch (IOException e) {
>>> -        // TODO Auto-generated catch block
>>> -        e.printStackTrace();
>>> -      } finally {
>>> -
>>> -        while (true) {
>>> -          try {
>>> -            Thread.sleep(1000);
>>> -          } catch (InterruptedException e) {
>>> -            e.printStackTrace();
>>> -          }
>>> -
>>> -          // If local/outgoing queues are empty, task is done.
>>> -          if (bspPeer.localQueue.size() == 0
>>> -              && bspPeer.outgoingQueues.size() == 0) {
>>> -            taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
>>> -            acceptNewTasks = true;
>>> -            break;
>>> -          }
>>> +        // If local/outgoing queues are empty, task is done.
>>> +        if (bspPeer.localQueue.size() == 0
>>> +            && bspPeer.outgoingQueues.size() == 0) {
>>> +          taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
>>> +          acceptNewTasks = true;
>>> +          break;
>>>         }
>>>       }
>>>
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LaunchTaskAction.java Wed
Oct 20 01:48:36 2010
>>> @@ -47,7 +47,7 @@ class LaunchTaskAction extends GroomServ
>>>   }
>>>
>>>   public void readFields(DataInput in) throws IOException {
>>> -    task = new Task();
>>> +    task = new BSPTask();
>>>     task.readFields(in);
>>>   }
>>>
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/LocalJobRunner.java Wed
Oct 20 01:48:36 2010
>>> @@ -172,7 +172,7 @@ public class LocalJobRunner implements J
>>>
>>>           try {
>>>             GroomServer servers = new GroomServer(conf);
>>> -            Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(),
i, this.conf);
>>> +            Task task = new BSPTask(job.getJobID(), jobFile, tID.toString(),
i);
>>>
>>>             // TODO not yet implemented
>>>
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/Task.java Wed Oct 20 01:48:36
2010
>>> @@ -23,14 +23,12 @@ import java.io.IOException;
>>>
>>>  import org.apache.commons.logging.Log;
>>>  import org.apache.commons.logging.LogFactory;
>>> +import org.apache.hadoop.conf.Configuration;
>>>  import org.apache.hadoop.fs.LocalDirAllocator;
>>>  import org.apache.hadoop.io.Text;
>>>  import org.apache.hadoop.io.Writable;
>>>
>>> -/**
>>> - *
>>> - */
>>> -public class Task implements Writable {
>>> +public abstract class Task implements Writable {
>>>   public static final Log LOG = LogFactory.getLog(Task.class);
>>>   ////////////////////////////////////////////
>>>   // Fields
>>> @@ -109,5 +107,7 @@ public class Task implements Writable {
>>>     taskId = Text.readString(in);
>>>     partition = in.readInt();
>>>   }
>>> +
>>> +  public abstract BSPTaskRunner createRunner(BSPPeer bspPeer, BSPJob jobConf);
>>>
>>>  }
>>>
>>> Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java
>>> URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java?rev=1024485&r1=1024484&r2=1024485&view=diff
>>> ==============================================================================
>>> --- incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java (original)
>>> +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/TaskInProgress.java Wed
Oct 20 01:48:36 2010
>>> @@ -101,7 +101,7 @@ class TaskInProgress {
>>>         return null;
>>>       }
>>>
>>> -      t = new BSPTask(jobId, jobFile, taskid, partition, this.conf);
>>> +      t = new BSPTask(jobId, jobFile, taskid, partition);
>>>       activeTasks.put(taskid, status.getGroomName());
>>>
>>>       // Ask JobTracker to note that the task exists
>>>
>>>
>>>
>>
>>
>>
>> --
>> Filipe David Manana,
>> fdmanana@gmail.com, fdmanana@apache.org
>>
>> "Reasonable men adapt themselves to the world.
>>  Unreasonable men adapt the world to themselves.
>>  That's why all progress depends on unreasonable men."
>>
>
>
>
> --
> Best Regards, Edward J. Yoon
> edwardyoon@apache.org
> http://blog.udanax.org
>



-- 
Filipe David Manana,
fdmanana@gmail.com, fdmanana@apache.org

"Reasonable men adapt themselves to the world.
 Unreasonable men adapt the world to themselves.
 That's why all progress depends on unreasonable men."

Mime
View raw message