hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077167 - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ contrib/fairscheduler/src/test/org/apache/hadoop/...
Date Fri, 04 Mar 2011 03:48:03 GMT
Author: omalley
Date: Fri Mar  4 03:48:03 2011
New Revision: 1077167

URL: http://svn.apache.org/viewvc?rev=1077167&view=rev
Log:
commit db8f1bf2ddc04574c8a7a1b9844ddfdacafcfa4c
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date:   Sun Feb 14 14:50:27 2010 +0530

    HADOOP-2141 from https://issues.apache.org/jira/secure/attachment/12435253/hadoop-2141-yahoo-v1.4.8.patch
(only test related changes)
    
    +++ b/YAHOO-CHANGES.txt
    +    HADOOP-2141. Backport changes made in the original JIRA to aid
    +    fast unit tests in Map/Reduce. (Amar Kamat via yhemanth)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Clock.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Fri Mar  4 03:48:03 2011
@@ -338,7 +338,8 @@ public class TestCapacityScheduler exten
     
     FakeTaskInProgress(JobID jId, JobConf jobConf, Task t, 
         boolean isMap, FakeJobInProgress job) {
-      super(jId, "", JobSplit.EMPTY_TASK_SPLIT, null, jobConf, job, 0, 1);
+      super(jId, "", JobSplit.EMPTY_TASK_SPLIT, job.jobtracker, jobConf, job, 
+            0, 1);
       this.isMap = isMap;
       this.fakeJob = job;
       activeTasks = new TreeMap<TaskAttemptID, String>();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
Fri Mar  4 03:48:03 2011
@@ -87,15 +87,6 @@ public class FairScheduler extends TaskS
     double reduceFairShare = 0; // Fair share of reduce slots at last update
   }
   
-  /**
-   * A clock class - can be mocked out for testing.
-   */
-  static class Clock {
-    long getTime() {
-      return System.currentTimeMillis();
-    }
-  }
-  
   public FairScheduler() {
     this(new Clock(), true);
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Fri Mar  4 03:48:03 2011
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.JobStatu
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 
 public class TestFairScheduler extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -236,19 +237,6 @@ public class TestFairScheduler extends T
     }
   }
   
