hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r529410 [17/27] - in /lucene/hadoop/trunk: ./ src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/ src/contrib/abacus/src/java/org/apache/hadoop/abacus/ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/...
Date Mon, 16 Apr 2007 21:44:46 GMT
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Mon Apr 16 14:44:35 2007
@@ -47,584 +47,584 @@
 //
 ////////////////////////////////////////////////////////
 class TaskInProgress {
-    static final int MAX_TASK_EXECS = 1;
-    static final int MAX_TASK_FAILURES = 4;    
-    static final double SPECULATIVE_GAP = 0.2;
-    static final long SPECULATIVE_LAG = 60 * 1000;
-    private static NumberFormat idFormat = NumberFormat.getInstance();
-    static {
-      idFormat.setMinimumIntegerDigits(6);
-      idFormat.setGroupingUsed(false);
-    }
-
-    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.TaskInProgress");
-
-    // Defines the TIP
-    private String jobFile = null;
-    private String splitClass = null;
-    private BytesWritable split = null;
-    private int numMaps;
-    private int partition;
-    private JobTracker jobtracker;
-    private String id;
-    private JobInProgress job;
-
-    // Status of the TIP
-    private int successEventNumber = -1;
-    private int numTaskFailures = 0;
-    private double progress = 0;
-    private String state = "";
-    private long startTime = 0;
-    private long execStartTime = 0 ;
-    private long execFinishTime = 0 ;
-    private int completes = 0;
-    private boolean failed = false;
-    private boolean killed = false;
-
-    // The 'unique' prefix for taskids of this tip
-    String taskIdPrefix;
-    
-    // The 'next' usable taskid of this tip
-    int nextTaskId = 0;
-    
-    // Map from task Id -> TaskTracker Id, contains tasks that are
-    // currently runnings
-    private TreeMap<String, String> activeTasks = new TreeMap();
-    private JobConf conf;
-    private boolean runSpeculative;
-    private Map<String,List<String>> taskDiagnosticData = new TreeMap();
-    /**
-     * Map from taskId -> TaskStatus
-     */
-    private TreeMap<String,TaskStatus> taskStatuses = 
-      new TreeMap<String,TaskStatus>();
-
-    private TreeSet machinesWhereFailed = new TreeSet();
-    private TreeSet tasksReportedClosed = new TreeSet();
-    
-    private Counters counters = new Counters();
-
-    /**
-     * Constructor for MapTask
-     */
-    public TaskInProgress(String uniqueString, String jobFile, 
-                          String splitClass, BytesWritable split, 
-                          JobTracker jobtracker, JobConf conf, 
-                          JobInProgress job, int partition) {
-        this.jobFile = jobFile;
-        this.splitClass = splitClass;
-        this.split = split;
-        this.jobtracker = jobtracker;
-        this.job = job;
-        this.conf = conf;
-        this.partition = partition;
-        init(uniqueString);
-    }
+  static final int MAX_TASK_EXECS = 1;
+  static final int MAX_TASK_FAILURES = 4;    
+  static final double SPECULATIVE_GAP = 0.2;
+  static final long SPECULATIVE_LAG = 60 * 1000;
+  private static NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setMinimumIntegerDigits(6);
+    idFormat.setGroupingUsed(false);
+  }
+
+  public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.TaskInProgress");
+
+  // Defines the TIP
+  private String jobFile = null;
+  private String splitClass = null;
+  private BytesWritable split = null;
+  private int numMaps;
+  private int partition;
+  private JobTracker jobtracker;
+  private String id;
+  private JobInProgress job;
+
+  // Status of the TIP
+  private int successEventNumber = -1;
+  private int numTaskFailures = 0;
+  private double progress = 0;
+  private String state = "";
+  private long startTime = 0;
+  private long execStartTime = 0 ;
+  private long execFinishTime = 0 ;
+  private int completes = 0;
+  private boolean failed = false;
+  private boolean killed = false;
+
+  // The 'unique' prefix for taskids of this tip
+  String taskIdPrefix;
+    
+  // The 'next' usable taskid of this tip
+  int nextTaskId = 0;
+    
+  // Map from task Id -> TaskTracker Id, contains tasks that are
+  // currently runnings
+  private TreeMap<String, String> activeTasks = new TreeMap();
+  private JobConf conf;
+  private boolean runSpeculative;
+  private Map<String,List<String>> taskDiagnosticData = new TreeMap();
+  /**
+   * Map from taskId -> TaskStatus
+   */
+  private TreeMap<String,TaskStatus> taskStatuses = 
+    new TreeMap<String,TaskStatus>();
+
+  private TreeSet machinesWhereFailed = new TreeSet();
+  private TreeSet tasksReportedClosed = new TreeSet();
+    
+  private Counters counters = new Counters();
+
+  /**
+   * Constructor for MapTask
+   */
+  public TaskInProgress(String uniqueString, String jobFile, 
+                        String splitClass, BytesWritable split, 
+                        JobTracker jobtracker, JobConf conf, 
+                        JobInProgress job, int partition) {
+    this.jobFile = jobFile;
+    this.splitClass = splitClass;
+    this.split = split;
+    this.jobtracker = jobtracker;
+    this.job = job;
+    this.conf = conf;
+    this.partition = partition;
+    init(uniqueString);
+  }
         
