hadoop-mapreduce-commits mailing list archives

From cdoug...@apache.org
Subject svn commit: r886044 - in /hadoop/mapreduce/branches/branch-0.21: ./ src/contrib/mumak/src/java/org/apache/hadoop/mapred/ src/contrib/mumak/src/test/org/apache/hadoop/mapred/
Date Wed, 02 Dec 2009 03:45:18 GMT
Author: cdouglas
Date: Wed Dec  2 03:45:17 2009
New Revision: 886044

URL: http://svn.apache.org/viewvc?rev=886044&view=rev
Log:
MAPREDUCE-1229. Allow customization of job submission policy in Mumak.
Contributed by Hong Tang
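
For context, a minimal sketch (not part of this commit) of how a Mumak run might opt into the new STRESS policy. The configuration key and enum values come from the SimulatorJobSubmissionPolicy class added below; the wrapper class and main method here are purely illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.SimulatorJobSubmissionPolicy;

    public class SelectPolicySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // REPLAY is the default; STRESS keeps submitting jobs until the
        // simulated cluster is saturated; SERIAL submits them one at a time.
        conf.set(SimulatorJobSubmissionPolicy.JOB_SUBMISSION_POLICY,
            SimulatorJobSubmissionPolicy.STRESS.name());
        System.out.println(SimulatorJobSubmissionPolicy.getPolicy(conf));
      }
    }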

Added:
    hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/LoadProbingEvent.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobSubmissionPolicy.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorSerialJobSubmission.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorStressJobSubmission.java
Modified:
    hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobClient.java

Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=886044&r1=886043&r2=886044&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Wed Dec  2 03:45:17 2009
@@ -433,6 +433,9 @@
     HADOOP-5107. Use Maven ant tasks to publish artifacts. (Giridharan Kesavan
     via omalley)
 
+    MAPREDUCE-1229. Allow customization of job submission policy in Mumak.
+    (Hong Tang via cdouglas)
+
   BUG FIXES
 
     MAPREDUCE-1089. Fix NPE in fair scheduler preemption when tasks are  

Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/LoadProbingEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/LoadProbingEvent.java?rev=886044&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/LoadProbingEvent.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/LoadProbingEvent.java Wed Dec  2 03:45:17 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ * A {@link LoadProbingEvent} is created by {@link SimulatorJobClient} when the
+ * {@link SimulatorJobSubmissionPolicy} is STRESS. When {@link SimulatorJobClient}
+ * picks up the event, it checks whether the system load is stressed and, if
+ * not, submits the next job.
+ */
+public class LoadProbingEvent extends SimulatorEvent {
+  public LoadProbingEvent(SimulatorJobClient jc, long timestamp) {
+    super(jc, timestamp);
+  }
+}

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java?rev=886044&r1=886043&r2=886044&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java Wed Dec  2 03:45:17 2009
@@ -29,7 +29,6 @@
 import org.apache.hadoop.mapred.SimulatorEvent;
 import org.apache.hadoop.mapred.SimulatorEventQueue;
 import org.apache.hadoop.mapred.JobCompleteEvent;
-import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.SimulatorJobClient;
 import org.apache.hadoop.mapred.SimulatorJobTracker;
 import org.apache.hadoop.mapred.SimulatorTaskTracker;
@@ -138,7 +137,10 @@
     JobStoryProducer jobStoryProducer = new SimulatorJobStoryProducer(
         new Path(traceFile), cluster, firstJobStartTime, jobConf);
     
-    jc = new SimulatorJobClient(jt, jobStoryProducer);
+    final SimulatorJobSubmissionPolicy submissionPolicy = SimulatorJobSubmissionPolicy
+        .getPolicy(jobConf);
+    
+    jc = new SimulatorJobClient(jt, jobStoryProducer, submissionPolicy);
     queue.addAll(jc.init(firstJobStartTime));
 
     // create TTs based on topology.json     

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java?rev=886044&r1=886043&r2=886044&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java Wed Dec  2 03:45:17 2009
@@ -18,11 +18,14 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedHashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 
+import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
@@ -33,14 +36,49 @@
  * Class that simulates a job client. Its main functionality is to submit jobs
  * to the simulation engine, and shut down the simulation engine if the job
  * producer runs out of jobs.
- * TODO: Change System.out.printXX to LOG.xxx.
  */
 public class SimulatorJobClient implements SimulatorEventListener {
+  protected static class JobSketchInfo {
+    protected int numMaps;
+    protected int numReduces;
+    JobSketchInfo(int numMaps, int numReduces) {
+      this.numMaps = numMaps;
+      this.numReduces = numReduces;
+    }
+  }
+  
   private final ClientProtocol jobTracker;
   private final JobStoryProducer jobStoryProducer;
-  private Set<JobID> runningJobs = new LinkedHashSet<JobID>();
+  private final SimulatorJobSubmissionPolicy submissionPolicy;
+  private static final int LOAD_PROB_INTERVAL_START = 1000;
+  private static final int LOAD_PROB_INTERVAL_MAX = 320000;
+  private int loadProbingInterval = LOAD_PROB_INTERVAL_START;
+  
+  /**
+   * The minimum ratio between pending+running map tasks (aka. incomplete map
+   * tasks) and cluster map slot capacity for the cluster to be considered
+   * overloaded. Running maps are only counted partially: a 40% completed map
+   * counts as 0.6 map tasks in this calculation.
+   */
+  private static final float OVERLAOD_MAPTASK_MAPSLOT_RATIO = 2.0f;
+  /**
+   * Keep track of the in-flight load-probing event.
+   */
+  private LoadProbingEvent inFlightLPE = null;
+  /**
+   * We do not have a handle to the SimulatorEventQueue and thus cannot cancel
+   * events directly. Instead, we keep an identity map (an identity set would
+   * suffice, but the JDK does not provide one) and use it to skip events that
+   * have been cancelled.
+   */
+  private Map<LoadProbingEvent, Boolean> cancelledLPE = 
+    new IdentityHashMap<LoadProbingEvent, Boolean>();
+  
+  private Map<JobID, JobSketchInfo> runningJobs = 
+    new LinkedHashMap<JobID, JobSketchInfo>();
   private boolean noMoreJobs = false;
-
+  private JobStory nextJob;
+  
   /**
    * Constructor.
    * 
@@ -49,64 +87,255 @@
    *          SimulatorJobClient} interacts with the JobTracker through the
    *          {@link ClientProtocol}.
    * @param jobStoryProducer
+   * @param submissionPolicy The policy that determines how jobs are
+   *          submitted to the JobTracker.
    */
-  public SimulatorJobClient(ClientProtocol jobTracker, JobStoryProducer jobStoryProducer) {
+  public SimulatorJobClient(ClientProtocol jobTracker, 
+                            JobStoryProducer jobStoryProducer,
+                            SimulatorJobSubmissionPolicy submissionPolicy) {
     this.jobTracker = jobTracker;
     this.jobStoryProducer = jobStoryProducer;
+    this.submissionPolicy = submissionPolicy;
   }
   
+  /**
+   * Constructor.
+   * 
+   * @param jobTracker
+   *          The job tracker where we submit job to. Note that the {@link
+   *          SimulatorJobClient} interacts with the JobTracker through the
+   *          {@link ClientProtocol}.
+   * @param jobStoryProducer
+   */
+  public SimulatorJobClient(ClientProtocol jobTracker, 
+                            JobStoryProducer jobStoryProducer) {
+    this(jobTracker, jobStoryProducer,  SimulatorJobSubmissionPolicy.REPLAY);
+  }
+
   @Override
   public List<SimulatorEvent> init(long when) throws IOException {
     JobStory job = jobStoryProducer.getNextJob();
-    if (job.getSubmissionTime() != when) {
+    if (submissionPolicy == SimulatorJobSubmissionPolicy.REPLAY
+        && job.getSubmissionTime() != when) {
       throw new IOException("Inconsistent submission time for the first job: "
           + when + " != " + job.getSubmissionTime()+".");
     }
+    
     JobSubmissionEvent event = new JobSubmissionEvent(this, when, job);
-    return Collections.<SimulatorEvent> singletonList(event);
+    if (submissionPolicy != SimulatorJobSubmissionPolicy.STRESS) {
+      return Collections.<SimulatorEvent> singletonList(event);
+    } else {
+      ArrayList<SimulatorEvent> ret = new ArrayList<SimulatorEvent>(2);
+      ret.add(event);
+      inFlightLPE = new LoadProbingEvent(this, when + loadProbingInterval);
+      ret.add(inFlightLPE);
+      return ret;
+    }
   }
-  
-  @Override
-  public List<SimulatorEvent> accept(SimulatorEvent event)
-      throws IOException {
-    if (event instanceof JobSubmissionEvent) {
-      JobSubmissionEvent submitEvent = (JobSubmissionEvent)(event);
-  
-      // Submit job
-      JobStatus status = null;
-      try {
-        status = submitJob(submitEvent.getJob());
-      } catch (InterruptedException e) {
-        throw new IOException(e);
+
+  /**
+   * Adjusts the load-probing interval using exponential back-off, because load
+   * probing can be expensive when there are many pending jobs.
+   * 
+   * @param overloaded Whether the job tracker is currently overloaded.
+   */
+  private void adjustLoadProbingInterval(boolean overloaded) {
+    if (overloaded) {
+      /**
+       * We should only extend LPE interval when there is no in-flight LPE.
+       */
+      if (inFlightLPE == null) {
+        loadProbingInterval = Math.min(loadProbingInterval * 2,
+            LOAD_PROB_INTERVAL_MAX);
       }
-      runningJobs.add(status.getJobID());
-      System.out.println("Job " + status.getJobID() + 
-                         " is submitted at " + submitEvent.getTimeStamp());
+    } else {
+      loadProbingInterval = LOAD_PROB_INTERVAL_START;
+    }
+  }
+  
+  /**
+   * Uses a lightweight mechanism to estimate the cluster load.
+   * @return Whether, from the job client's perspective, the cluster is overloaded.
+   */
+  private boolean isOverloaded(long now) throws IOException {
+    try {
+      ClusterMetrics clusterMetrics = jobTracker.getClusterMetrics();
       
-      JobStory nextJob = jobStoryProducer.getNextJob();
-      if (nextJob == null) {
-        noMoreJobs = true;
-        return SimulatorEngine.EMPTY_EVENTS;
+      // If there are more running jobs than task trackers, we assume the
+      // cluster is overloaded. This bounds the memory usage of the simulator
+      // job tracker when jobs have a small number of map tasks and a large
+      // number of reduce tasks.
+      if (runningJobs.size() >= clusterMetrics.getTaskTrackerCount()) {
+        System.out.printf("%d Overloaded is %s: " +
+                "#runningJobs >= taskTrackerCount (%d >= %d)\n",
+                now, Boolean.TRUE.toString(),
+                runningJobs.size(), clusterMetrics.getTaskTrackerCount());
+        return true;    
       }
-      
-      return Collections.<SimulatorEvent>singletonList(
-          new JobSubmissionEvent(this, nextJob.getSubmissionTime(), nextJob));
+
+      float incompleteMapTasks = 0; // include pending & running map tasks.
+      for (Map.Entry<JobID, JobSketchInfo> entry : runningJobs.entrySet()) {
+        org.apache.hadoop.mapreduce.JobStatus jobStatus = jobTracker
+            .getJobStatus(entry.getKey());
+        incompleteMapTasks += (1 - Math.min(jobStatus.getMapProgress(), 1.0))
+            * entry.getValue().numMaps;
+      }
+
+      boolean overloaded = incompleteMapTasks >
+          OVERLAOD_MAPTASK_MAPSLOT_RATIO * clusterMetrics.getMapSlotCapacity();
+      String relOp = (overloaded) ? ">" : "<=";
+      System.out.printf("%d Overloaded is %s: "
+          + "incompleteMapTasks %s %.1f*mapSlotCapacity (%.1f %s %.1f*%d)\n",
+          now, Boolean.toString(overloaded), relOp, OVERLAOD_MAPTASK_MAPSLOT_RATIO,
+          incompleteMapTasks, relOp, OVERLAOD_MAPTASK_MAPSLOT_RATIO, 
+          clusterMetrics.getMapSlotCapacity());
+      return overloaded;
+    } catch (InterruptedException e) {
+      throw new IOException("InterruptedException", e);
+    }
+  }
+  
+  /**
+   * Handles a simulation event, which may be a JobSubmissionEvent, a
+   * JobCompleteEvent, or a LoadProbingEvent.
+   *
+   * @param event SimulatorEvent to respond to
+   * @return list of events generated in response
+   */
+  @Override
+  public List<SimulatorEvent> accept(SimulatorEvent event) throws IOException {
+    if (event instanceof JobSubmissionEvent) {
+      return processJobSubmissionEvent((JobSubmissionEvent) event);
     } else if (event instanceof JobCompleteEvent) {
-      JobCompleteEvent jobCompleteEvent = (JobCompleteEvent)event;
-      JobStatus jobStatus = jobCompleteEvent.getJobStatus();
-      System.out.println("Job " + jobStatus.getJobID() + 
-                         " completed at " + jobCompleteEvent.getTimeStamp() + 
-                         " with status: " + jobStatus.getState() +
-                         " runtime: " + 
-                         (jobCompleteEvent.getTimeStamp() - jobStatus.getStartTime()));
-      runningJobs.remove(jobCompleteEvent.getJobStatus().getJobID());
-      if (noMoreJobs && runningJobs.isEmpty()) {
-        jobCompleteEvent.getEngine().shutdown();
+      return processJobCompleteEvent((JobCompleteEvent) event);
+    } else if (event instanceof LoadProbingEvent) {
+      return processLoadProbingEvent((LoadProbingEvent) event);
+    } else {
+      throw new IllegalArgumentException("unknown event type: "
+          + event.getClass());
+    }
+  }
+
+  /**
+   * Responds to a job submission event by submitting the job to the job
+   * tracker and then scheduling the next submission according to the
+   * submission policy: under REPLAY the next job is scheduled at its traced
+   * submission time, under STRESS the next job is submitted only if the
+   * cluster is not overloaded, and under SERIAL the next submission is
+   * deferred until the current job completes.
+   * 
+   * @param submitEvent the submission event to respond to
+   */
+  private List<SimulatorEvent> processJobSubmissionEvent(
+      JobSubmissionEvent submitEvent) throws IOException {
+    // Submit job
+    JobStatus status = null;
+    JobStory story = submitEvent.getJob();
+    try {
+      status = submitJob(story);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+    runningJobs.put(status.getJobID(), new JobSketchInfo(story.getNumberMaps(),
+        story.getNumberReduces()));
+    System.out.println("Job " + status.getJobID() + " is submitted at "
+        + submitEvent.getTimeStamp());
+
+    // Find the next job to submit
+    nextJob = jobStoryProducer.getNextJob();
+    if (nextJob == null) {
+      noMoreJobs = true;
+      return SimulatorEngine.EMPTY_EVENTS;
+    } else if (submissionPolicy == SimulatorJobSubmissionPolicy.REPLAY) {
+      // enqueue next submission event
+      return Collections
+          .<SimulatorEvent> singletonList(new JobSubmissionEvent(this, nextJob
+              .getSubmissionTime(), nextJob));
+    } else if (submissionPolicy == SimulatorJobSubmissionPolicy.STRESS) {
+      return checkLoadAndSubmitJob(submitEvent.getTimeStamp());
+    }
+    
+    return SimulatorEngine.EMPTY_EVENTS;
+  }
+  
+  /**
+   * Handles a job completion event. 
+   * 
+   * @param jobCompleteEvent the job completion event to respond to
+   * @throws IOException 
+   */
+  private List<SimulatorEvent> processJobCompleteEvent(
+      JobCompleteEvent jobCompleteEvent) throws IOException {
+    JobStatus jobStatus = jobCompleteEvent.getJobStatus();
+    System.out.println("Job " + jobStatus.getJobID() + " completed at "
+        + jobCompleteEvent.getTimeStamp() + " with status: "
+        + jobStatus.getState() + " runtime: "
+        + (jobCompleteEvent.getTimeStamp() - jobStatus.getStartTime()));
+    runningJobs.remove(jobCompleteEvent.getJobStatus().getJobID());
+    if (noMoreJobs && runningJobs.isEmpty()) {
+      jobCompleteEvent.getEngine().shutdown();
+    }
+
+    if (!noMoreJobs) {
+      if (submissionPolicy == SimulatorJobSubmissionPolicy.SERIAL) {
+        long submissionTime = jobCompleteEvent.getTimeStamp() + 1;
+        JobStory story = new SimulatorJobStory(nextJob, submissionTime);
+        return Collections
+            .<SimulatorEvent> singletonList(new JobSubmissionEvent(this,
+                submissionTime, story));
+      } else if (submissionPolicy == SimulatorJobSubmissionPolicy.STRESS) {
+        return checkLoadAndSubmitJob(jobCompleteEvent.getTimeStamp());
       }
+    }
+    return SimulatorEngine.EMPTY_EVENTS;
+  }
+  
+  /**
+   * Checks whether the job tracker is overloaded; if not, submits the next job.
+   * Precondition: noMoreJobs == false.
+   * @return A list of follow-up {@link SimulatorEvent}s.
+   */
+  private List<SimulatorEvent> checkLoadAndSubmitJob(long now) throws IOException {
+    List<SimulatorEvent> ret = new ArrayList<SimulatorEvent>(2);
+    boolean overloaded = isOverloaded(now);
+    adjustLoadProbingInterval(overloaded);
+    
+    if (inFlightLPE != null && (inFlightLPE.getTimeStamp() > now + loadProbingInterval)) {
+      cancelledLPE.put(inFlightLPE, Boolean.TRUE);
+      inFlightLPE = null;
+    }
+    
+    if (inFlightLPE == null) {
+      inFlightLPE = new LoadProbingEvent(this, now + loadProbingInterval);
+      ret.add(inFlightLPE);
+    }
+
+    if (!overloaded) {
+      long submissionTime = now + 1;
+      JobStory story = new SimulatorJobStory(nextJob, submissionTime);
+      ret.add(new JobSubmissionEvent(this, submissionTime, story));
+    }
+    
+    return ret;
+  }
+  
+  /**
+   * Handles a load-probing event. If the cluster is not overloaded, submits
+   * the next job.
+   * 
+   * @param loadProbingEvent the load probing event
+   */
+  private List<SimulatorEvent> processLoadProbingEvent(
+      LoadProbingEvent loadProbingEvent) throws IOException {
+    if (cancelledLPE.containsKey(loadProbingEvent)) {
+      cancelledLPE.remove(loadProbingEvent);
       return SimulatorEngine.EMPTY_EVENTS;
-    } else {
-      throw new IllegalArgumentException("unknown event type: " + event.getClass());
     }
+    
+    assert(loadProbingEvent == inFlightLPE);
+    
+    inFlightLPE = null;
+    
+    if (noMoreJobs) {
+      return SimulatorEngine.EMPTY_EVENTS;
+    }
+    
+    return checkLoadAndSubmitJob(loadProbingEvent.getTimeStamp());
   }
 
   @SuppressWarnings("deprecation")
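
An aside on the probing cadence (illustrative only): adjustLoadProbingInterval() above doubles the probing interval from LOAD_PROB_INTERVAL_START (1000 ms of simulated time) while the cluster stays overloaded and caps it at LOAD_PROB_INTERVAL_MAX (320000 ms); a non-overloaded probe resets it to the start value. The standalone loop below just prints the interval sequence produced by that back-off.

    public class BackoffSketch {
      public static void main(String[] args) {
        final int start = 1000;    // LOAD_PROB_INTERVAL_START
        final int max = 320000;    // LOAD_PROB_INTERVAL_MAX
        int interval = start;
        // Prints 1000, 2000, 4000, ..., 256000, 320000; once the cap is hit
        // the interval stays there until a non-overloaded probe resets it.
        while (true) {
          System.out.println(interval);
          if (interval == max) {
            break;
          }
          interval = Math.min(interval * 2, max);
        }
      }
    }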

Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobSubmissionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobSubmissionPolicy.java?rev=886044&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobSubmissionPolicy.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobSubmissionPolicy.java Wed Dec  2 03:45:17 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Job submission policies. The set of policies is closed and encapsulated in
+ * {@link SimulatorJobSubmissionPolicy}. The handling of submission policies is
+ * embedded in the {@link SimulatorEngine} (through various events).
+ * 
+ */
+public enum SimulatorJobSubmissionPolicy {
+  /**
+   * replay the trace by following the job inter-arrival rate faithfully.
+   */
+  REPLAY,
+  
+  /**
+   * ignore submission time, keep submitting jobs until the cluster is saturated.
+   */
+  STRESS,
+  
+  /**
+   * submit jobs sequentially, one at a time.
+   */
+  SERIAL;
+  
+  public static final String JOB_SUBMISSION_POLICY = "mumak.job-submission.policy";
+
+  static public SimulatorJobSubmissionPolicy getPolicy(Configuration conf) {
+    String policy = conf.get(JOB_SUBMISSION_POLICY, REPLAY.name());
+    return valueOf(policy.toUpperCase());
+  }
+}
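
A small illustrative check (again, not part of the commit) of getPolicy() as defined above: when the key is unset it falls back to REPLAY, and because the value is upper-cased before valueOf(), lower-case strings are accepted.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.SimulatorJobSubmissionPolicy;

    public class GetPolicyDefaultsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Key not set: getPolicy() returns the REPLAY default.
        System.out.println(SimulatorJobSubmissionPolicy.getPolicy(conf));  // REPLAY

        // Lower-case values work because getPolicy() upper-cases the string.
        conf.set(SimulatorJobSubmissionPolicy.JOB_SUBMISSION_POLICY, "serial");
        System.out.println(SimulatorJobSubmissionPolicy.getPolicy(conf));  // SERIAL
      }
    }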

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java?rev=886044&r1=886043&r2=886044&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java Wed Dec  2 03:45:17 2009
@@ -36,10 +36,12 @@
 public class TestSimulatorEndToEnd {
 
   public static final Log LOG = LogFactory.getLog(MockSimulatorEngine.class);
+  protected SimulatorJobSubmissionPolicy policy = SimulatorJobSubmissionPolicy.REPLAY;
   
   @Test
   public void testMain() throws Exception {
     final Configuration conf = new Configuration();
+    conf.set(SimulatorJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy.name());
     final FileSystem lfs = FileSystem.getLocal(conf);
     final Path rootInputDir = new Path(
         System.getProperty("src.test.data", "data")).makeQualified(lfs);
@@ -55,7 +57,7 @@
     MockSimulatorEngine mockMumak = new MockSimulatorEngine(numJobs, nTrackers);
 
     String[] args = { traceFile.toString(), topologyFile.toString() };
-    int res = ToolRunner.run(new Configuration(), mockMumak, args);
+    int res = ToolRunner.run(conf, mockMumak, args);
     Assert.assertEquals(res, 0);
   }
   

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobClient.java?rev=886044&r1=886043&r2=886044&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobClient.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobClient.java Wed Dec  2 03:45:17 2009
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -119,6 +120,7 @@
     private long[] times;
     private int index = 0;
     private List<MockJobStory> jobs = new ArrayList<MockJobStory>();
+    private Random random = new Random();
     
     public MockJobStoryProducer(long[] times, long relativeStartTime) {
       super();
@@ -127,7 +129,7 @@
       index = 0;
       
       for (long time: times) {
-        jobs.add(new MockJobStory(time - relativeStartTime));
+        jobs.add(new MockJobStory(random, time - relativeStartTime));
       }
     }
     
@@ -149,9 +151,11 @@
   }
   
   static class MockJobStory implements JobStory {
+    private Random random;
     private long submissionTime;
     
-    public MockJobStory(long submissionTime) {
+    public MockJobStory(Random random, long submissionTime) {
+      this.random = random;
       this.submissionTime = submissionTime;
     }
     
@@ -183,12 +187,12 @@
 
     @Override
     public int getNumberMaps() {
-      throw new UnsupportedOperationException();
+      return random.nextInt(10)+1;
     }
 
     @Override
     public int getNumberReduces() {
-      throw new UnsupportedOperationException();
+      return random.nextInt(5);
     }
 
     @Override

Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorSerialJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorSerialJobSubmission.java?rev=886044&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorSerialJobSubmission.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorSerialJobSubmission.java Wed Dec  2 03:45:17 2009
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+public class TestSimulatorSerialJobSubmission extends TestSimulatorEndToEnd {
+  public TestSimulatorSerialJobSubmission() {
+    super();
+    policy = SimulatorJobSubmissionPolicy.SERIAL;
+  }
+}

Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorStressJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorStressJobSubmission.java?rev=886044&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorStressJobSubmission.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorStressJobSubmission.java Wed Dec  2 03:45:17 2009
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+public class TestSimulatorStressJobSubmission extends TestSimulatorEndToEnd {
+  public TestSimulatorStressJobSubmission() {
+    super();
+    policy = SimulatorJobSubmissionPolicy.STRESS;
+  }
+}