-  protected class FakeClock extends FairScheduler.Clock {
-    private long time = 0;
-    
-    public void advance(long millis) {
-      time += millis;
-    }
-
-    @Override
-    long getTime() {
-      return time;
-    }
-  }
-  
   protected JobConf conf;
   protected FairScheduler scheduler;
   private FakeTaskTrackerManager taskTrackerManager;

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Clock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Clock.java?rev=1077167&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Clock.java
(added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Clock.java
Fri Mar  4 03:48:03 2011
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * A clock class - can be mocked out for testing.
+ */
+class Clock {
+  long getTime() {
+    return System.currentTimeMillis();
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
Fri Mar  4 03:48:03 2011
@@ -111,6 +111,15 @@ public class ClusterStatus implements Wr
   }
 
   /**
+   * Construct a new cluster status.
+   * @param trackers no. of tasktrackers in the cluster
+   * @param blacklists no. of blacklisted tasktrackers in the cluster
+   * @param ttExpiryInterval the tasktracker expiry interval
+   * @param maps no. of currently running map-tasks in the cluster
+   * @param reduces no. of currently running reduce-tasks in the cluster
+   * @param maxMaps the maximum no. of map tasks in the cluster
+   * @param maxReduces the maximum no. of reduce tasks in the cluster
+   * @param state the {@link JobTracker.State} of the <code>JobTracker</code>
    * @param numDecommissionedNodes number of decommission trackers
    */
   ClusterStatus(int trackers, int blacklists, long ttExpiryInterval, 
@@ -151,6 +160,15 @@ public class ClusterStatus implements Wr
   }
 
   /**
+   * Construct a new cluster status. 
+   * @param activeTrackers active tasktrackers in the cluster
+   * @param blacklistedTrackers blacklisted tasktrackers in the cluster
+   * @param ttExpiryInterval the tasktracker expiry interval
+   * @param maps no. of currently running map-tasks in the cluster
+   * @param reduces no. of currently running reduce-tasks in the cluster
+   * @param maxMaps the maximum no. of map tasks in the cluster
+   * @param maxReduces the maximum no. of reduce tasks in the cluster
+   * @param state the {@link JobTracker.State} of the <code>JobTracker</code>
    * @param numDecommissionNodes number of decommission trackers
    */
   ClusterStatus(Collection<String> activeTrackers, 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Fri Mar  4 03:48:03 2011
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
@@ -286,6 +288,17 @@ class JobInProgress {
     this.anyCacheLevel = this.maxLevel+1;
     this.jobtracker = tracker;
     this.restartCount = 0;
+    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+    this.nonLocalMaps = new LinkedList<TaskInProgress>();
+    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+    this.nonRunningReduces = new LinkedList<TaskInProgress>();
+    this.runningReduces = new LinkedHashSet<TaskInProgress>();
+    this.resourceEstimator = new ResourceEstimator(this);
+    this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
+    this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
+      (numMapTasks + numReduceTasks + 10);
     try {
       this.userUGI = UserGroupInformation.getCurrentUser();
     } catch (IOException ie){
@@ -319,7 +332,7 @@ class JobInProgress {
     this.jobtracker = jobtracker;
     this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
     this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
-    this.startTime = System.currentTimeMillis();
+    this.startTime = jobtracker.getClock().getTime();
     status.setStartTime(startTime);
     this.localFs = jobtracker.getLocalFileSystem();
 
@@ -430,7 +443,7 @@ class JobInProgress {
     
     for (int i = 0; i < splits.length; i++) {
       String[] splitLocations = splits[i].getLocations();
-      if (splitLocations.length == 0) {
+      if (splitLocations == null || splitLocations.length == 0) {
         nonLocalMaps.add(maps[i]);
         continue;
       }
@@ -588,7 +601,7 @@ class JobInProgress {
     }
         
     // set the launch time
-    this.launchTime = System.currentTimeMillis();
+    this.launchTime = jobtracker.getClock().getTime();
 
     //
     // Create reduce tasks
@@ -1586,7 +1599,7 @@ class JobInProgress {
     Map<TaskTracker, FallowSlotInfo> map =
       (type == TaskType.MAP) ? trackersReservedForMaps : trackersReservedForReduces;
     
-    long now = System.currentTimeMillis();
+    long now = jobtracker.getClock().getTime();
     
     FallowSlotInfo info = map.get(taskTracker);
     int reservedSlots = 0;
@@ -1632,7 +1645,7 @@ class JobInProgress {
       return;
     }
     
-    long now = System.currentTimeMillis();
+    long now = jobtracker.getClock().getTime();
 
     Enum<Counter> counter = 
       (type == TaskType.MAP) ? 
@@ -1720,7 +1733,7 @@ class JobInProgress {
     String[] splitLocations = tip.getSplitLocations();
 
     // Remove the TIP from the list for running non-local maps
-    if (splitLocations.length == 0) {
+    if (splitLocations == null || splitLocations.length == 0) {
       nonLocalRunningMaps.remove(tip);
       return;
     }
@@ -1760,7 +1773,7 @@ class JobInProgress {
    * Adds a map tip to the list of running maps.
    * @param tip the tip that needs to be scheduled as running
    */
-  private synchronized void scheduleMap(TaskInProgress tip) {
+  protected synchronized void scheduleMap(TaskInProgress tip) {
     
     if (runningMapCache == null) {
       LOG.warn("Running cache for maps is missing!! " 
@@ -1770,7 +1783,7 @@ class JobInProgress {
     String[] splitLocations = tip.getSplitLocations();
 
     // Add the TIP to the list of non-local running TIPs
-    if (splitLocations.length == 0) {
+    if (splitLocations == null || splitLocations.length == 0) {
       nonLocalRunningMaps.add(tip);
       return;
     }
@@ -1795,7 +1808,7 @@ class JobInProgress {
    * Adds a reduce tip to the list of running reduces
    * @param tip the tip that needs to be scheduled as running
    */
-  private synchronized void scheduleReduce(TaskInProgress tip) {
+  protected synchronized void scheduleReduce(TaskInProgress tip) {
     if (runningReduces == null) {
       LOG.warn("Running cache for reducers missing!! "
                + "Job details are missing.");
@@ -1822,7 +1835,7 @@ class JobInProgress {
     String[] splitLocations = tip.getSplitLocations();
 
     // Add the TIP in the front of the list for non-local non-running maps
-    if (splitLocations.length == 0) {
+    if (splitLocations == null || splitLocations.length == 0) {
       nonLocalMaps.add(0, tip);
       return;
     }
@@ -2106,7 +2119,7 @@ class JobInProgress {
     // 
  
     if (hasSpeculativeMaps) {
-      long currentTime = System.currentTimeMillis();
+      long currentTime = jobtracker.getClock().getTime();
 
       // 1. Check bottom up for speculative tasks from the running cache
       if (node != null) {
@@ -2214,7 +2227,7 @@ class JobInProgress {
     // 2. check for a reduce tip to be speculated
     if (hasSpeculativeReduces) {
       tip = findSpeculativeTask(runningReduces, tts, avgProgress, 
-                                System.currentTimeMillis(), false);
+                                jobtracker.getClock().getTime(), false);
       if (tip != null) {
         scheduleReduce(tip);
         return tip.getIdWithinJob();
@@ -2435,7 +2448,7 @@ class JobInProgress {
       if (reduces.length == 0) {
         this.status.setReduceProgress(1.0f);
       }
-      this.finishTime = System.currentTimeMillis();
+      this.finishTime = jobtracker.getClock().getTime();
       LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
       
@@ -2460,7 +2473,7 @@ class JobInProgress {
   private synchronized void terminateJob(int jobTerminationState) {
     if ((status.getRunState() == JobStatus.RUNNING) ||
         (status.getRunState() == JobStatus.PREP)) {
-      this.finishTime = System.currentTimeMillis();
+      this.finishTime = jobtracker.getClock().getTime();
       this.status.setMapProgress(1.0f);
       this.status.setReduceProgress(1.0f);
       this.status.setCleanupProgress(1.0f);
@@ -2848,10 +2861,10 @@ class JobInProgress {
     // update the actual start-time of the attempt
     TaskStatus oldStatus = tip.getTaskStatus(taskid); 
     long startTime = oldStatus == null
-                     ? System.currentTimeMillis()
+                     ? jobtracker.getClock().getTime()
                      : oldStatus.getStartTime();
     status.setStartTime(startTime);
-    status.setFinishTime(System.currentTimeMillis());
+    status.setFinishTime(jobtracker.getClock().getTime());
     boolean wasComplete = tip.isComplete();
     updateTaskStatus(tip, status);
     boolean isComplete = tip.isComplete();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Fri Mar  4 03:48:03 2011
@@ -191,6 +191,8 @@ public class JobTracker implements MRCon
   // system files should have 700 permission
   final static FsPermission SYSTEM_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0700); // rwx------
+  
+  private Clock clock;
 
   private TokenStorage tokenStorage;
   private final JobTokenSecretManager jobTokenSecretManager
@@ -225,6 +227,10 @@ public class JobTracker implements MRCon
   private int nextJobId = 1;
 
   public static final Log LOG = LogFactory.getLog(JobTracker.class);
+  
+  public Clock getClock() {
+    return clock;
+  }
     
   /**
    * Start the JobTracker with given configuration.
@@ -314,7 +320,7 @@ public class JobTracker implements MRCon
         try {
           // Every 3 minutes check for any tasks that are overdue
           Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL/3);
-          long now = System.currentTimeMillis();
+          long now = clock.getTime();
           LOG.debug("Starting launching task sweep");
           synchronized (JobTracker.this) {
             synchronized (launchingTasks) {
@@ -368,7 +374,7 @@ public class JobTracker implements MRCon
     public void addNewTask(TaskAttemptID taskName) {
       synchronized (launchingTasks) {
         launchingTasks.put(taskName, 
-                           System.currentTimeMillis());
+                           clock.getTime());
       }
     }
       
@@ -412,7 +418,7 @@ public class JobTracker implements MRCon
           synchronized (JobTracker.this) {
             synchronized (taskTrackers) {
               synchronized (trackerExpiryQueue) {
-                long now = System.currentTimeMillis();
+                long now = clock.getTime();
                 TaskTrackerStatus leastRecent = null;
                 while ((trackerExpiryQueue.size() > 0) &&
                        (leastRecent = trackerExpiryQueue.first()) != null &&
@@ -548,7 +554,7 @@ public class JobTracker implements MRCon
         try {
           Thread.sleep(RETIRE_JOB_CHECK_INTERVAL);
           List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>();
-          long now = System.currentTimeMillis();
+          long now = clock.getTime();
           long retireBefore = now - RETIRE_JOB_INTERVAL;
 
           synchronized (jobs) {
@@ -636,9 +642,9 @@ public class JobTracker implements MRCon
     private boolean isHealthy;
     private HashMap<ReasonForBlackListing, String>rfbMap;
     
-    FaultInfo() {
+    FaultInfo(long time) {
       numFaults = 0;
-      lastUpdated = System.currentTimeMillis();
+      lastUpdated = time;
       blacklisted = false;
       rfbMap = new  HashMap<ReasonForBlackListing, String>();
     }
@@ -730,7 +736,7 @@ public class JobTracker implements MRCon
         int numFaults = fi.getFaultCount();
         ++numFaults;
         fi.setFaultCount(numFaults);
-        fi.setLastUpdated(System.currentTimeMillis());
+        fi.setLastUpdated(clock.getTime());
         if (exceedsFaults(fi)) {
           LOG.info("Adding " + hostName + " to the blacklist"
               + " across all jobs");
@@ -814,7 +820,7 @@ public class JobTracker implements MRCon
         boolean createIfNeccessary) {
       FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
       if (fi == null && createIfNeccessary) {
-        fi = new FaultInfo();
+        fi = new FaultInfo(clock.getTime());
         potentiallyFaultyTrackers.put(hostName, fi);
       }
       return fi;
@@ -1362,7 +1368,7 @@ public class JobTracker implements MRCon
       TaskTrackerStatus ttStatus = 
         new TaskTrackerStatus(trackerName, trackerHostName, port, ttStatusList, 
                               0 , 0, 0);
-      ttStatus.setLastSeen(System.currentTimeMillis());
+      ttStatus.setLastSeen(clock.getTime());
 
       synchronized (JobTracker.this) {
         synchronized (taskTrackers) {
@@ -1688,7 +1694,7 @@ public class JobTracker implements MRCon
         }
       }
 
-      long recoveryStartTime = System.currentTimeMillis();
+      long recoveryStartTime = clock.getTime();
 
       // II. Recover each job
       idIter = jobsToRecover.iterator();
@@ -1745,14 +1751,14 @@ public class JobTracker implements MRCon
         }
       }
 
-      recoveryDuration = System.currentTimeMillis() - recoveryStartTime;
+      recoveryDuration = clock.getTime() - recoveryStartTime;
       hasRecovered = true;
 
       // III. Finalize the recovery
       synchronized (trackerExpiryQueue) {
         // Make sure that the tracker statuses in the expiry-tracker queue
         // are updated
-        long now = System.currentTimeMillis();
+        long now = clock.getTime();
         int size = trackerExpiryQueue.size();
         for (int i = 0; i < size ; ++i) {
           // Get the first tasktracker
@@ -1947,6 +1953,12 @@ public class JobTracker implements MRCon
   
   JobTracker(final JobConf conf, String identifier) 
   throws IOException, InterruptedException {   
+    this(conf, identifier, new Clock());
+  }
+  
+  JobTracker(final JobConf conf, String identifier, Clock clock) 
+  throws IOException, InterruptedException { 
+    this.clock = clock;
     // find the owner of the process
     // get the desired principal to load
     String keytabFilename = conf.get(JT_KEYTAB_FILE);
@@ -2064,7 +2076,7 @@ public class JobTracker implements MRCon
     InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
     String infoBindAddress = infoSocAddr.getHostName();
     int tmpInfoPort = infoSocAddr.getPort();
-    this.startTime = System.currentTimeMillis();
+    this.startTime = clock.getTime();
     infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort, 
         tmpInfoPort == 0, conf);
     infoServer.setAttribute("job.tracker", this);
@@ -2598,7 +2610,7 @@ public class JobTracker implements MRCon
     final JobTrackerInstrumentation metrics = getInstrumentation();
     metrics.finalizeJob(conf, id);
     
-    long now = System.currentTimeMillis();
+    long now = clock.getTime();
     
     // mark the job for cleanup at all the trackers
     addJobForCleanup(id);
@@ -2979,7 +2991,7 @@ public class JobTracker implements MRCon
 
     // First check if the last heartbeat response got through
     String trackerName = status.getTrackerName();
-    long now = System.currentTimeMillis();
+    long now = clock.getTime();
     boolean isBlacklisted = false;
     if (restarted) {
       faultyTrackers.markTrackerHealthy(status.getHost());

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
Fri Mar  4 03:48:03 2011
@@ -226,7 +226,7 @@ class TaskInProgress {
    * Initialization common to Map and Reduce
    */
   void init(JobID jobId) {
-    this.startTime = System.currentTimeMillis();
+    this.startTime = jobtracker.getClock().getTime();
     this.id = new TaskID(jobId, isMapTask(), partition);
     this.skipping = startSkipping();
   }
@@ -635,7 +635,7 @@ class TaskInProgress {
 
       // tasktracker went down and failed time was not reported. 
       if (0 == status.getFinishTime()){
-        status.setFinishTime(System.currentTimeMillis());
+        status.setFinishTime(jobtracker.getClock().getTime());
       }
     }
 
@@ -740,7 +740,7 @@ class TaskInProgress {
     //
 
     this.completes++;
-    this.execFinishTime = System.currentTimeMillis();
+    this.execFinishTime = jobtracker.getClock().getTime();
     recomputeProgress();
     
   }
@@ -779,7 +779,7 @@ class TaskInProgress {
     }
     this.failed = true;
     killed = true;
-    this.execFinishTime = System.currentTimeMillis();
+    this.execFinishTime = jobtracker.getClock().getTime();
     recomputeProgress();
   }
 
@@ -903,7 +903,7 @@ class TaskInProgress {
   public Task getTaskToRun(String taskTracker) throws IOException {
     if (0 == execStartTime){
       // assume task starts running now
-      execStartTime = System.currentTimeMillis();
+      execStartTime = jobtracker.getClock().getTime();
     }
 
     // Create the 'taskid'; do not count the 'killed' tasks against the job!

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml Fri
Mar  4 03:48:03 2011
@@ -89,4 +89,16 @@
        <Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
        <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
      </Match>
+    <!--
+       JobTracker's static variables should be ignored
+     -->
+     <Match>
+       <Class name="org.apache.hadoop.mapred.JobTracker" />
+       <Or>
+       <Field name="RETIRE_JOB_INTERVAL" />
+       <Field name="TASKTRACKER_EXPIRY_INTERVAL" />
+       <Field name="RETIRE_JOB_CHECK_INTERVAL" />
+       </Or>
+       <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" />
+     </Match>
 </FindBugsFilter>

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
Fri Mar  4 03:48:03 2011
@@ -48,7 +48,7 @@ public class TestResourceEstimation exte
       JobSplit.TaskSplitMetaInfo split =
           new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0);
       TaskInProgress tip = 
-        new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
+        new TaskInProgress(jid, "", split, jip.jobtracker, jc, jip, 0, 1);
       re.updateWithCompletedTask(ts, tip);
     }
     assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize());
@@ -86,7 +86,7 @@ public class TestResourceEstimation exte
               new JobSplit.TaskSplitMetaInfo(new String[0], 0,
                                            singleMapInputSize);
       TaskInProgress tip = 
-        new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
+        new TaskInProgress(jid, "", split, jip.jobtracker, jc, jip, 0, 1);
       re.updateWithCompletedTask(ts, tip);
     }
     
@@ -99,7 +99,7 @@ public class TestResourceEstimation exte
     JobSplit.TaskSplitMetaInfo split =
         new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0);
     TaskInProgress tip = 
-      new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
+      new TaskInProgress(jid, "", split, jip.jobtracker, jc, jip, 0, 1);
     re.updateWithCompletedTask(ts, tip);
     
     long expectedTotalMapOutSize = (singleMapOutputSize*11) * 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
Fri Mar  4 03:48:03 2011
@@ -30,7 +30,6 @@ import java.util.Properties;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.RandomWriter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -678,6 +677,19 @@ public class UtilsForTests {
     return job;
   }
 
+  static class FakeClock extends Clock {
+    long time = 0;
+    
+    public void advance(long millis) {
+      time += millis;
+    }
+     
+      @Override
+      long getTime() {
+        return time;
+      }
+    }
+  
   // Mapper that fails
   static class FailMapper extends MapReduceBase implements
       Mapper<WritableComparable, Writable, WritableComparable, Writable> {



Mime
View raw message