-    /**
-     * Constructor for ReduceTask
-     */
-    public TaskInProgress(String uniqueString, String jobFile, 
-                          int numMaps, 
-                          int partition, JobTracker jobtracker, JobConf conf,
-                          JobInProgress job) {
-        this.jobFile = jobFile;
-        this.numMaps = numMaps;
-        this.partition = partition;
-        this.jobtracker = jobtracker;
-        this.job = job;
-        this.conf = conf;
-        init(uniqueString);
-    }
-
-    /**
-     * Make a unique name for this TIP.
-     * @param uniqueBase The unique name of the job
-     * @return The unique string for this tip
-     */
-    private String makeUniqueString(String uniqueBase) {
-      StringBuffer result = new StringBuffer();
-      result.append(uniqueBase);
-      if (isMapTask()) {
-        result.append("_m_");
-      } else {
-        result.append("_r_");
-      }
-      result.append(idFormat.format(partition));
-      return result.toString();
+  /**
+   * Constructor for ReduceTask
+   */
+  public TaskInProgress(String uniqueString, String jobFile, 
+                        int numMaps, 
+                        int partition, JobTracker jobtracker, JobConf conf,
+                        JobInProgress job) {
+    this.jobFile = jobFile;
+    this.numMaps = numMaps;
+    this.partition = partition;
+    this.jobtracker = jobtracker;
+    this.job = job;
+    this.conf = conf;
+    init(uniqueString);
+  }
+
+  /**
+   * Make a unique name for this TIP.
+   * @param uniqueBase The unique name of the job
+   * @return The unique string for this tip
+   */
+  private String makeUniqueString(String uniqueBase) {
+    StringBuffer result = new StringBuffer();
+    result.append(uniqueBase);
+    if (isMapTask()) {
+      result.append("_m_");
+    } else {
+      result.append("_r_");
+    }
+    result.append(idFormat.format(partition));
+    return result.toString();
+  }
+    
+  /**
+   * Return the index of the tip within the job, so "tip_0002_m_012345"
+   * would return 12345;
+   * @return int the tip index
+   */
+  public int idWithinJob() {
+    return partition;
+  }    
+
+  /**
+   * Initialization common to Map and Reduce
+   */
+  void init(String jobUniqueString) {
+    this.startTime = System.currentTimeMillis();
+    this.runSpeculative = conf.getSpeculativeExecution();
+    this.taskIdPrefix = makeUniqueString(jobUniqueString);
+    this.id = "tip_" + this.taskIdPrefix;
+  }
+
+  ////////////////////////////////////
+  // Accessors, info, profiles, etc.
+  ////////////////////////////////////
+
+  /**
+   * Return the parent job
+   */
+  public JobInProgress getJob() {
+    return job;
+  }
+  /**
+   * Return an ID for this task, not its component taskid-threads
+   */
+  public String getTIPId() {
+    return this.id;
+  }
+  /**
+   * Whether this is a map task
+   */
+  public boolean isMapTask() {
+    return split != null;
+  }
+    
+  /**
+   * Is this tip currently running any tasks?
+   * @return true if any tasks are running
+   */
+  public boolean isRunning() {
+    return !activeTasks.isEmpty();
+  }
+    
+  /**
+   * Is this tip complete?
+   * 
+   * @return <code>true</code> if the tip is complete, else <code>false</code>
+   */
+  public boolean isComplete() {
+    return (completes > 0);
+  }
+
+  /**
+   * Is the given taskid in this tip complete?
+   * 
+   * @param taskid taskid of attempt to check for completion
+   * @return <code>true</code> if taskid is complete, else <code>false</code>
+   */
+  public boolean isComplete(String taskid) {
+    TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+    if (status == null) {
+      return false;
+    }
+    return ((completes > 0) && 
+            (status.getRunState() == TaskStatus.State.SUCCEEDED));
+  }
+
+  /**
+   * Is the tip a failure?
+   * 
+   * @return <code>true</code> if tip has failed, else <code>false</code>
+   */
+  public boolean isFailed() {
+    return failed;
+  }
+
+  /**
+   * Number of times the TaskInProgress has failed.
+   */
+  public int numTaskFailures() {
+    return numTaskFailures;
+  }
+
+  /**
+   * Get the overall progress (from 0 to 1.0) for this TIP
+   */
+  public double getProgress() {
+    return progress;
+  }
+    
+  /**
+   * Get the task's counters
+   */
+  public Counters getCounters() {
+    return counters;
+  }
+  /**
+   * Returns whether a component task-thread should be 
+   * closed because the containing JobInProgress has completed.
+   */
+  public boolean shouldCloseForClosedJob(String taskid) {
+    // If the thing has never been closed,
+    // and it belongs to this TIP,
+    // and this TIP is somehow FINISHED,
+    // then true
+    TaskStatus ts = (TaskStatus) taskStatuses.get(taskid);
+    if ((ts != null) &&
+        (! tasksReportedClosed.contains(taskid)) &&
+        (job.getStatus().getRunState() != JobStatus.RUNNING)) {
+      tasksReportedClosed.add(taskid);
+      return true;
+    } else if( !isMapTask() && isComplete() && 
+               ! tasksReportedClosed.contains(taskid) ){
+      tasksReportedClosed.add(taskid);
+      return true; 
+    }
+    else {
+      return false;
+    }
+  }
+
+  /**
+   * Creates a "status report" for this task.  Includes the
+   * task ID and overall status, plus reports for all the
+   * component task-threads that have ever been started.
+   */
+  synchronized TaskReport generateSingleReport() {
+    ArrayList diagnostics = new ArrayList();
+    for (Iterator i = taskDiagnosticData.values().iterator(); i.hasNext();) {
+      diagnostics.addAll((List)i.next());
     }
-    
-    /**
-     * Return the index of the tip within the job, so "tip_0002_m_012345"
-     * would return 12345;
-     * @return int the tip index
-     */
-     public int idWithinJob() {
-       return partition;
-     }    
-
-    /**
-     * Initialization common to Map and Reduce
-     */
-    void init(String jobUniqueString) {
-        this.startTime = System.currentTimeMillis();
-        this.runSpeculative = conf.getSpeculativeExecution();
-        this.taskIdPrefix = makeUniqueString(jobUniqueString);
-        this.id = "tip_" + this.taskIdPrefix;
-    }
-
-    ////////////////////////////////////
-    // Accessors, info, profiles, etc.
-    ////////////////////////////////////
-
-    /**
-     * Return the parent job
-     */
-    public JobInProgress getJob() {
-        return job;
-    }
-    /**
-     * Return an ID for this task, not its component taskid-threads
-     */
-    public String getTIPId() {
-        return this.id;
-    }
-    /**
-     * Whether this is a map task
-     */
-    public boolean isMapTask() {
-        return split != null;
-    }
-    
-    /**
-     * Is this tip currently running any tasks?
-     * @return true if any tasks are running
-     */
-    public boolean isRunning() {
-      return !activeTasks.isEmpty();
-    }
-    
-    /**
-     * Is this tip complete?
-     * 
-     * @return <code>true</code> if the tip is complete, else <code>false</code>
-     */
-    public boolean isComplete() {
-        return (completes > 0);
-    }
-
-    /**
-     * Is the given taskid in this tip complete?
-     * 
-     * @param taskid taskid of attempt to check for completion
-     * @return <code>true</code> if taskid is complete, else <code>false</code>
-     */
-    public boolean isComplete(String taskid) {
-        TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
-        if (status == null) {
-            return false;
-        }
-        return ((completes > 0) && 
-                (status.getRunState() == TaskStatus.State.SUCCEEDED));
-    }
-
-    /**
-     * Is the tip a failure?
-     * 
-     * @return <code>true</code> if tip has failed, else <code>false</code>
-     */
-    public boolean isFailed() {
-        return failed;
-    }
-
-    /**
-     * Number of times the TaskInProgress has failed.
-     */
-    public int numTaskFailures() {
-        return numTaskFailures;
-    }
-
-    /**
-     * Get the overall progress (from 0 to 1.0) for this TIP
-     */
-    public double getProgress() {
-        return progress;
-    }
-    
-    /**
-     * Get the task's counters
-     */
-    public Counters getCounters() {
-      return counters;
-    }
-    /**
-     * Returns whether a component task-thread should be 
-     * closed because the containing JobInProgress has completed.
-     */
-    public boolean shouldCloseForClosedJob(String taskid) {
-        // If the thing has never been closed,
-        // and it belongs to this TIP,
-        // and this TIP is somehow FINISHED,
-        // then true
-        TaskStatus ts = (TaskStatus) taskStatuses.get(taskid);
-        if ((ts != null) &&
-            (! tasksReportedClosed.contains(taskid)) &&
-            (job.getStatus().getRunState() != JobStatus.RUNNING)) {
-            tasksReportedClosed.add(taskid);
-            return true;
-        } else if( !isMapTask() && isComplete() && 
-                ! tasksReportedClosed.contains(taskid) ){
-            tasksReportedClosed.add(taskid);
-            return true; 
-        }
-        else {
-            return false;
-        }
-    }
-
-    /**
-     * Creates a "status report" for this task.  Includes the
-     * task ID and overall status, plus reports for all the
-     * component task-threads that have ever been started.
-     */
-    synchronized TaskReport generateSingleReport() {
-      ArrayList diagnostics = new ArrayList();
-      for (Iterator i = taskDiagnosticData.values().iterator(); i.hasNext();) {
-        diagnostics.addAll((List)i.next());
-      }
-      TaskReport report = new TaskReport
+    TaskReport report = new TaskReport
       (getTIPId(), (float)progress, state,
-          (String[])diagnostics.toArray(new String[diagnostics.size()]),
-          execStartTime, execFinishTime, counters);
+       (String[])diagnostics.toArray(new String[diagnostics.size()]),
+       execStartTime, execFinishTime, counters);
       
-      return report ;
-    }
+    return report ;
+  }
 
