hadoop-mapreduce-commits mailing list archives

From: cdoug...@apache.org
Subject: svn commit: r818675 [4/4] - in /hadoop/mapreduce/branches/branch-0.21: ./ src/contrib/ src/contrib/mumak/ src/contrib/mumak/bin/ src/contrib/mumak/conf/ src/contrib/mumak/ivy/ src/contrib/mumak/src/ src/contrib/mumak/src/java/ src/contrib/mumak/src/jav...
Date: Fri, 25 Sep 2009 00:27:59 GMT
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=818675&r1=818674&r2=818675&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Sep 25 00:27:57 2009
@@ -116,7 +116,7 @@
   int failedMapTasks = 0; 
   int failedReduceTasks = 0;
   
-  private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
+  static final float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
   int completedMapsForReduceSlowstart = 0;
   
   // runningMapTasks include speculative tasks, so we need to capture 
@@ -170,7 +170,7 @@
    * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
    * schedule any available map tasks for this job, including speculative tasks.
    */
-  private int anyCacheLevel;
+  int anyCacheLevel;
   
   /**
    * A special value indicating that 
@@ -199,7 +199,7 @@
     new TreeMap<String, Integer>();
     
   //Confine estimation algorithms to an "oracle" class that JIP queries.
-  private ResourceEstimator resourceEstimator; 
+  ResourceEstimator resourceEstimator; 
   
   long startTime;
   long launchTime;
@@ -208,20 +208,20 @@
   // Indicates how many times the job got restarted
   private final int restartCount;
 
-  private JobConf conf;
+  JobConf conf;
   protected AtomicBoolean tasksInited = new AtomicBoolean(false);
   private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
 
-  private LocalFileSystem localFs;
-  private FileSystem fs;
-  private JobID jobId;
+  LocalFileSystem localFs;
+  FileSystem fs;
+  JobID jobId;
   private boolean hasSpeculativeMaps;
   private boolean hasSpeculativeReduces;
-  private long inputLength = 0;
+  long inputLength = 0;
   
-  private Counters jobCounters = new Counters();
+  Counters jobCounters = new Counters();
   
-  private MetricsRecord jobMetrics;
+  MetricsRecord jobMetrics;
   
   // Maximum no. of fetch-failure notifications after which map task is killed
   private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
@@ -238,9 +238,9 @@
   private Object schedulingInfo;
 
   //thresholds for speculative execution
-  private float slowTaskThreshold;
-  private float speculativeCap;
-  private float slowNodeThreshold; //standard deviations
+  float slowTaskThreshold;
+  float speculativeCap;
+  float slowNodeThreshold; //standard deviations
 
   //Statistics are maintained for a couple of things
   //mapTaskStats is used for maintaining statistics about
@@ -338,6 +338,11 @@
     }
   }
   
+  JobInProgress() {
+    restartCount = 0;
+    jobSetupCleanupNeeded = false;
+  }
+  
   /**
    * Create a JobInProgress with the given job file, plus a handle
    * to the tracker.
@@ -739,7 +744,7 @@
     setup[1].setJobSetupTask();
   }
   
-  private void setupComplete() {
+  void setupComplete() {
     status.setSetupProgress(1.0f);
     if (this.status.getRunState() == JobStatus.PREP) {
       this.status.setRunState(JobStatus.RUNNING);
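
The pattern in this file is consistent: members and the setupComplete() hook
move from private to package-private, and a bare JobInProgress() constructor
is added (restartCount = 0, no setup/cleanup tasks). That reads as a seam for
the Mumak simulator named in the subject line: a class in the same
org.apache.hadoop.mapred package can now build a JobInProgress without a live
JobTracker. A minimal sketch, with a hypothetical class name that is not part
of this commit:

package org.apache.hadoop.mapred;

// Hypothetical simulated job; only the names from the diff above are real.
class SimulatedJobInProgress extends JobInProgress {

  SimulatedJobInProgress(JobConf simulatedConf) {
    // implicit super() reaches the new package-private JobInProgress():
    // restartCount = 0, jobSetupCleanupNeeded = false
    this.conf = simulatedConf;   // assignable now that conf is package-private
  }

  @Override
  void setupComplete() {
    // overridable now that it is package-private; a simulator can replace
    // the real setup bookkeeping with its own
  }
}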

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=818675&r1=818674&r2=818675&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Sep 25 00:27:57 2009
@@ -139,10 +139,10 @@
   private static final int FS_ACCESS_RETRY_PERIOD = 10000;
 
   private DNSToSwitchMapping dnsToSwitchMapping;
-  private NetworkTopology clusterMap = new NetworkTopology();
+  NetworkTopology clusterMap = new NetworkTopology();
   private int numTaskCacheLevels; // the max level to which we cache tasks
   private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
-  private final TaskScheduler taskScheduler;
+  final TaskScheduler taskScheduler;
   private final List<JobInProgressListener> jobInProgressListeners =
     new CopyOnWriteArrayList<JobInProgressListener>();
 
@@ -1256,7 +1256,7 @@
   static final String SUBDIR = "jobTracker";
   FileSystem fs = null;
   Path systemDir = null;
-  private JobConf conf;
+  JobConf conf;
   private final UserGroupInformation mrOwner;
   private final String supergroup;
 
@@ -1790,7 +1790,7 @@
    * 
    * @param taskTracker tasktracker whose 'non-running' tasks are to be purged
    */
-  private void removeMarkedTasks(String taskTracker) {
+  void removeMarkedTasks(String taskTracker) {
     // Purge all the 'marked' tasks which were running at taskTracker
     Set<TaskAttemptID> markedTaskSet = 
       trackerToMarkedTasksMap.get(taskTracker);
@@ -2086,7 +2086,7 @@
    * 
    * @param status Task Tracker's status
    */
-  private void addNewTracker(TaskTracker taskTracker) {
+  void addNewTracker(TaskTracker taskTracker) {
     TaskTrackerStatus status = taskTracker.getStatus();
     trackerExpiryQueue.add(status);
 
@@ -2183,7 +2183,7 @@
   
   // Update the listeners about the job
   // Assuming JobTracker is locked on entry.
-  private void updateJobInProgressListeners(JobChangeEvent event) {
+  void updateJobInProgressListeners(JobChangeEvent event) {
     for (JobInProgressListener listener : jobInProgressListeners) {
       listener.jobUpdated(event);
     }
@@ -2393,7 +2393,7 @@
    * @param status The new status for the task tracker
    * @return Was an old status found?
    */
-  private boolean updateTaskTrackerStatus(String trackerName,
+  boolean updateTaskTrackerStatus(String trackerName,
                                           TaskTrackerStatus status) {
     TaskTracker tt = getTaskTracker(trackerName);
     TaskTrackerStatus oldStatus = (tt == null) ? null : tt.getStatus();
@@ -2501,7 +2501,7 @@
   /**
    * Process incoming heartbeat messages from the task trackers.
    */
-  private synchronized boolean processHeartbeat(
+  synchronized boolean processHeartbeat(
                                  TaskTrackerStatus trackerStatus, 
                                  boolean initialContact) {
     String trackerName = trackerStatus.getTrackerName();
@@ -2547,8 +2547,7 @@
    * A tracker wants to know if any of its Tasks have been
    * closed (because the job completed, whether successfully or not)
    */
-  private synchronized List<TaskTrackerAction> getTasksToKill(
-                                                              String taskTracker) {
+  synchronized List<TaskTrackerAction> getTasksToKill(String taskTracker) {
     
     Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
     List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
@@ -2625,7 +2624,7 @@
   /**
    * A tracker wants to know if any of its Tasks can be committed 
    */
-  private synchronized List<TaskTrackerAction> getTasksToSave(
+  synchronized List<TaskTrackerAction> getTasksToSave(
                                                  TaskTrackerStatus tts) {
     List<TaskStatus> taskStatuses = tts.getTaskReports();
     if (taskStatuses != null) {
@@ -4090,7 +4089,105 @@
   void incrementFaults(String hostName) {
     faultyTrackers.incrementFaults(hostName);
   }
-  
+
+  JobTracker(JobConf conf, Clock clock, boolean ignoredForSimulation) 
+  throws IOException {
+    this.clock = clock;
+    this.conf = conf;
+    trackerIdentifier = getDateFormat().format(new Date());
+
+    if (fs == null) {
+      fs = FileSystem.get(conf);
+    }
+    
+    tasktrackerExpiryInterval = 
+      conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);
+    retiredJobsCacheSize = 
+      conf.getInt("mapred.job.tracker.retiredjobs.cache.size", 1000);
+
+    // max number of blacklists before a tracker is considered faulty
+    MAX_BLACKLISTS_PER_TRACKER = 
+        conf.getInt("mapred.max.tracker.blacklists", 4);
+    NUM_HEARTBEATS_IN_SECOND = 
+        conf.getInt("mapred.heartbeats.in.second", 100);
+    
+    try {
+      mrOwner = UnixUserGroupInformation.login(conf);
+    } catch (LoginException e) {
+      throw new IOException(StringUtils.stringifyException(e));
+    }
+    supergroup = conf.get("mapred.permissions.supergroup", "supergroup");
+    
+    this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
+        conf.get("mapred.hosts.exclude", ""));
+    // queue manager
+    Configuration queuesConf = new Configuration(this.conf);
+    queueManager = new QueueManager(queuesConf);
+
+    // Create the scheduler
+    Class<? extends TaskScheduler> schedulerClass
+      = conf.getClass("mapred.jobtracker.taskScheduler",
+          JobQueueTaskScheduler.class, TaskScheduler.class);
+    taskScheduler = 
+      (TaskScheduler)ReflectionUtils.newInstance(schedulerClass, conf);
+    
+    // Set ports, start RPC servers, setup security policy etc.
+    InetSocketAddress addr = getAddress(conf);
+    this.localMachine = addr.getHostName();
+    this.port = addr.getPort();
+
+    // Create the jetty server
+    InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(
+        conf.get("mapred.job.tracker.http.address", "0.0.0.0:50030"));
+    String infoBindAddress = infoSocAddr.getHostName();
+    int tmpInfoPort = infoSocAddr.getPort();
+    this.startTime = clock.getTime();
+    infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort, 
+        tmpInfoPort == 0, conf);
+    infoServer.setAttribute("job.tracker", this);
+    
+    // initialize history parameters.
+    String historyLogDir = null;
+    FileSystem historyFS = null;
+
+    jobHistory = new JobHistory();
+    jobHistory.init(this, conf, this.localMachine, this.startTime);
+    jobHistory.initDone(conf, fs);
+    historyLogDir = jobHistory.getCompletedJobHistoryLocation().toString();
+    infoServer.setAttribute("historyLogDir", historyLogDir);
+    historyFS = new Path(historyLogDir).getFileSystem(conf);
+
+    infoServer.setAttribute("fileSys", historyFS);
+    infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
+    infoServer.start();
+    this.infoPort = this.infoServer.getPort();
+
+    // Initialize instrumentation
+    JobTrackerInstrumentation tmp;
+    Class<? extends JobTrackerInstrumentation> metricsInst =
+      getInstrumentationClass(conf);
+    try {
+      java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
+        metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
+      tmp = c.newInstance(this, conf);
+    } catch(Exception e) {
+      //Reflection can throw lots of exceptions -- handle them all by 
+      //falling back on the default.
+      LOG.error("failed to initialize job tracker metrics", e);
+      tmp = new JobTrackerMetricsInst(this, conf);
+    }
+    myInstrumentation = tmp;
+    
+    this.dnsToSwitchMapping = ReflectionUtils.newInstance(
+        conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
+            DNSToSwitchMapping.class), conf);
+    this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
+        NetworkTopology.DEFAULT_HOST_LEVEL);
+
+    //initializes the job status store
+    completedJobStatusStore = new CompletedJobStatusStore(conf);
+  }
+
   /**
    * Get the path of the locally stored job file
    * @param jobId id of the job
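
The new JobTracker(JobConf, Clock, boolean) constructor is the simulation
entry point: unlike the regular constructor it takes an injected Clock (note
this.startTime = clock.getTime() above), so a simulator can drive the tracker
on virtual time. A minimal sketch, assuming the package-private Clock whose
getTime() the constructor body calls can be overridden; the bootstrap class
below is hypothetical:

package org.apache.hadoop.mapred;

import java.io.IOException;

// Hypothetical bootstrap, not part of this commit.
class SimulatedTrackerBootstrap {
  static JobTracker start(JobConf conf) throws IOException {
    Clock frozen = new Clock() {
      @Override
      long getTime() {
        return 0L;   // a deterministic "now"; a real simulator would advance it
      }
    };
    // the boolean argument only disambiguates this constructor's signature
    return new JobTracker(conf, frozen, true);
  }
}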

Modified: hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=818675&r1=818674&r2=818675&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Fri Sep 25 00:27:57 2009
@@ -128,6 +128,13 @@
       Path emptyPath = new Path("/");
       int totalHosts = 0; // use to determine avg # of hosts per split.
       for (LoggedTask mapTask : job.getMapTasks()) {
+        Pre21JobHistoryConstants.Values taskType = mapTask.getTaskType();
+        if (taskType != Pre21JobHistoryConstants.Values.MAP) {
+          LOG.warn("TaskType for a MapTask is not Map. task="
+              + mapTask.getTaskID() + " type="
+              + ((taskType == null) ? "null" : taskType.toString()));
+          continue;
+        }
         List<LoggedLocation> locations = mapTask.getPreferredLocations();
         List<String> hostList = new ArrayList<String>();
         if (locations != null) {
@@ -235,6 +242,23 @@
   }
 
   /**
+   * Get the number of map tasks actually logged in the trace.
+   * @return The number of map tasks that are actually logged in the trace.
+   */
+  public int getNumLoggedMaps() {
+    return job.getMapTasks().size();
+  }
+
+
+  /**
+   * Get the number of reduce tasks actually logged in the trace.
+   * @return The number of reduce tasks that are actually logged in the trace.
+   */
+  public int getNumLoggedReduces() {
+    return job.getReduceTasks().size();
+  }
+  
+  /**
    * Mask the job ID part in a {@link TaskID}.
    * 
    * @param taskId
@@ -414,7 +438,6 @@
       return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
           taskNumber, locality);
     }
-
     LoggedTaskAttempt loggedAttempt = getLoggedTaskAttempt(taskType,
         taskNumber, taskAttemptNumber);
     if (loggedAttempt == null) {
@@ -578,7 +601,8 @@
     Values type = loggedTask.getTaskType();
     if ((type != Values.MAP) && (type != Values.REDUCE)) {
       throw new IllegalArgumentException(
-          "getTaskInfo only supports MAP or REDUCE tasks: " + type.toString());
+          "getTaskInfo only supports MAP or REDUCE tasks: " + type.toString() +
+          " for task = " + loggedTask.getTaskID());
     }
 
     for (LoggedTaskAttempt attempt : attempts) {
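
The getNumLoggedMaps()/getNumLoggedReduces() accessors added above report how
many tasks the trace actually captured, as opposed to whatever the job
nominally declared, which lets callers spot truncated or partially logged
histories. A hypothetical caller, not part of this commit:

package org.apache.hadoop.tools.rumen;

// Hypothetical helper; only the ZombieJob accessors are from the diff above.
class TraceCoverage {
  static void report(ZombieJob zombie) {
    System.out.println("maps logged=" + zombie.getNumLoggedMaps()
        + ", reduces logged=" + zombie.getNumLoggedReduces());
  }
}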

Modified: hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java?rev=818675&r1=818674&r2=818675&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java Fri Sep 25 00:27:57 2009
@@ -69,7 +69,7 @@
   }
 
   @Override
-  public JobStory getNextJob() throws IOException {
+  public ZombieJob getNextJob() throws IOException {
     LoggedJob job = reader.getNext();
     return (job == null) ? null : new ZombieJob(job, cluster);
   }
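
Narrowing the return type from JobStory to ZombieJob is a source-compatible
covariant override, and it lets callers reach the ZombieJob-only accessors
introduced in this commit without a downcast. A hypothetical caller, not part
of this commit:

package org.apache.hadoop.tools.rumen;

import java.io.IOException;

// Hypothetical helper; ZombieJobProducer.getNextJob() is from the diff above.
class TraceDrain {
  static void drain(ZombieJobProducer producer) throws IOException {
    for (ZombieJob job = producer.getNextJob(); job != null;
         job = producer.getNextJob()) {
      // previously this loop would have needed a (ZombieJob) cast per JobStory
      System.out.println(job.getNumLoggedMaps() + " logged maps, "
          + job.getNumLoggedReduces() + " logged reduces");
    }
  }
}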