-    /**
-     * Get the diagnostic messages for a given task within this tip.
-     * @param taskId the id of the required task
-     * @return the list of diagnostics for that task
-     */
-    synchronized List<String> getDiagnosticInfo(String taskId) {
-      return taskDiagnosticData.get(taskId);
-    }
-    
-    ////////////////////////////////////////////////
-    // Update methods, usually invoked by the owning
-    // job.
-    ////////////////////////////////////////////////
-    /**
-     * A status message from a client has arrived.
-     * It updates the status of a single component-thread-task,
-     * which might result in an overall TaskInProgress status update.
-     * @return has the task changed its state noticably?
-     */
-    synchronized boolean updateStatus(TaskStatus status) {
-        String taskid = status.getTaskId();
-        String diagInfo = status.getDiagnosticInfo();
-        TaskStatus oldStatus = (TaskStatus) taskStatuses.get(taskid);
-        boolean changed = true;
-        if (diagInfo != null && diagInfo.length() > 0) {
-          LOG.info("Error from "+taskid+": "+diagInfo);
-          List diagHistory = (List) taskDiagnosticData.get(taskid);
-          if (diagHistory == null) {
-              diagHistory = new ArrayList();
-              taskDiagnosticData.put(taskid, diagHistory);
-          }
-          diagHistory.add(diagInfo);
-        }
-        if (oldStatus != null) {
-          TaskStatus.State oldState = oldStatus.getRunState();
-          TaskStatus.State newState = status.getRunState();
+  /**
+   * Get the diagnostic messages for a given task within this tip.
+   * @param taskId the id of the required task
+   * @return the list of diagnostics for that task
+   */
+  synchronized List<String> getDiagnosticInfo(String taskId) {
+    return taskDiagnosticData.get(taskId);
+  }
+    
+  ////////////////////////////////////////////////
+  // Update methods, usually invoked by the owning
+  // job.
+  ////////////////////////////////////////////////
+  /**
+   * A status message from a client has arrived.
+   * It updates the status of a single component-thread-task,
+   * which might result in an overall TaskInProgress status update.
+   * @return has the task changed its state noticably?
+   */
+  synchronized boolean updateStatus(TaskStatus status) {
+    String taskid = status.getTaskId();
+    String diagInfo = status.getDiagnosticInfo();
+    TaskStatus oldStatus = (TaskStatus) taskStatuses.get(taskid);
+    boolean changed = true;
+    if (diagInfo != null && diagInfo.length() > 0) {
+      LOG.info("Error from "+taskid+": "+diagInfo);
+      List diagHistory = (List) taskDiagnosticData.get(taskid);
+      if (diagHistory == null) {
+        diagHistory = new ArrayList();
+        taskDiagnosticData.put(taskid, diagHistory);
+      }
+      diagHistory.add(diagInfo);
+    }
+    if (oldStatus != null) {
+      TaskStatus.State oldState = oldStatus.getRunState();
+      TaskStatus.State newState = status.getRunState();
           
-          // We should never recieve a duplicate success/failure/killed
-          // status update for the same taskid! This is a safety check, 
-          // and is addressed better at the TaskTracker to ensure this.
-          // @see {@link TaskTracker.transmitHeartbeat()}
-          if ((newState != TaskStatus.State.RUNNING) && 
-                  (oldState == newState)) {
-              LOG.warn("Recieved duplicate status update of '" + newState + 
-                      "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
-              return false;
-          }
+      // We should never recieve a duplicate success/failure/killed
+      // status update for the same taskid! This is a safety check, 
+      // and is addressed better at the TaskTracker to ensure this.
+      // @see {@link TaskTracker.transmitHeartbeat()}
+      if ((newState != TaskStatus.State.RUNNING) && 
+          (oldState == newState)) {
+        LOG.warn("Recieved duplicate status update of '" + newState + 
+                 "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
+        return false;
+      }
 
-          // The task is not allowed to move from completed back to running.
-          // We have seen out of order status messagesmoving tasks from complete
-          // to running. This is a spot fix, but it should be addressed more
-          // globally.
-          if (newState == TaskStatus.State.RUNNING &&
-              (oldState == TaskStatus.State.FAILED || 
-               oldState == TaskStatus.State.KILLED || 
-               oldState == TaskStatus.State.SUCCEEDED)) {
-            return false;
-          }
+      // The task is not allowed to move from completed back to running.
+      // We have seen out of order status messagesmoving tasks from complete
+      // to running. This is a spot fix, but it should be addressed more
+      // globally.
+      if (newState == TaskStatus.State.RUNNING &&
+          (oldState == TaskStatus.State.FAILED || 
+           oldState == TaskStatus.State.KILLED || 
+           oldState == TaskStatus.State.SUCCEEDED)) {
+        return false;
+      }
           
-          changed = oldState != newState;
-        }
-        
-        taskStatuses.put(taskid, status);
-
-        // Recompute progress
-        recomputeProgress();
-        return changed;
-    }
-
-    /**
-     * Indicate that one of the taskids in this TaskInProgress
-     * has failed.
-     */
-    public void failedSubTask(String taskid, String trackerName) {
-        //
-        // Note the failure and its location
-        //
-        LOG.info("Task '" + taskid + "' has been lost.");
-        TaskStatus status = taskStatuses.get(taskid);
-        if (status != null) {
-            status.setRunState(TaskStatus.State.FAILED);
-            // tasktracker went down and failed time was not reported. 
-            if( 0 == status.getFinishTime() ){
-              status.setFinishTime(System.currentTimeMillis());
-            }
-        }
-        this.activeTasks.remove(taskid);
-        if (this.completes > 0 && this.isMapTask()) {
-            this.completes--;
-        }
-
-        numTaskFailures++;
-        if (numTaskFailures >= MAX_TASK_FAILURES) {
-            LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times.");
-            kill();
-        }
-        machinesWhereFailed.add(trackerName);
+      changed = oldState != newState;
     }
+        
+    taskStatuses.put(taskid, status);
 
-    /**
-     * Indicate that one of the taskids in this TaskInProgress
-     * has successfully completed. 
-     * 
-     * However this may not be the first subtask in this 
-     * TaskInProgress to be completed and hence we might not want to 
-     * manipulate the TaskInProgress to note that it is 'complete' just-as-yet.
-     */
-    void completedTask(String taskid) {
-        LOG.info("Task '" + taskid + "' has completed.");
-        TaskStatus status = taskStatuses.get(taskid);
-        status.setRunState(TaskStatus.State.SUCCEEDED);
-        activeTasks.remove(taskid);
+    // Recompute progress
+    recomputeProgress();
+    return changed;
+  }
+
+  /**
+   * Indicate that one of the taskids in this TaskInProgress
+   * has failed.
+   */
+  public void failedSubTask(String taskid, String trackerName) {
+    //
+    // Note the failure and its location
+    //
+    LOG.info("Task '" + taskid + "' has been lost.");
+    TaskStatus status = taskStatuses.get(taskid);
+    if (status != null) {
+      status.setRunState(TaskStatus.State.FAILED);
+      // tasktracker went down and failed time was not reported. 
+      if( 0 == status.getFinishTime() ){
+        status.setFinishTime(System.currentTimeMillis());
+      }
     }
-    
-    /**
-     * Indicate that one of the taskids in this TaskInProgress
-     * has successfully completed!
-     */
-    public void completed(String taskid) {
-        //
-        // Record that this taskid is complete
-        //
-        completedTask(taskid);
+    this.activeTasks.remove(taskid);
+    if (this.completes > 0 && this.isMapTask()) {
+      this.completes--;
+    }
+
+    numTaskFailures++;
+    if (numTaskFailures >= MAX_TASK_FAILURES) {
+      LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times.");
+      kill();
+    }
+    machinesWhereFailed.add(trackerName);
+  }
+
+  /**
+   * Indicate that one of the taskids in this TaskInProgress
+   * has successfully completed. 
+   * 
+   * However this may not be the first subtask in this 
+   * TaskInProgress to be completed and hence we might not want to 
+   * manipulate the TaskInProgress to note that it is 'complete' just-as-yet.
+   */
+  void completedTask(String taskid) {
+    LOG.info("Task '" + taskid + "' has completed.");
+    TaskStatus status = taskStatuses.get(taskid);
+    status.setRunState(TaskStatus.State.SUCCEEDED);
+    activeTasks.remove(taskid);
+  }
+    
+  /**
+   * Indicate that one of the taskids in this TaskInProgress
+   * has successfully completed!
+   */
+  public void completed(String taskid) {
+    //
+    // Record that this taskid is complete
+    //
+    completedTask(taskid);
         
-        //
-        // Now that the TIP is complete, the other speculative 
-        // subtasks will be closed when the owning tasktracker 
-        // reports in and calls shouldClose() on this object.
-        //
-
-        this.completes++;
-        recomputeProgress();
-    }
-
-    /**
-     * Get the Status of the tasks managed by this TIP
-     */
-    public TaskStatus[] getTaskStatuses() {
-	    return taskStatuses.values().toArray(new TaskStatus[taskStatuses.size()]);
-    }
-
-    /**
-     * Get the status of the specified task
-     * @param taskid
-     * @return
-     */
-    public TaskStatus getTaskStatus(String taskid) {
-      return taskStatuses.get(taskid);
-    }
-     /**
-     * The TIP's been ordered kill()ed.
-     */
-    public void kill() {
-        if (isComplete() || failed) {
-            return;
-        }
-        this.failed = true;
-        killed = true;
-        recomputeProgress();
-    }
-
-    /**
-     * Was the task killed?
-     * @return true if the task killed
-     */
-    public boolean wasKilled() {
-      return killed;
-    }
-    
-    /**
-     * This method is called whenever there's a status change
-     * for one of the TIP's sub-tasks.  It recomputes the overall 
-     * progress for the TIP.  We examine all sub-tasks and find 
-     * the one that's most advanced (and non-failed).
-     */
-    void recomputeProgress() {
-        if (isComplete()) {
-            this.progress = 1;
-            this.execFinishTime = System.currentTimeMillis();
-        } else if (failed) {
-            this.progress = 0;
-            this.execFinishTime = System.currentTimeMillis();
-        } else {
-            double bestProgress = 0;
-            String bestState = "";
-            Counters bestCounters = new Counters();
-            for (Iterator it = taskStatuses.keySet().iterator(); it.hasNext(); ) {
-                String taskid = (String) it.next();
-                TaskStatus status = taskStatuses.get(taskid);
-                if (status.getRunState() == TaskStatus.State.SUCCEEDED) {
-                    bestProgress = 1;
-                    bestState = status.getStateString();
-                    bestCounters = status.getCounters();
-                    break;
-                } else if (status.getRunState() == TaskStatus.State.RUNNING) {
-                  if (status.getProgress() >= bestProgress) {
-                    bestProgress = status.getProgress();
-                    bestState = status.getStateString();
-                    bestCounters = status.getCounters();
-                  }
-                }
-            }
-            this.progress = bestProgress;
-            this.state = bestState;
-            this.counters = bestCounters;
+    //
+    // Now that the TIP is complete, the other speculative 
+    // subtasks will be closed when the owning tasktracker 
+    // reports in and calls shouldClose() on this object.
+    //
+
+    this.completes++;
+    recomputeProgress();
+  }
+
+  /**
+   * Get the Status of the tasks managed by this TIP
+   */
+  public TaskStatus[] getTaskStatuses() {
+    return taskStatuses.values().toArray(new TaskStatus[taskStatuses.size()]);
+  }
+
+  /**
+   * Get the status of the specified task
+   * @param taskid
+   * @return
+   */
+  public TaskStatus getTaskStatus(String taskid) {
+    return taskStatuses.get(taskid);
+  }
+  /**
+   * The TIP's been ordered kill()ed.
+   */
+  public void kill() {
+    if (isComplete() || failed) {
+      return;
+    }
+    this.failed = true;
+    killed = true;
+    recomputeProgress();
+  }
+
+  /**
+   * Was the task killed?
+   * @return true if the task killed
+   */
+  public boolean wasKilled() {
+    return killed;
+  }
+    
+  /**
+   * This method is called whenever there's a status change
+   * for one of the TIP's sub-tasks.  It recomputes the overall 
+   * progress for the TIP.  We examine all sub-tasks and find 
+   * the one that's most advanced (and non-failed).
+   */
+  void recomputeProgress() {
+    if (isComplete()) {
+      this.progress = 1;
+      this.execFinishTime = System.currentTimeMillis();
+    } else if (failed) {
+      this.progress = 0;
+      this.execFinishTime = System.currentTimeMillis();
+    } else {
+      double bestProgress = 0;
+      String bestState = "";
+      Counters bestCounters = new Counters();
+      for (Iterator it = taskStatuses.keySet().iterator(); it.hasNext(); ) {
+        String taskid = (String) it.next();
+        TaskStatus status = taskStatuses.get(taskid);
+        if (status.getRunState() == TaskStatus.State.SUCCEEDED) {
+          bestProgress = 1;
+          bestState = status.getStateString();
+          bestCounters = status.getCounters();
+          break;
+        } else if (status.getRunState() == TaskStatus.State.RUNNING) {
+          if (status.getProgress() >= bestProgress) {
+            bestProgress = status.getProgress();
+            bestState = status.getStateString();
+            bestCounters = status.getCounters();
+          }
         }
-    }
-
-    /////////////////////////////////////////////////
-    // "Action" methods that actually require the TIP
-    // to do something.
-    /////////////////////////////////////////////////
-
-    /**
-     * Return whether this TIP still needs to run
-     */
-    boolean isRunnable() {
-      return !failed && (completes == 0);
-    }
-    
-    /**
-     * Return whether the TIP has a speculative task to run.  We
-     * only launch a speculative task if the current TIP is really
-     * far behind, and has been behind for a non-trivial amount of 
-     * time.
-     */
-    boolean hasSpeculativeTask(double averageProgress) {
-        //
-        // REMIND - mjc - these constants should be examined
-        // in more depth eventually...
-        //
-      
-      if( activeTasks.size() <= MAX_TASK_EXECS &&
-            runSpeculative &&
-            (averageProgress - progress >= SPECULATIVE_GAP) &&
-            (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG) 
-            && completes == 0) {
-          return true;
       }
-        return false;
+      this.progress = bestProgress;
+      this.state = bestState;
+      this.counters = bestCounters;
+    }
+  }
+
+  /////////////////////////////////////////////////
+  // "Action" methods that actually require the TIP
+  // to do something.
+  /////////////////////////////////////////////////
+
+  /**
+   * Return whether this TIP still needs to run
+   */
+  boolean isRunnable() {
+    return !failed && (completes == 0);
+  }
+    
+  /**
+   * Return whether the TIP has a speculative task to run.  We
+   * only launch a speculative task if the current TIP is really
+   * far behind, and has been behind for a non-trivial amount of 
+   * time.
+   */
+  boolean hasSpeculativeTask(double averageProgress) {
+    //
+    // REMIND - mjc - these constants should be examined
+    // in more depth eventually...
+    //
+      
+    if( activeTasks.size() <= MAX_TASK_EXECS &&
+        runSpeculative &&
+        (averageProgress - progress >= SPECULATIVE_GAP) &&
+        (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG) 
+        && completes == 0) {
+      return true;
+    }
+    return false;
+  }
+    
+  /**
+   * Return a Task that can be sent to a TaskTracker for execution.
+   */
+  public Task getTaskToRun(String taskTracker) throws IOException {
+    Task t = null;
+    if( 0 == execStartTime ){
+      // assume task starts running now
+      execStartTime = System.currentTimeMillis();
+    }
+
+    // Create the 'taskid'
+    String taskid = null;
+    if (nextTaskId < (MAX_TASK_EXECS + MAX_TASK_FAILURES)) {
+      taskid = new String("task_" + taskIdPrefix + "_" + nextTaskId);
+      ++nextTaskId;
+    } else {
+      LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + MAX_TASK_FAILURES) + 
+               " attempts for the tip '" + getTIPId() + "'");
+      return null;
     }
-    
-    /**
-     * Return a Task that can be sent to a TaskTracker for execution.
-     */
-    public Task getTaskToRun(String taskTracker) throws IOException {
-        Task t = null;
-        if( 0 == execStartTime ){
-          // assume task starts running now
-          execStartTime = System.currentTimeMillis();
-        }
-
-        // Create the 'taskid'
-        String taskid = null;
-        if (nextTaskId < (MAX_TASK_EXECS + MAX_TASK_FAILURES)) {
-          taskid = new String("task_" + taskIdPrefix + "_" + nextTaskId);
-          ++nextTaskId;
-        } else {
-          LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + MAX_TASK_FAILURES) + 
-                  " attempts for the tip '" + getTIPId() + "'");
-          return null;
-        }
         
-        String jobId = job.getProfile().getJobId();
-
-        if (isMapTask()) {
-          t = new MapTask(jobId, jobFile, this.id, taskid, partition, 
-                          splitClass, split);
-        } else {
-          t = new ReduceTask(jobId, jobFile, this.id, taskid, partition, numMaps);
-        }
-        t.setConf(conf);
+    String jobId = job.getProfile().getJobId();
 
-        activeTasks.put(taskid, taskTracker);
-
-        // Ask JobTracker to note that the task exists
-        jobtracker.createTaskEntry(taskid, taskTracker, this);
-        return t;
-    }
-    
-    /**
-     * Has this task already failed on this machine?
-     * @param tracker The task tracker name
-     * @return Has it failed?
-     */
-    public boolean hasFailedOnMachine(String tracker) {
-      return machinesWhereFailed.contains(tracker);
-    }
-    
-    /**
-     * Was this task ever scheduled to run on this machine?
-     * @param tracker The task tracker name
-     * @return Was task scheduled on the tracker?
-     */
-    public boolean hasRunOnMachine(String tracker){
-      return this.activeTasks.values().contains(tracker) || 
-               hasFailedOnMachine(tracker) ;
-    }
-    /**
-     * Get the number of machines where this task has failed.
-     * @return the size of the failed machine set
-     */
-    public int getNumberOfFailedMachines() {
-      return machinesWhereFailed.size();
-    }
-    
-    /**
-     * Get the id of this map or reduce task.
-     * @return The index of this tip in the maps/reduces lists.
-     */
-    public int getIdWithinJob() {
-      return partition;
-    }
-    
-    /**
-     * Set the event number that was raised for this tip
-     */
-    public void setSuccessEventNumber(int eventNumber) {
-      successEventNumber = eventNumber;
-    }
+    if (isMapTask()) {
+      t = new MapTask(jobId, jobFile, this.id, taskid, partition, 
+                      splitClass, split);
+    } else {
+      t = new ReduceTask(jobId, jobFile, this.id, taskid, partition, numMaps);
+    }
+    t.setConf(conf);
+
+    activeTasks.put(taskid, taskTracker);
+
+    // Ask JobTracker to note that the task exists
+    jobtracker.createTaskEntry(taskid, taskTracker, this);
+    return t;
+  }
+    
+  /**
+   * Has this task already failed on this machine?
+   * @param tracker The task tracker name
+   * @return Has it failed?
+   */
+  public boolean hasFailedOnMachine(String tracker) {
+    return machinesWhereFailed.contains(tracker);
+  }
+    
+  /**
+   * Was this task ever scheduled to run on this machine?
+   * @param tracker The task tracker name
+   * @return Was task scheduled on the tracker?
+   */
+  public boolean hasRunOnMachine(String tracker){
+    return this.activeTasks.values().contains(tracker) || 
+      hasFailedOnMachine(tracker) ;
+  }
+  /**
+   * Get the number of machines where this task has failed.
+   * @return the size of the failed machine set
+   */
+  public int getNumberOfFailedMachines() {
+    return machinesWhereFailed.size();
+  }
+    
+  /**
+   * Get the id of this map or reduce task.
+   * @return The index of this tip in the maps/reduces lists.
+   */
+  public int getIdWithinJob() {
+    return partition;
+  }
+    
+  /**
+   * Set the event number that was raised for this tip
+   */
+  public void setSuccessEventNumber(int eventNumber) {
+    successEventNumber = eventNumber;
+  }
        
-    /**
-     * Get the event number that was raised for this tip
-     */
-    public int getSuccessEventNumber() {
-      return successEventNumber;
-    }
+  /**
+   * Get the event number that was raised for this tip
+   */
+  public int getSuccessEventNumber() {
+    return successEventNumber;
+  }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Mon Apr 16 14:44:35 2007
@@ -110,7 +110,7 @@
      * @param filter the {@link LogFilter} to apply on userlogs.
      */
     Writer(String taskId, LogFilter filter, 
-            int noKeepSplits, long totalLogSize, boolean purgeLogSplits, int logsRetainHours) {
+           int noKeepSplits, long totalLogSize, boolean purgeLogSplits, int logsRetainHours) {
       this.taskId = taskId;
       this.filter = filter;
       
@@ -164,10 +164,10 @@
         // Purge logs of tasks on this tasktracker if their  
         // mtime has exceeded "mapred.task.log.retain" hours
         long purgeTimeStamp = System.currentTimeMillis() - 
-                              (logsRetainHours*60*60*1000);
+          (logsRetainHours*60*60*1000);
         File[] oldTaskLogs = LOG_DIR.listFiles(
-                                new TaskLogsPurgeFilter(purgeTimeStamp)
-                              );
+                                               new TaskLogsPurgeFilter(purgeTimeStamp)
+                                               );
         if (oldTaskLogs != null) {
           for (int i=0; i < oldTaskLogs.length; ++i) {
             deleteDir(oldTaskLogs[i]);
@@ -182,8 +182,8 @@
         
         // Create the split index
         splitIndex = new BufferedOutputStream(
-            new FileOutputStream(new File(taskLogDir, SPLIT_INDEX_NAME))
-            );
+                                              new FileOutputStream(new File(taskLogDir, SPLIT_INDEX_NAME))
+                                              );
 
         out = createLogSplit(noSplits);
         initialized = true;
@@ -199,11 +199,11 @@
      * @throws IOException
      */
     public synchronized void write(byte[] b, int off, int len) 
-    throws IOException {
+      throws IOException {
       // Check if we need to rotate the log
       if (splitLength > splitFileSize) {
         LOG.debug("Total no. of bytes written to split#" + noSplits + 
-            " -> " + splitLength);
+                  " -> " + splitLength);
         logRotate();
       }
       
@@ -238,7 +238,7 @@
     }
 
     private synchronized OutputStream createLogSplit(int split) 
-    throws IOException {
+      throws IOException {
       currentSplit =  getLogSplit(split);
       LOG.debug("About to create the split: " + currentSplit);
       return new BufferedOutputStream(new FileOutputStream(currentSplit));
@@ -246,7 +246,7 @@
     
     private synchronized void writeIndexRecord() throws IOException {
       String indexRecord = new String(currentSplit + "|" + 
-          splitOffset + "|" + splitLength + "\n");
+                                      splitOffset + "|" + splitLength + "\n");
       splitIndex.write(indexRecord.getBytes());
       splitIndex.flush();
     }
@@ -273,7 +273,7 @@
           File purgeLogSplit = getLogSplit((noSplits-noKeepSplits));
           purgeLogSplit.delete();
           LOG.debug("Purged log-split #" + (noSplits-noKeepSplits) + " - " + 
-              purgeLogSplit);
+                    purgeLogSplit);
         }
       }
       
@@ -327,8 +327,8 @@
     
     private synchronized void init() throws IOException {
       this.splitIndex = new BufferedReader(new InputStreamReader(
-                          new FileInputStream(new File(taskLogDir, 
-                                  SPLIT_INDEX_NAME))));
+                                                                 new FileInputStream(new File(taskLogDir, 
+                                                                                              SPLIT_INDEX_NAME))));
 
       // Parse the split-index and store the offsets/lengths
       ArrayList<IndexRecord> records = new ArrayList<IndexRecord>();
@@ -337,16 +337,16 @@
         String[] fields = line.split("\\|");
         if (fields.length != 3) {
           throw new IOException("Malformed split-index with " + 
-              fields.length + " fields");
+                                fields.length + " fields");
         }
         
         IndexRecord record = new IndexRecord(
-                                fields[0], 
-                                Long.valueOf(fields[1]).longValue(), 
-                                Long.valueOf(fields[2]).longValue()
-                              );
+                                             fields[0], 
+                                             Long.valueOf(fields[1]).longValue(), 
+                                             Long.valueOf(fields[2]).longValue()
+                                             );
         LOG.debug("Split: <" + record.splitName + ", " + record.splitOffset + 
-            ", " + record.splitLength + ">");
+                  ", " + record.splitLength + ">");
         
         // Save 
         records.add(record);
@@ -398,7 +398,7 @@
         }
       }
       LOG.debug("Total log-size on disk: " + totalLogSize + 
-          "; actual log-size: " + logFileSize);
+                "; actual log-size: " + logFileSize);
 
       // Copy log data into buffer
       byte[] b = new byte[totalLogSize];
@@ -434,8 +434,8 @@
      * @throws IOException
      */
     public synchronized int tail(byte[] b, int off, int len, 
-        long tailSize, int tailWindow) 
-    throws IOException {
+                                 long tailSize, int tailWindow) 
+      throws IOException {
       if (!initialized) {
         init();
       }
@@ -448,7 +448,7 @@
       }
       
       return read(b, off, len, 
-          (long)(logFileSize-(tailSize*tailWindow)), tailSize);
+                  (long)(logFileSize-(tailSize*tailWindow)), tailSize);
     }
 
     /**
@@ -464,8 +464,8 @@
      * @throws IOException
      */
     public synchronized int read(byte[] b, int off, int len, 
-        long logOffset, long logLength) 
-    throws IOException {
+                                 long logOffset, long logLength) 
+      throws IOException {
       LOG.debug("TaskLog.Reader.read: logOffset: " + logOffset + " - logLength: " + logLength);
 
       // Sanity check
@@ -484,7 +484,7 @@
       boolean inRange = false;
       for (int i=0; i < indexRecords.length; ++i) {
         LOG.debug("offset: " + offset + " - (split, splitOffset) : (" + 
-            i + ", " + indexRecords[i].splitOffset + ")");
+                  i + ", " + indexRecords[i].splitOffset + ")");
         
         if (offset <= indexRecords[i].splitOffset) {
           if (!inRange) {
@@ -533,7 +533,7 @@
         long skipBytes = 
           in.skip(logOffset - indexRecords[startIndex].splitOffset);
         LOG.debug("Skipped " + skipBytes + " bytes from " + 
-            startIndex + " stream");
+                  startIndex + " stream");
       }
       int bytesRead = 0, totalBytesRead = 0;
       len = Math.min((int)logLength, len);
@@ -549,7 +549,7 @@
     }
 
     private synchronized InputStream getLogSplit(int split) 
-    throws IOException {
+      throws IOException {
       String splitName = indexRecords[split].splitName;
       LOG.debug("About to open the split: " + splitName);
       InputStream in = null;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Mon Apr 16 14:44:35 2007
@@ -50,16 +50,16 @@
     this.conf = conf;
     this.taskStdOutLogWriter = 
       new TaskLog.Writer(t.getTaskId(), TaskLog.LogFilter.STDOUT, 
-              this.conf.getInt("mapred.userlog.num.splits", 4), 
-              this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024, 
-              this.conf.getBoolean("mapred.userlog.purgesplits", true),
-              this.conf.getInt("mapred.userlog.retain.hours", 12));
+                         this.conf.getInt("mapred.userlog.num.splits", 4), 
+                         this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024, 
+                         this.conf.getBoolean("mapred.userlog.purgesplits", true),
+                         this.conf.getInt("mapred.userlog.retain.hours", 12));
     this.taskStdErrLogWriter = 
       new TaskLog.Writer(t.getTaskId(), TaskLog.LogFilter.STDERR, 
-              this.conf.getInt("mapred.userlog.num.splits", 4), 
-              this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024, 
-              this.conf.getBoolean("mapred.userlog.purgesplits", true),
-              this.conf.getInt("mapred.userlog.retain.hours", 12));
+                         this.conf.getInt("mapred.userlog.num.splits", 4), 
+                         this.conf.getInt("mapred.userlog.limit.kb", 100) * 1024, 
+                         this.conf.getBoolean("mapred.userlog.purgesplits", true),
+                         this.conf.getInt("mapred.userlog.retain.hours", 12));
   }
 
   public Task getTask() { return t; }
@@ -75,13 +75,13 @@
   }
 
   /** Called when this task's output is no longer needed.
-  * This method is run in the parent process after the child exits.  It should
-  * not execute user code, only system code.
-  */
+   * This method is run in the parent process after the child exits.  It should
+   * not execute user code, only system code.
+   */
   public void close() throws IOException {}
 
   private String stringifyPathArray(Path[] p){
-	  if (p == null){
+    if (p == null){
       return null;
     }
     String str = p[0].toString();
@@ -106,7 +106,7 @@
           Path[] p = new Path[archives.length];
           for (int i = 0; i < archives.length;i++){
             p[i] = DistributedCache.getLocalCache(archives[i], conf, 
-                conf.getLocalPath(TaskTracker.getCacheSubdir()), true, md5[i], new Path(workDir.getAbsolutePath()));
+                                                  conf.getLocalPath(TaskTracker.getCacheSubdir()), true, md5[i], new Path(workDir.getAbsolutePath()));
           }
           DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
         }
@@ -114,8 +114,8 @@
           String[] md5 = DistributedCache.getFileMd5(conf);
           Path[] p = new Path[files.length];
           for (int i = 0; i < files.length;i++){
-           p[i] = DistributedCache.getLocalCache(files[i], conf, conf.getLocalPath(TaskTracker
-              .getCacheSubdir()), false, md5[i], new Path(workDir.getAbsolutePath()));
+            p[i] = DistributedCache.getLocalCache(files[i], conf, conf.getLocalPath(TaskTracker
+                                                                                    .getCacheSubdir()), false, md5[i], new Path(workDir.getAbsolutePath()));
           }
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
         }
@@ -134,7 +134,7 @@
       // workingdir for streaming
       try{
         DistributedCache.createAllSymlink(conf, jobCacheDir, 
-            workDir);
+                                          workDir);
       } catch(IOException ie){
         // Do not exit even if symlinks have not been created.
         LOG.warn(StringUtils.stringifyException(ie));
@@ -157,7 +157,7 @@
 	  
       String jar = conf.getJar();
       if (jar != null) {       
-    	  // if jar exists, it into workDir
+        // if jar exists, it into workDir
         File[] libs = new File(jobCacheDir, "lib").listFiles();
         if (libs != null) {
           for (int i = 0; i < libs.length; i++) {
@@ -172,43 +172,43 @@
        
       }
 
-  		// include the user specified classpath
+      // include the user specified classpath
   		
-  		//archive paths
-  		Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
-  		if (archiveClasspaths != null && archives != null) {
-  			Path[] localArchives = DistributedCache
-  					.getLocalCacheArchives(conf);
-  			if (localArchives != null){
-  				for (int i=0;i<archives.length;i++){
-  					for(int j=0;j<archiveClasspaths.length;j++){
-  						if(archives[i].getPath().equals(
-  								archiveClasspaths[j].toString())){
-  							classPath.append(sep);
-  							classPath.append(localArchives[i]
-  									.toString());
-  						}
-  					}
-  				}
-  			}
-  		}
-  		//file paths
-  		Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
-  		if(fileClasspaths!=null && files != null) {
-  			Path[] localFiles = DistributedCache
-  					.getLocalCacheFiles(conf);
-  			if (localFiles != null) {
-  				for (int i = 0; i < files.length; i++) {
-  					for (int j = 0; j < fileClasspaths.length; j++) {
-  						if (files[i].getPath().equals(
-  								fileClasspaths[j].toString())) {
-  							classPath.append(sep);
-  							classPath.append(localFiles[i].toString());
-  						}
-  					}
-  				}
-  			}
-  		}
+      //archive paths
+      Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
+      if (archiveClasspaths != null && archives != null) {
+        Path[] localArchives = DistributedCache
+          .getLocalCacheArchives(conf);
+        if (localArchives != null){
+          for (int i=0;i<archives.length;i++){
+            for(int j=0;j<archiveClasspaths.length;j++){
+              if(archives[i].getPath().equals(
+                                              archiveClasspaths[j].toString())){
+                classPath.append(sep);
+                classPath.append(localArchives[i]
+                                 .toString());
+              }
+            }
+          }
+        }
+      }
+      //file paths
+      Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
+      if(fileClasspaths!=null && files != null) {
+        Path[] localFiles = DistributedCache
+          .getLocalCacheFiles(conf);
+        if (localFiles != null) {
+          for (int i = 0; i < files.length; i++) {
+            for (int j = 0; j < fileClasspaths.length; j++) {
+              if (files[i].getPath().equals(
+                                            fileClasspaths[j].toString())) {
+                classPath.append(sep);
+                classPath.append(localFiles[i].toString());
+              }
+            }
+          }
+        }
+      }
 
       classPath.append(sep);
       classPath.append(workDir);
@@ -237,48 +237,48 @@
       //
       //     <name>mapred.child.optional.jvm.args</name>
       //     <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \
-      //     -Dcom.sun.management.jmxremote.authenticate=false \
-      //     -Dcom.sun.management.jmxremote.ssl=false \
-      //     -Dcom.sun.management.jmxremote.port=@port@
-      //     </value>
-      //
-      String javaOpts = handleDeprecatedHeapSize(
-          conf.get("mapred.child.java.opts", "-Xmx200m"),
-          conf.get("mapred.child.heap.size"));
-      javaOpts = replaceAll(javaOpts, "@taskid@", t.getTaskId());
-      int port = conf.getInt("mapred.task.tracker.report.port", 50050) + 1;
-      javaOpts = replaceAll(javaOpts, "@port@", Integer.toString(port));
-      String [] javaOptsSplit = javaOpts.split(" ");
-      for (int i = 0; i < javaOptsSplit.length; i++) {
-         vargs.add(javaOptsSplit[i]);
-      }
-
-      // Add classpath.
-      vargs.add("-classpath");
-      vargs.add(classPath.toString());
-
-      // Setup the log4j prop
-      vargs.add("-Dhadoop.log.dir=" + System.getProperty("hadoop.log.dir"));
-      vargs.add("-Dhadoop.root.logger=INFO,TLA");
-      vargs.add("-Dhadoop.tasklog.taskid=" + t.getTaskId());
-      vargs.add("-Dhadoop.tasklog.noKeepSplits=" + conf.getInt("mapred.userlog.num.splits", 4)); 
-      vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + (conf.getInt("mapred.userlog.limit.kb", 100) * 1024));
-      vargs.add("-Dhadoop.tasklog.purgeLogSplits=" + conf.getBoolean("mapred.userlog.purgesplits", true));
-      vargs.add("-Dhadoop.tasklog.logsRetainHours=" + conf.getInt("mapred.userlog.retain.hours", 12)); 
-
-      // Add java.library.path; necessary for native-hadoop libraries
-      String libraryPath = System.getProperty("java.library.path");
-      if (libraryPath != null) {
+        //     -Dcom.sun.management.jmxremote.authenticate=false \
+        //     -Dcom.sun.management.jmxremote.ssl=false \
+        //     -Dcom.sun.management.jmxremote.port=@port@
+        //     </value>
+        //
+        String javaOpts = handleDeprecatedHeapSize(
+                                                   conf.get("mapred.child.java.opts", "-Xmx200m"),
+                                                   conf.get("mapred.child.heap.size"));
+        javaOpts = replaceAll(javaOpts, "@taskid@", t.getTaskId());
+        int port = conf.getInt("mapred.task.tracker.report.port", 50050) + 1;
+        javaOpts = replaceAll(javaOpts, "@port@", Integer.toString(port));
+        String [] javaOptsSplit = javaOpts.split(" ");
+        for (int i = 0; i < javaOptsSplit.length; i++) {
+          vargs.add(javaOptsSplit[i]);
+        }
+
+        // Add classpath.
+        vargs.add("-classpath");
+        vargs.add(classPath.toString());
+
+        // Setup the log4j prop
+        vargs.add("-Dhadoop.log.dir=" + System.getProperty("hadoop.log.dir"));
+        vargs.add("-Dhadoop.root.logger=INFO,TLA");
+        vargs.add("-Dhadoop.tasklog.taskid=" + t.getTaskId());
+        vargs.add("-Dhadoop.tasklog.noKeepSplits=" + conf.getInt("mapred.userlog.num.splits", 4)); 
+        vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + (conf.getInt("mapred.userlog.limit.kb", 100) * 1024));
+        vargs.add("-Dhadoop.tasklog.purgeLogSplits=" + conf.getBoolean("mapred.userlog.purgesplits", true));
+        vargs.add("-Dhadoop.tasklog.logsRetainHours=" + conf.getInt("mapred.userlog.retain.hours", 12)); 
+
+        // Add java.library.path; necessary for native-hadoop libraries
+        String libraryPath = System.getProperty("java.library.path");
+        if (libraryPath != null) {
           vargs.add("-Djava.library.path=" + libraryPath);
-      }
+        }
 
-      // Add main class and its arguments 
-      vargs.add(TaskTracker.Child.class.getName());  // main of Child
-      vargs.add(tracker.taskReportPort + "");        // pass umbilical port
-      vargs.add(t.getTaskId());                      // pass task identifier
+        // Add main class and its arguments 
+        vargs.add(TaskTracker.Child.class.getName());  // main of Child
+        vargs.add(tracker.taskReportPort + "");        // pass umbilical port
+        vargs.add(t.getTaskId());                      // pass task identifier
 
-      // Run java
-      runChild((String[])vargs.toArray(new String[0]), workDir);
+        // Run java
+        runChild((String[])vargs.toArray(new String[0]), workDir);
     } catch (FSError e) {
       LOG.fatal("FSError", e);
       try {
@@ -327,22 +327,22 @@
    * interpolated if present.
    */
   private String handleDeprecatedHeapSize(String javaOpts,
-          final String heapSize) {
+                                          final String heapSize) {
     if (heapSize == null || heapSize.length() <= 0) {
-        return javaOpts;
+      return javaOpts;
     }
     final String MX = "-Xmx";
     int index = javaOpts.indexOf(MX);
     if (index < 0) {
-        javaOpts = javaOpts + " " + MX + heapSize;
+      javaOpts = javaOpts + " " + MX + heapSize;
     } else {
-        int end = javaOpts.indexOf(" ", index + MX.length());
-        javaOpts = javaOpts.substring(0, index + MX.length()) +
-            heapSize + ((end < 0)? "": javaOpts.substring(end));
+      int end = javaOpts.indexOf(" ", index + MX.length());
+      javaOpts = javaOpts.substring(0, index + MX.length()) +
+        heapSize + ((end < 0)? "": javaOpts.substring(end));
     }
     LOG.warn("mapred.child.heap.size is deprecated. Use " +
-        "mapred.child.java.opt instead. Meantime, mapred.child.heap.size " +
-        "is interpolated into mapred.child.java.opt: " + javaOpts);
+             "mapred.child.java.opt instead. Meantime, mapred.child.heap.size " +
+             "is interpolated into mapred.child.java.opt: " + javaOpts);
     return javaOpts;
   }
 
@@ -360,17 +360,17 @@
    * found in <code>text<code>).
    */
   private static String replaceAll(String text, final String toFind,
-      final String replacement) {
+                                   final String replacement) {
     if (text ==  null || toFind ==  null || replacement ==  null) {
       throw new IllegalArgumentException("Text " + text + " or toFind " +
-        toFind + " or replacement " + replacement + " are null.");
+                                         toFind + " or replacement " + replacement + " are null.");
     }
     int offset = 0;
     for (int index = text.indexOf(toFind); index >= 0;
-          index = text.indexOf(toFind, offset)) {
+         index = text.indexOf(toFind, offset)) {
       offset = index + toFind.length();
       text = text.substring(0, index) + replacement +
-          text.substring(offset);
+        text.substring(offset);
         
     }
     return text;
@@ -412,10 +412,10 @@
    * Kill the child process
    */
   public void kill() {
-      if (process != null) {
-          process.destroy();
-      }
-      killed = true;
+    if (process != null) {
+      process.destroy();
+    }
+    killed = true;
   }
 
   /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Mon Apr 16 14:44:35 2007
@@ -31,207 +31,207 @@
  * @author Mike Cafarella
  **************************************************/
 class TaskStatus implements Writable {
-    //enumeration for reporting current phase of a task. 
-    public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE}
+  //enumeration for reporting current phase of a task. 
+  public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE}
 
-    // what state is the task in?
-    public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED}
+  // what state is the task in?
+  public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED}
     
-    private String taskid;
-    private boolean isMap;
-    private float progress;
-    private State runState;
-    private String diagnosticInfo;
-    private String stateString;
-    private String taskTracker;
-    
-    private long startTime ; 
-    private long finishTime ; 
-    
-    // only for reduce tasks
-    private long shuffleFinishTime ; 
-    private long sortFinishTime ; 
-    
-    private Phase phase = Phase.STARTING; 
-    private Counters counters;
-
-    public TaskStatus() {}
-
-    public TaskStatus(String taskid, boolean isMap, float progress,
-                      State runState, String diagnosticInfo,
-                      String stateString, String taskTracker,
-                      Phase phase, Counters counters) {
-        this.taskid = taskid;
-        this.isMap = isMap;
-        this.progress = progress;
-        this.runState = runState;
-        this.diagnosticInfo = diagnosticInfo;
-        this.stateString = stateString;
-        this.taskTracker = taskTracker;
-        this.phase = phase ;
-        this.counters = counters;
-    }
-    
-    public String getTaskId() { return taskid; }
-    public boolean getIsMap() { return isMap; }
-    public float getProgress() { return progress; }
-    public void setProgress(float progress) { this.progress = progress; } 
-    public State getRunState() { return runState; }
-    public String getTaskTracker() {return taskTracker;}
-    public void setTaskTracker(String tracker) { this.taskTracker = tracker;}
-    public void setRunState(State runState) { this.runState = runState; }
-    public String getDiagnosticInfo() { return diagnosticInfo; }
-    public void setDiagnosticInfo(String info) { this.diagnosticInfo = info; }
-    public String getStateString() { return stateString; }
-    public void setStateString(String stateString) { this.stateString = stateString; }
-    /**
-     * Get task finish time. if shuffleFinishTime and sortFinishTime 
-     * are not set before, these are set to finishTime. It takes care of 
-     * the case when shuffle, sort and finish are completed with in the 
-     * heartbeat interval and are not reported separately. if task state is 
-     * TaskStatus.FAILED then finish time represents when the task failed.
-     * @return finish time of the task. 
-     */
-    public long getFinishTime() {
-      return finishTime;
-    }
-
-    /**
-     * Sets finishTime. 
-     * @param finishTime finish time of task.
-     */
-    void setFinishTime(long finishTime) {
-      if( shuffleFinishTime == 0 ) {
-        this.shuffleFinishTime = finishTime ; 
-      }
-      if( sortFinishTime == 0 ){
-        this.sortFinishTime = finishTime ;
-      }
-      this.finishTime = finishTime;
-    }
-    /**
-     * Get shuffle finish time for the task. If shuffle finish time was 
-     * not set due to shuffle/sort/finish phases ending within same
-     * heartbeat interval, it is set to finish time of next phase i.e. sort 
-     * or task finish when these are set.  
-     * @return 0 if shuffleFinishTime, sortFinishTime and finish time are not set. else 
-     * it returns approximate shuffle finish time.  
-     */
-    public long getShuffleFinishTime() {
-      return shuffleFinishTime;
-    }
-
-    /**
-     * Set shuffle finish time. 
-     * @param shuffleFinishTime 
-     */
-    void setShuffleFinishTime(long shuffleFinishTime) {
-      this.shuffleFinishTime = shuffleFinishTime;
-    }
-
-    /**
-     * Get sort finish time for the task,. If sort finish time was not set 
-     * due to sort and reduce phase finishing in same heartebat interval, it is 
-     * set to finish time, when finish time is set. 
-     * @return 0 if sort finish time and finish time are not set, else returns sort
-     * finish time if that is set, else it returns finish time. 
-     */
-    public long getSortFinishTime() {
-      return sortFinishTime;
-    }
-
-    /**
-     * Sets sortFinishTime, if shuffleFinishTime is not set before 
-     * then its set to sortFinishTime.  
-     * @param sortFinishTime
-     */
-    void setSortFinishTime(long sortFinishTime) {
-      this.sortFinishTime = sortFinishTime;
-      if( 0 == this.shuffleFinishTime){
-        this.shuffleFinishTime = sortFinishTime ;
-      }
-    }
-
-    /**
-     * Get start time of the task. 
-     * @return 0 is start time is not set, else returns start time. 
-     */
-    public long getStartTime() {
-      return startTime;
-    }
-
-    /**
-     * Set startTime of the task.
-     * @param startTime start time
-     */
-    void setStartTime(long startTime) {
-      this.startTime = startTime;
-    }
-    /**
-     * Get current phase of this task. Phase.Map in case of map tasks, 
-     * for reduce one of Phase.SHUFFLE, Phase.SORT or Phase.REDUCE. 
-     * @return . 
-     */
-    public Phase getPhase(){
-      return this.phase; 
-    }
-    /**
-     * Set current phase of this task.  
-     * @param p
-     */
-    void setPhase(Phase p){
-      this.phase = p ; 
-    }
-    /**
-     * Get task's counters.
-     */
-    public Counters getCounters() {
-      return counters;
-    }
-    /**
-     * Set the task's counters.
-     * @param counters
-     */
-    public void setCounters(Counters counters) {
-      this.counters = counters;
-    }
-    
-    //////////////////////////////////////////////
-    // Writable
-    //////////////////////////////////////////////
-    public void write(DataOutput out) throws IOException {
-        UTF8.writeString(out, taskid);
-        out.writeBoolean(isMap);
-        out.writeFloat(progress);
-        WritableUtils.writeEnum(out, runState);
-        UTF8.writeString(out, diagnosticInfo);
-        UTF8.writeString(out, stateString);
-        WritableUtils.writeEnum(out, phase);
-        out.writeLong(startTime);
-        out.writeLong(finishTime);
-        if(! isMap){
-          out.writeLong(shuffleFinishTime);
-          out.writeLong(sortFinishTime);
-        }
-        counters.write(out);
-    }
-
-    public void readFields(DataInput in) throws IOException {
-        this.taskid = UTF8.readString(in);
-        this.isMap = in.readBoolean();
-        this.progress = in.readFloat();
-        this.runState = WritableUtils.readEnum(in, State.class);
-        this.diagnosticInfo = UTF8.readString(in);
-        this.stateString = UTF8.readString(in);
-        this.phase = WritableUtils.readEnum(in, Phase.class); 
-        this.startTime = in.readLong(); 
-        this.finishTime = in.readLong() ; 
-        if( ! this.isMap ){
-          shuffleFinishTime = in.readLong(); 
-          sortFinishTime = in.readLong(); 
-        }
-        counters = new Counters();
-        counters.readFields(in);
-     }
+  private String taskid;
+  private boolean isMap;
+  private float progress;
+  private State runState;
+  private String diagnosticInfo;
+  private String stateString;
+  private String taskTracker;
+    
+  private long startTime ; 
+  private long finishTime ; 
+    
+  // only for reduce tasks
+  private long shuffleFinishTime ; 
+  private long sortFinishTime ; 
+    
+  private Phase phase = Phase.STARTING; 
+  private Counters counters;
+
+  public TaskStatus() {}
+
+  public TaskStatus(String taskid, boolean isMap, float progress,
+                    State runState, String diagnosticInfo,
+                    String stateString, String taskTracker,
+                    Phase phase, Counters counters) {
+    this.taskid = taskid;
+    this.isMap = isMap;
+    this.progress = progress;
+    this.runState = runState;
+    this.diagnosticInfo = diagnosticInfo;
+    this.stateString = stateString;
+    this.taskTracker = taskTracker;
+    this.phase = phase ;
+    this.counters = counters;
+  }
+    
+  public String getTaskId() { return taskid; }
+  public boolean getIsMap() { return isMap; }
+  public float getProgress() { return progress; }
+  public void setProgress(float progress) { this.progress = progress; } 
+  public State getRunState() { return runState; }
+  public String getTaskTracker() {return taskTracker;}
+  public void setTaskTracker(String tracker) { this.taskTracker = tracker;}
+  public void setRunState(State runState) { this.runState = runState; }
+  public String getDiagnosticInfo() { return diagnosticInfo; }
+  public void setDiagnosticInfo(String info) { this.diagnosticInfo = info; }
+  public String getStateString() { return stateString; }
+  public void setStateString(String stateString) { this.stateString = stateString; }
+  /**
+   * Get task finish time. if shuffleFinishTime and sortFinishTime 
+   * are not set before, these are set to finishTime. It takes care of 
+   * the case when shuffle, sort and finish are completed with in the 
+   * heartbeat interval and are not reported separately. if task state is 
+   * TaskStatus.FAILED then finish time represents when the task failed.
+   * @return finish time of the task. 
+   */
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  /**
+   * Sets finishTime. 
+   * @param finishTime finish time of task.
+   */
+  void setFinishTime(long finishTime) {
+    if( shuffleFinishTime == 0 ) {
+      this.shuffleFinishTime = finishTime ; 
+    }
+    if( sortFinishTime == 0 ){
+      this.sortFinishTime = finishTime ;
+    }
+    this.finishTime = finishTime;
+  }
+  /**
+   * Get shuffle finish time for the task. If shuffle finish time was 
+   * not set due to shuffle/sort/finish phases ending within same
+   * heartbeat interval, it is set to finish time of next phase i.e. sort 
+   * or task finish when these are set.  
+   * @return 0 if shuffleFinishTime, sortFinishTime and finish time are not set. else 
+   * it returns approximate shuffle finish time.  
+   */
+  public long getShuffleFinishTime() {
+    return shuffleFinishTime;
+  }
+
+  /**
+   * Set shuffle finish time. 
+   * @param shuffleFinishTime 
+   */
+  void setShuffleFinishTime(long shuffleFinishTime) {
+    this.shuffleFinishTime = shuffleFinishTime;
+  }
+
+  /**
+   * Get sort finish time for the task,. If sort finish time was not set 
+   * due to sort and reduce phase finishing in same heartebat interval, it is 
+   * set to finish time, when finish time is set. 
+   * @return 0 if sort finish time and finish time are not set, else returns sort
+   * finish time if that is set, else it returns finish time. 
+   */
+  public long getSortFinishTime() {
+    return sortFinishTime;
+  }
+
+  /**
+   * Sets sortFinishTime, if shuffleFinishTime is not set before 
+   * then its set to sortFinishTime.  
+   * @param sortFinishTime
+   */
+  void setSortFinishTime(long sortFinishTime) {
+    this.sortFinishTime = sortFinishTime;
+    if( 0 == this.shuffleFinishTime){
+      this.shuffleFinishTime = sortFinishTime ;
+    }
+  }
+
+  /**
+   * Get start time of the task. 
+   * @return 0 is start time is not set, else returns start time. 
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Set startTime of the task.
+   * @param startTime start time
+   */
+  void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+  /**
+   * Get current phase of this task. Phase.Map in case of map tasks, 
+   * for reduce one of Phase.SHUFFLE, Phase.SORT or Phase.REDUCE. 
+   * @return . 
+   */
+  public Phase getPhase(){
+    return this.phase; 
+  }
+  /**
+   * Set current phase of this task.  
+   * @param p
+   */
+  void setPhase(Phase p){
+    this.phase = p ; 
+  }
+  /**
+   * Get task's counters.
+   */
+  public Counters getCounters() {
+    return counters;
+  }
+  /**
+   * Set the task's counters.
+   * @param counters
+   */
+  public void setCounters(Counters counters) {
+    this.counters = counters;
+  }
+    
+  //////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////
+  public void write(DataOutput out) throws IOException {
+    UTF8.writeString(out, taskid);
+    out.writeBoolean(isMap);
+    out.writeFloat(progress);
+    WritableUtils.writeEnum(out, runState);
+    UTF8.writeString(out, diagnosticInfo);
+    UTF8.writeString(out, stateString);
+    WritableUtils.writeEnum(out, phase);
+    out.writeLong(startTime);
+    out.writeLong(finishTime);
+    if(! isMap){
+      out.writeLong(shuffleFinishTime);
+      out.writeLong(sortFinishTime);
+    }
+    counters.write(out);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.taskid = UTF8.readString(in);
+    this.isMap = in.readBoolean();
+    this.progress = in.readFloat();
+    this.runState = WritableUtils.readEnum(in, State.class);
+    this.diagnosticInfo = UTF8.readString(in);
+    this.stateString = UTF8.readString(in);
+    this.phase = WritableUtils.readEnum(in, Phase.class); 
+    this.startTime = in.readLong(); 
+    this.finishTime = in.readLong() ; 
+    if( ! this.isMap ){
+      shuffleFinishTime = in.readLong(); 
+      sortFinishTime = in.readLong(); 
+    }
+    counters = new Counters();
+    counters.readFields(in);
+  }
 }
 



Mime
View raw message