hadoop-mapreduce-commits mailing list archives

From cdoug...@apache.org
Subject svn commit: r919277 - in /hadoop/mapreduce/trunk: ./ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/
Date Fri, 05 Mar 2010 03:11:57 GMT
Author: cdouglas
Date: Fri Mar  5 03:11:56 2010
New Revision: 919277

URL: http://svn.apache.org/viewvc?rev=919277&view=rev
Log:
MAPREDUCE-1408. Add customizable job submission policies to Gridmix. Contributed by Rahul Singh

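[Editor's note: a minimal sketch, not part of this commit, of how a run would
select one of the new submission policies. The configuration key and the
policy names (REPLAY, STRESS, SERIAL) come from the GridmixJobSubmissionPolicy
enum added below; the wrapper class is hypothetical.]

    import org.apache.hadoop.conf.Configuration;

    public class PolicySelectionSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // When the key is unset, Gridmix falls back to the default policy
        // it passes to getPolicy (STRESS).
        conf.set("gridmix.job-submission.policy", "SERIAL");
        System.out.println(conf.get("gridmix.job-submission.policy"));
      }
    }
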
Added:
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StatListener.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=919277&r1=919276&r2=919277&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Mar  5 03:11:56 2010
@@ -200,6 +200,9 @@
 
     MAPREDUCE-1454. Quote user supplied strings in Tracker servlets. (cdouglas)
 
+    MAPREDUCE-1408. Add customizable job submission policies to Gridmix. (Rahul
+    Singh via cdouglas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=919277&r1=919276&r2=919277&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Fri Mar  5 03:11:56 2010
@@ -32,7 +32,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-
+import org.apache.hadoop.tools.rumen.ZombieJobProducer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -82,6 +82,7 @@
   private JobFactory factory;
   private JobSubmitter submitter;
   private JobMonitor monitor;
+  private Statistics statistics;
 
   // Shutdown hook
   private final Shutdown sdh = new Shutdown();
@@ -129,20 +130,38 @@
    */
   private void startThreads(Configuration conf, String traceIn, Path ioPath,
       Path scratchDir, CountDownLatch startFlag) throws IOException {
-    monitor = createJobMonitor();
-    submitter = createJobSubmitter(monitor,
-        conf.getInt(GRIDMIX_SUB_THR,
-          Runtime.getRuntime().availableProcessors() + 1),
-        conf.getInt(GRIDMIX_QUE_DEP, 5),
-        new FilePool(conf, ioPath));
-    factory = createJobFactory(submitter, traceIn, scratchDir, conf, startFlag);
-    monitor.start();
-    submitter.start();
-    factory.start();
-  }
+    try {
+      GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
+        conf, GridmixJobSubmissionPolicy.STRESS);
+      LOG.info(" Submission policy is " + policy.name());
+      statistics = new Statistics(conf, policy.getPollingInterval(), startFlag);
+      monitor = createJobMonitor(statistics);
+      int noOfSubmitterThreads = policy.name().equals(
+        GridmixJobSubmissionPolicy.SERIAL.name()) ? 1 :
+        Runtime.getRuntime().availableProcessors() + 1;
+
+      submitter = createJobSubmitter(
+        monitor, conf.getInt(GRIDMIX_SUB_THR, noOfSubmitterThreads),
+        conf.getInt(GRIDMIX_QUE_DEP, 5), new FilePool(conf, ioPath));
+      factory = createJobFactory(
+        submitter, traceIn, scratchDir, conf, startFlag);
+      if (policy.name().equals(GridmixJobSubmissionPolicy.SERIAL.name())) {
+        statistics.addJobStatsListeners(factory);
+      } else {
+        statistics.addClusterStatsObservers(factory);
+      }
+
+      monitor.start();
+      submitter.start();
+    } catch (Exception e) {
+      LOG.error("Exception at start", e);
+      throw new IOException(e);
+    }
+  }
 
-  protected JobMonitor createJobMonitor() throws IOException {
-    return new JobMonitor();
+  protected JobMonitor createJobMonitor(Statistics stats) throws IOException {
+    return new JobMonitor(stats);
   }
 
   protected JobSubmitter createJobSubmitter(JobMonitor monitor, int threads,
@@ -153,9 +172,11 @@
   protected JobFactory createJobFactory(JobSubmitter submitter, String traceIn,
       Path scratchDir, Configuration conf, CountDownLatch startFlag)
       throws IOException {
-    return new JobFactory(submitter, createInputStream(traceIn), scratchDir,
-        conf, startFlag);
-  }
+    return GridmixJobSubmissionPolicy.getPolicy(
+      conf, GridmixJobSubmissionPolicy.STRESS).createJobFactory(
+      submitter, new ZombieJobProducer(createInputStream(traceIn), null),
+      scratchDir, conf, startFlag);
+  }
 
   public int run(String[] argv) throws IOException, InterruptedException {
     if (argv.length < 2) {
@@ -196,6 +217,8 @@
         }
         // scan input dir contents
         submitter.refreshFilePool();
+        factory.start();
+        statistics.start();
       } catch (Throwable e) {
         LOG.error("Startup failed", e);
         if (factory != null) factory.abort(); // abort pipeline
@@ -203,7 +226,6 @@
         // signal for factory to start; sets start time
         startFlag.countDown();
       }
-
       if (factory != null) {
         // wait for input exhaustion
         factory.join(Long.MAX_VALUE);
@@ -218,6 +240,10 @@
         // wait for running tasks to complete
         monitor.shutdown();
         monitor.join(Long.MAX_VALUE);
+
+        statistics.shutdown();
+        statistics.join(Long.MAX_VALUE);
+
       }
     } finally {
       IOUtils.cleanup(LOG, trace);
@@ -256,6 +282,7 @@
         killComponent(factory, FAC_SLEEP);   // read no more tasks
         killComponent(submitter, SUB_SLEEP); // submit no more tasks
         killComponent(monitor, MON_SLEEP);   // process remaining jobs here
+        killComponent(statistics, MON_SLEEP);
       } finally {
         if (monitor == null) {
           return;

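[Editor's note: the submitter-thread sizing introduced above, condensed into a
standalone sketch. The SERIAL-gets-one-thread rule and the processors + 1
default come from the hunk; the wrapper class is illustrative, and an explicit
GRIDMIX_SUB_THR setting still overrides the computed default.]

    public class SubmitterThreadSizing {
      // SERIAL submits one job at a time, so a single submitter thread
      // suffices; the other policies default to availableProcessors() + 1.
      static int defaultSubmitterThreads(String policyName) {
        return "SERIAL".equals(policyName)
            ? 1
            : Runtime.getRuntime().availableProcessors() + 1;
      }

      public static void main(String[] args) {
        System.out.println(defaultSubmitterThreads("SERIAL")); // 1
        System.out.println(defaultSubmitterThreads("STRESS")); // cores + 1
      }
    }
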
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java?rev=919277&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJobSubmissionPolicy.java Fri Mar  5 03:11:56 2010
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
+
+import java.util.concurrent.CountDownLatch;
+import java.io.IOException;
+
+enum GridmixJobSubmissionPolicy {
+
+  REPLAY("REPLAY", 320000) {
+    @Override
+    public JobFactory<ClusterStats> createJobFactory(
+      JobSubmitter submitter, JobStoryProducer producer, Path scratchDir,
+      Configuration conf, CountDownLatch startFlag) throws IOException {
+      return new ReplayJobFactory(
+        submitter, producer, scratchDir, conf, startFlag);
+    }
+  },
+
+  STRESS("STRESS", 5000) {
+    @Override
+    public JobFactory<ClusterStats> createJobFactory(
+      JobSubmitter submitter, JobStoryProducer producer, Path scratchDir,
+      Configuration conf, CountDownLatch startFlag) throws IOException {
+      return new StressJobFactory(
+        submitter, producer, scratchDir, conf, startFlag);
+    }
+  },
+
+  SERIAL("SERIAL", 0) {
+    @Override
+    public JobFactory<JobStats> createJobFactory(
+      JobSubmitter submitter, JobStoryProducer producer, Path scratchDir,
+      Configuration conf, CountDownLatch startFlag) throws IOException {
+      return new SerialJobFactory(
+        submitter, producer, scratchDir, conf, startFlag);
+    }
+
+    @Override
+    public int getPollingInterval() {
+      return 0;
+    }
+  };
+
+  public static final String JOB_SUBMISSION_POLICY =
+    "gridmix.job-submission.policy";
+
+  private final String name;
+  private final int pollingInterval;
+
+  GridmixJobSubmissionPolicy(String name, int pollingInterval) {
+    this.name = name;
+    this.pollingInterval = pollingInterval;
+  }
+
+  public abstract JobFactory createJobFactory(
+    JobSubmitter submitter, JobStoryProducer producer, Path scratchDir,
+    Configuration conf, CountDownLatch startFlag) throws IOException;
+
+  public int getPollingInterval() {
+    return pollingInterval;
+  }
+
+  public static GridmixJobSubmissionPolicy getPolicy(
+    Configuration conf, GridmixJobSubmissionPolicy defaultPolicy) {
+    String policy = conf.get(JOB_SUBMISSION_POLICY, defaultPolicy.name());
+    return valueOf(policy.toUpperCase());
+  }
+}

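[Editor's note: a self-contained miniature of the enum-as-strategy pattern the
new GridmixJobSubmissionPolicy uses: each constant carries its own polling
interval, and lookup mirrors getPolicy. All names here are illustrative; note
that valueOf throws IllegalArgumentException for an unrecognized policy
string.]

    enum MiniPolicy {
      REPLAY(320000), STRESS(5000), SERIAL(0);

      private final int pollingInterval;
      MiniPolicy(int pollingInterval) { this.pollingInterval = pollingInterval; }
      int getPollingInterval() { return pollingInterval; }

      // Mirrors getPolicy: fall back to a default when the value is unset.
      static MiniPolicy fromConf(String value, MiniPolicy dflt) {
        return value == null ? dflt : valueOf(value.toUpperCase());
      }
    }

    public class MiniPolicyDemo {
      public static void main(String[] args) {
        System.out.println(MiniPolicy.fromConf(null, MiniPolicy.STRESS));     // STRESS
        System.out.println(MiniPolicy.fromConf("serial", MiniPolicy.STRESS)); // SERIAL
        System.out.println(MiniPolicy.REPLAY.getPollingInterval());           // 320000
      }
    }
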
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=919277&r1=919276&r2=919277&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Fri Mar  5 03:11:56 2010
@@ -17,15 +17,10 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobID;
@@ -33,13 +28,16 @@
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
 import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
-import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
 import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.tools.rumen.ZombieJobProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 /**
@@ -49,19 +47,20 @@
  * construction.
  * @see org.apache.hadoop.tools.rumen.HadoopLogsAnalyzer
  */
-class JobFactory implements Gridmix.Component<Void> {
+abstract class JobFactory<T> implements Gridmix.Component<Void>, StatListener<T> {
 
   public static final Log LOG = LogFactory.getLog(JobFactory.class);
 
-  private final Path scratch;
-  private final float rateFactor;
-  private final Configuration conf;
-  private final ReaderThread rThread;
-  private final AtomicInteger sequence;
-  private final JobSubmitter submitter;
-  private final CountDownLatch startFlag;
-  private volatile IOException error = null;
+  protected final Path scratch;
+  protected final float rateFactor;
+  protected final Configuration conf;
+  protected final Thread rThread;
+  protected final AtomicInteger sequence;
+  protected final JobSubmitter submitter;
+  protected final CountDownLatch startFlag;
+  protected volatile IOException error = null;
   protected final JobStoryProducer jobProducer;
+  protected final ReentrantLock lock = new ReentrantLock(true);
 
   /**
    * Creating a new instance does not start the thread.
@@ -71,6 +70,7 @@
    * @param scratch Directory into which to write output from simulated jobs
    * @param conf Config passed to all jobs to be submitted
    * @param startFlag Latch released from main to start pipeline
+   * @throws java.io.IOException
    */
   public JobFactory(JobSubmitter submitter, InputStream jobTrace,
       Path scratch, Configuration conf, CountDownLatch startFlag)
@@ -96,7 +96,7 @@
     this.conf = new Configuration(conf);
     this.submitter = submitter;
     this.startFlag = startFlag;
-    this.rThread = new ReaderThread();
+    this.rThread = createReaderThread();
   }
 
   static class MinTaskInfo extends TaskInfo {
@@ -122,7 +122,7 @@
     }
   }
 
-  static class FilterJobStory implements JobStory {
+  protected static class FilterJobStory implements JobStory {
 
     protected final JobStory job;
 
@@ -154,75 +154,24 @@
     }
   }
 
-  /**
-   * Worker thread responsible for reading descriptions, assigning sequence
-   * numbers, and normalizing time.
-   */
-  private class ReaderThread extends Thread {
-
-    public ReaderThread() {
-      super("GridmixJobFactory");
-    }
+  protected abstract Thread createReaderThread();
 
-    private JobStory getNextJobFiltered() throws IOException {
-      JobStory job;
-      do {
-        job = jobProducer.getNextJob();
-      } while (job != null
-          && (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
-              job.getSubmissionTime() < 0));
-      return null == job ? null : new FilterJobStory(job) {
-          @Override
-          public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
-            return new MinTaskInfo(this.job.getTaskInfo(taskType, taskNumber));
-          }
-        };
-    }
-
-    @Override
-    public void run() {
-      try {
-        startFlag.await();
-        if (Thread.currentThread().isInterrupted()) {
-          return;
-        }
-        final long initTime = TimeUnit.MILLISECONDS.convert(
-            System.nanoTime(), TimeUnit.NANOSECONDS);
-        LOG.debug("START @ " + initTime);
-        long first = -1;
-        long last = -1;
-        while (!Thread.currentThread().isInterrupted()) {
-          try {
-            final JobStory job = getNextJobFiltered();
-            if (null == job) {
-              return;
-            }
-            if (first < 0) {
-              first = job.getSubmissionTime();
-            }
-            final long current = job.getSubmissionTime();
-            if (current < last) {
-              LOG.warn("Job " + job.getJobID() + " out of order");
-              continue;
-            }
-            last = current;
-            submitter.add(new GridmixJob(conf, initTime +
-                  Math.round(rateFactor * (current - first)),
-                job, scratch, sequence.getAndIncrement()));
-          } catch (IOException e) {
-            JobFactory.this.error = e;
-            return;
-          }
-        }
-      } catch (InterruptedException e) {
-        // exit thread; ignore any jobs remaining in the trace
-        return;
-      } finally {
-        IOUtils.cleanup(null, jobProducer);
+  protected JobStory getNextJobFiltered() throws IOException {
+    JobStory job;
+    do {
+      job = jobProducer.getNextJob();
+    } while (job != null &&
+      (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
+        job.getSubmissionTime() < 0));
+    return null == job ? null : new FilterJobStory(job) {
+      @Override
+      public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+        return new MinTaskInfo(this.job.getTaskInfo(taskType, taskNumber));
       }
-    }
+    };
   }
 
   /**
    * Obtain the error that caused the thread to exit unexpectedly.
    */

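[Editor's note: a structural sketch of the contract the refactored JobFactory
now imposes on its subclasses: supply a reader thread via createReaderThread()
and react to statistics through the StatListener update() callback. The class
below is hypothetical, assumes gridmix package access and the
JobStoryProducer-based constructor the concrete factories use, and is not part
of this commit.]

    class SketchJobFactory extends JobFactory<Statistics.ClusterStats> {
      SketchJobFactory(JobSubmitter submitter, JobStoryProducer producer,
          Path scratch, Configuration conf, CountDownLatch startFlag)
          throws IOException {
        super(submitter, producer, scratch, conf, startFlag);
      }

      @Override
      protected Thread createReaderThread() {
        return new Thread("SketchJobFactory") {
          @Override
          public void run() {
            // Pull jobs with getNextJobFiltered() and hand them to
            // submitter.add(...), pacing them however the policy dictates.
          }
        };
      }

      @Override
      public void update(Statistics.ClusterStats item) {
        // React to cluster snapshots; STRESS uses these to gauge load.
      }

      @Override
      public void start() {
        rThread.start();
      }
    }
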
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java?rev=919277&r1=919276&r2=919277&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java Fri Mar  5 03:11:56 2010
@@ -47,14 +47,12 @@
   private final MonitorThread mThread;
   private final BlockingQueue<Job> runningJobs;
   private final long pollDelayMillis;
+  private Statistics statistics;
   private boolean graceful = false;
   private boolean shutdown = false;
 
-  /**
-   * Create a JobMonitor with a default polling interval of 5s.
-   */
-  public JobMonitor() {
-    this(5, TimeUnit.SECONDS);
+  public JobMonitor(Statistics statistics) {
+    this(5,TimeUnit.SECONDS, statistics);
   }
 
   /**
@@ -62,12 +60,14 @@
    * polling a still-running job.
    * @param pollDelay Delay after polling a running job
    * @param unit Time unit for pollDelaySec (rounded to milliseconds)
+   * @param statistics Statistics component notified of job completion
    */
-  public JobMonitor(int pollDelay, TimeUnit unit) {
+  public JobMonitor(int pollDelay, TimeUnit unit, Statistics statistics) {
     mThread = new MonitorThread();
     runningJobs = new LinkedBlockingQueue<Job>();
     mJobs = new LinkedList<Job>();
     this.pollDelayMillis = TimeUnit.MILLISECONDS.convert(pollDelay, unit);
+    this.statistics = statistics;
   }
 
   /**
@@ -78,6 +78,17 @@
   }
 
   /**
+   * Add a job whose submission failed, so that the failure can be
+   * communicated back to the serial policy.
+   * TODO: Find a cleaner solution for this problem.
+   * @param job the job whose submission failed
+   */
+  public void submissionFailed(Job job) {
+    LOG.info(" Job submission failed notify if anyone is waiting " + job);
+    this.statistics.add(job);
+  }
+
+  /**
    * Temporary hook for recording job success.
    */
   protected void onSuccess(Job job) {
@@ -162,6 +173,7 @@
             try {
               if (job.isComplete()) {
                 process(job);
+                statistics.add(job);
                 continue;
               }
             } catch (IOException e) {

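[Editor's note: JobMonitor's no-arg constructor is gone, so every caller now
supplies the Statistics sink that should hear about completed and failed jobs.
A minimal wiring sketch, assuming gridmix package access and an existing
Configuration conf and CountDownLatch startFlag:]

    GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
        conf, GridmixJobSubmissionPolicy.STRESS);
    Statistics stats =
        new Statistics(conf, policy.getPollingInterval(), startFlag);
    JobMonitor monitor = new JobMonitor(stats);        // 5s default poll delay
    // or: new JobMonitor(2, TimeUnit.SECONDS, stats); // custom poll delay
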
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=919277&r1=919276&r2=919277&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java Fri Mar  5 03:11:56 2010
@@ -82,6 +82,11 @@
           job.buildSplits(inputDir);
         } catch (IOException e) {
           LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+          monitor.submissionFailed(job.getJob());
+          return;
+        } catch (Exception e) {
+          LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+          monitor.submissionFailed(job.getJob());
           return;
         }
         // Sleep until deadline
@@ -101,14 +106,22 @@
             throw new InterruptedException("Failed to submit " +
                 job.getJob().getJobName());
           }
+          monitor.submissionFailed(job.getJob());
         } catch (ClassNotFoundException e) {
           LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+          monitor.submissionFailed(job.getJob());
         }
       } catch (InterruptedException e) {
         // abort execution, remove splits if nesc
         // TODO release ThdLoc
         GridmixJob.pullDescription(job.id());
         Thread.currentThread().interrupt();
+        monitor.submissionFailed(job.getJob());
+        return;
+      } catch (Exception e) {
+        // The job was not submitted due to an unexpected exception.
+        LOG.warn("Job " + job.getJob() + " submission failed", e);
+        monitor.submissionFailed(job.getJob());
         return;
       } finally {
         sem.release();

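[Editor's note: the hunks above converge on one invariant: every path on which
a job fails to reach the cluster must call monitor.submissionFailed(...),
otherwise a SERIAL run would block forever waiting on a job that never
started. Schematically, with buildSplitsAndSubmit as a hypothetical stand-in
for the real method body:]

    try {
      buildSplitsAndSubmit(job);
    } catch (InterruptedException ie) {
      monitor.submissionFailed(job.getJob()); // notify before bailing out
      Thread.currentThread().interrupt();
    } catch (Exception e) {
      monitor.submissionFailed(job.getJob()); // same on any other failure
    }
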
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java?rev=919277&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ReplayJobFactory.java Fri Mar  5 03:11:56 2010
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+class ReplayJobFactory extends JobFactory<Statistics.ClusterStats> {
+  public static final Log LOG = LogFactory.getLog(ReplayJobFactory.class);
+
+  /**
+   * Creating a new instance does not start the thread.
+   *
+   * @param submitter   Component to which deserialized jobs are passed
+   * @param jobProducer Job story producer
+   *                    {@link org.apache.hadoop.tools.rumen.ZombieJobProducer}
+   * @param scratch     Directory into which to write output from simulated jobs
+   * @param conf        Config passed to all jobs to be submitted
+   * @param startFlag   Latch released from main to start pipeline
+   * @throws java.io.IOException
+   */
+  public ReplayJobFactory(
+    JobSubmitter submitter, JobStoryProducer jobProducer, Path scratch,
+    Configuration conf, CountDownLatch startFlag)
+    throws IOException {
+    super(submitter, jobProducer, scratch, conf, startFlag);
+  }
+
+
+  @Override
+  public Thread createReaderThread() {
+    return new ReplayReaderThread("ReplayJobFactory");
+  }
+
+  /**
+   * Replay mode follows the trace's own submission schedule, so cluster
+   * statistics updates are ignored.
+   * @param item current cluster statistics (unused)
+   */
+  @Override
+  public void update(Statistics.ClusterStats item) {
+  }
+
+   private class ReplayReaderThread extends Thread {
+
+    public ReplayReaderThread(String threadName) {
+      super(threadName);
+    }
+
+    @Override
+    public void run() {
+      try {
+        startFlag.await();
+        if (Thread.currentThread().isInterrupted()) {
+          return;
+        }
+        final long initTime = TimeUnit.MILLISECONDS.convert(
+          System.nanoTime(), TimeUnit.NANOSECONDS);
+        LOG.info("START REPLAY @ " + initTime);
+        long first = -1;
+        long last = -1;
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            final JobStory job = getNextJobFiltered();
+            if (null == job) {
+              return;
+            }
+            if (first < 0) {
+              first = job.getSubmissionTime();
+            }
+            final long current = job.getSubmissionTime();
+            if (current < last) {
+              LOG.warn("Job " + job.getJobID() + " out of order");
+              continue;
+            }
+            last = current;
+            submitter.add(
+              new GridmixJob(
+                conf, initTime + Math.round(rateFactor * (current - first)),
+                job, scratch, sequence.getAndIncrement()));
+          } catch (IOException e) {
+            error = e;
+            return;
+          }
+        }
+      } catch (InterruptedException e) {
+        // exit thread; ignore any jobs remaining in the trace
+      } finally {
+        IOUtils.cleanup(null, jobProducer);
+      }
+    }
+  }
+
+  /**
+   * Start the reader thread; it will wait on the start latch if necessary.
+   */
+  @Override
+  public void start() {
+    this.rThread.start();
+  }
+
+}
\ No newline at end of file

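[Editor's note: replay scheduling reduces to one formula,
deadline = initTime + rateFactor * (current - first), which preserves the
trace's inter-arrival gaps scaled by rateFactor. A runnable restatement with
illustrative numbers:]

    public class ReplayTimingSketch {
      public static void main(String[] args) {
        long initTime = 1000000L; // wall-clock ms when replay starts
        long first = 50000L;      // trace submission time of the first job
        long current = 65000L;    // trace submission time of this job
        float rateFactor = 1.0f;  // >1 stretches the trace, <1 compresses it
        long deadline = initTime + Math.round(rateFactor * (current - first));
        System.out.println(deadline); // 1015000: submitted 15s after start
      }
    }
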
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java?rev=919277&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SerialJobFactory.java Fri Mar  5 03:11:56 2010
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.Condition;
+
+public class SerialJobFactory extends JobFactory<JobStats> {
+
+  public static final Log LOG = LogFactory.getLog(SerialJobFactory.class);
+  private final Condition jobCompleted = lock.newCondition();
+
+  /**
+   * Creating a new instance does not start the thread.
+   *
+   * @param submitter   Component to which deserialized jobs are passed
+   * @param jobProducer Job story producer
+   *                    {@link org.apache.hadoop.tools.rumen.ZombieJobProducer}
+   * @param scratch     Directory into which to write output from simulated jobs
+   * @param conf        Config passed to all jobs to be submitted
+   * @param startFlag   Latch released from main to start pipeline
+   * @throws java.io.IOException
+   */
+  public SerialJobFactory(
+    JobSubmitter submitter, JobStoryProducer jobProducer, Path scratch,
+    Configuration conf, CountDownLatch startFlag)
+    throws IOException {
+    super(submitter, jobProducer, scratch, conf, startFlag);
+  }
+
+  @Override
+  public Thread createReaderThread() {
+    return new SerialReaderThread("SerialJobFactory");
+  }
+
+  private class SerialReaderThread extends Thread {
+
+    public SerialReaderThread(String threadName) {
+      super(threadName);
+    }
+
+    /**
+     * SERIAL: the reader thread submits one job at a time, then waits for
+     * notification that the submitted job has actually completed:
+     * <pre>
+     * while (trace has jobs) {
+     *   submit nextJob;
+     *   wait until nextJob completes;
+     * }
+     * </pre>
+     */
+    @Override
+    public void run() {
+      try {
+        startFlag.await();
+        if (Thread.currentThread().isInterrupted()) {
+          return;
+        }
+        LOG.info("START SERIAL @ " + System.currentTimeMillis());
+        GridmixJob prevJob;
+        while (!Thread.currentThread().isInterrupted()) {
+          final JobStory job;
+          try {
+            job = getNextJobFiltered();
+            if (null == job) {
+              return;
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                "Serial mode submitting job " + job.getName());
+            }
+            prevJob = new GridmixJob(
+              conf, 0L, job, scratch,sequence.getAndIncrement());
+
+            lock.lock();
+            try {
+              LOG.info(" Submitted the job " + prevJob);
+              submitter.add(prevJob);
+            } finally {
+              lock.unlock();
+            }
+          } catch (IOException e) {
+            error = e;
+            // Abort the factory if the next job cannot be created.
+            return;
+          }
+
+          if (prevJob != null) {
+            //Wait till previous job submitted is completed.
+            lock.lock();
+            try {
+              while (true) {
+                try {
+                  jobCompleted.await();
+                } catch (InterruptedException ie) {
+                  LOG.error(
+                    "Error in SerialJobFactory while waiting for job completion",
+                    ie);
+                  return;
+                }
+                if (LOG.isDebugEnabled()) {
+                  LOG.info(" job " + job.getName() + " completed ");
+                }
+                break;
+              }
+            } finally {
+              lock.unlock();
+            }
+            prevJob = null;
+          }
+        }
+      } catch (InterruptedException e) {
+        return;
+      } finally {
+        IOUtils.cleanup(null, jobProducer);
+      }
+    }
+
+  }
+
+  /**
+   * SERIAL: once the StatsCollector reports that the submitted job has
+   * completed, simply wake the waiting reader thread.
+   *
+   * @param item stats of the completed job
+   */
+  @Override
+  public void update(Statistics.JobStats item) {
+    // For serial submission we only care whether the submitted job has
+    // completed, so a plain signal is enough.
+    lock.lock();
+    try {
+      jobCompleted.signalAll();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Start the reader thread, wait for latch if necessary.
+   */
+  @Override
+  public void start() {
+    LOG.info(" Starting Serial submission ");
+    this.rThread.start();
+  }
+}

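[Editor's note: the SERIAL handshake reduced to its essentials as a
self-contained analogue (these are not the gridmix classes): the reader thread
parks on a Condition after submitting a job, and update() signals it when
Statistics reports the job complete.]

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    public class SerialHandshakeSketch {
      private final ReentrantLock lock = new ReentrantLock(true);
      private final Condition jobCompleted = lock.newCondition();

      void awaitPreviousJob() throws InterruptedException {
        lock.lock();
        try {
          jobCompleted.await(); // parked until update() signals
        } finally {
          lock.unlock();
        }
      }

      void update() { // invoked on the job-completion callback
        lock.lock();
        try {
          jobCompleted.signalAll();
        } finally {
          lock.unlock();
        }
      }
    }
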
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StatListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StatListener.java?rev=919277&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StatListener.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StatListener.java Fri Mar  5 03:11:56 2010
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.gridmix;
+
+/**
+ * Listener notified whenever {@link Statistics} publishes a new snapshot.
+ * @param <T> type of the statistics item delivered on each update
+ */
+interface StatListener<T> {
+
+  /**
+   * Called with the latest statistics item.
+   * @param item the updated statistics
+   */
+  void update(T item);
+}

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java?rev=919277&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java Fri Mar  5 03:11:56 2010
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.gridmix.Gridmix.Component;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Component collecting the statistics that other components need in order
+ * to make decisions. A single collector thread polls for the stats and
+ * updates the relevant data structures (currently ClusterStats). Components
+ * interested in these data structures register as listeners, and the
+ * StatsCollector notifies each registered listener on every update.
+ */
+public class Statistics implements Component<Job> {
+  public static final Log LOG = LogFactory.getLog(Statistics.class);
+
+  private final StatCollector statistics = new StatCollector();
+  private final Cluster cluster;
+
+  //List of cluster status listeners.
+  private final List<StatListener<ClusterStats>> clusterStatlisteners =
+    new ArrayList<StatListener<ClusterStats>>();
+
+  //List of job status listeners.
+  private final List<StatListener<JobStats>> jobStatListeners =
+    new ArrayList<StatListener<JobStats>>();
+
+  private int completedJobsInCurrentInterval = 0;
+  private final int jtPollingInterval;
+  private volatile boolean shutdown = false;
+  private final int maxJobCompletedInInterval;
+  private static final String MAX_JOBS_COMPLETED_IN_POLL_INTERVAL_KEY =
+    "gridmix.max-jobs-completed-in-poll-interval";
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition jobCompleted = lock.newCondition();
+  private final CountDownLatch startFlag;
+
+  public Statistics(
+    Configuration conf, int pollingInterval, CountDownLatch startFlag)
+    throws IOException {
+    this.cluster = new Cluster(conf);
+    this.jtPollingInterval = pollingInterval;
+    maxJobCompletedInInterval = conf.getInt(
+      MAX_JOBS_COMPLETED_IN_POLL_INTERVAL_KEY, 1);
+    this.startFlag = startFlag;
+  }
+
+  /**
+   * Used by JobMonitor to add the completed job.
+   */
+  @Override
+  public void add(Job job) {
+    // JobMonitor also notifies this component for the data-generation job;
+    // ignore that notification, since the collector thread is only started
+    // once the input has been generated.
+    if (!statistics.isAlive()) {
+      return;
+    }
+    completedJobsInCurrentInterval++;
+    //check if we have reached the maximum level of job completions.
+    if (completedJobsInCurrentInterval >= maxJobCompletedInInterval) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          " Reached maximum limit of jobs in a polling interval " +
+            completedJobsInCurrentInterval);
+      }
+      completedJobsInCurrentInterval = 0;
+      lock.lock();
+      try {
+        //Job is completed notify all the listeners.
+        if (jobStatListeners.size() > 0) {
+          for (StatListener<JobStats> l : jobStatListeners) {
+            JobStats stats = new JobStats();
+            stats.setCompleteJob(job);
+            l.update(stats);
+          }
+        }
+        this.jobCompleted.signalAll();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  //TODO: There are only two types of listeners for now. If the number of
+  //listener types grows, move to a map-based registration model.
+
+  public void addClusterStatsObservers(StatListener<ClusterStats> listener) {
+    clusterStatlisteners.add(listener);
+  }
+
+  public void addJobStatsListeners(StatListener<JobStats> listener) {
+    this.jobStatListeners.add(listener);
+  }
+
+  /**
+   * Attempt to start the service.
+   */
+  @Override
+  public void start() {
+    statistics.start();
+  }
+
+  private class StatCollector extends Thread {
+
+    StatCollector() {
+      super("StatsCollectorThread");
+    }
+
+    public void run() {
+      try {
+        startFlag.await();
+        if (Thread.currentThread().isInterrupted()) {
+          return;
+        }
+      } catch (InterruptedException ie) {
+        LOG.error(
+          "Statistics Error while waiting for other threads to get ready ", ie);
+        return;
+      }
+      while (!shutdown) {
+        lock.lock();
+        try {
+          jobCompleted.await(jtPollingInterval, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ie) {
+          LOG.error(
+            "Statistics interrupt while waiting for polling " + ie.getCause(),
+            ie);
+          return;
+        } finally {
+          lock.unlock();
+        }
+
+        // Fetch cluster data only if required, i.e. only if there are
+        // registered ClusterStats listeners.
+        if (clusterStatlisteners.size() > 0) {
+          try {
+            ClusterMetrics clusterStatus = cluster.getClusterStatus();
+            Job[] allJobs = cluster.getAllJobs();
+            List<Job> runningWaitingJobs = getRunningWaitingJobs(allJobs);
+            updateAndNotifyClusterStatsListeners(
+              clusterStatus, runningWaitingJobs);
+          } catch (IOException e) {
+            LOG.error(
+              "Statistics io exception while polling JT ", e);
+            return;
+          } catch (InterruptedException e) {
+            LOG.error(
+              "Statistics interrupt exception while polling JT ", e);
+            return;
+          }
+        }
+      }
+    }
+
+    private void updateAndNotifyClusterStatsListeners(
+      ClusterMetrics clusterMetrics, List<Job> runningWaitingJobs) {
+      ClusterStats stats = ClusterStats.getClusterStats();
+      stats.setClusterMetric(clusterMetrics);
+      stats.setRunningWaitingJobs(runningWaitingJobs);
+      for (StatListener<ClusterStats> listener : clusterStatlisteners) {
+        listener.update(stats);
+      }
+    }
+
+
+    /**
+     * From the given list of jobs, return those whose state is either
+     * PREP or RUNNING.
+     *
+     * @param allJobs all jobs known to the cluster
+     * @return the jobs in PREP or RUNNING state
+     * @throws java.io.IOException
+     * @throws InterruptedException
+     */
+    private List<Job> getRunningWaitingJobs(Job[] allJobs)
+      throws IOException, InterruptedException {
+      List<Job> result = new ArrayList<Job>();
+      for (Job job : allJobs) {
+        //TODO Check if job.getStatus() makes a rpc call
+        org.apache.hadoop.mapreduce.JobStatus.State state =
+          job.getStatus().getState();
+        if (org.apache.hadoop.mapreduce.JobStatus.State.PREP.equals(state) ||
+          org.apache.hadoop.mapreduce.JobStatus.State.RUNNING.equals(state)) {
+          result.add(job);
+        }
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Wait until the service completes. It is assumed that either a
+   * {@link #shutdown} or {@link #abort} has been requested.
+   */
+  @Override
+  public void join(long millis) throws InterruptedException {
+    statistics.join(millis);
+  }
+
+  @Override
+  public void shutdown() {
+    shutdown = true;
+    clusterStatlisteners.clear();
+    jobStatListeners.clear();
+    statistics.interrupt();
+  }
+
+  @Override
+  public void abort() {
+    shutdown = true;
+    clusterStatlisteners.clear();
+    jobStatListeners.clear();
+    statistics.interrupt();
+  }
+
+  /**
+   * Class to encapsulate the JobStats information.
+   * Currently we only need information about the completed job.
+   * TODO: Extend this in the future to carry more information.
+   */
+  static class JobStats {
+    private Job completedJob;
+
+    public Job getCompleteJob() {
+      return completedJob;
+    }
+
+    public void setCompleteJob(Job job) {
+      this.completedJob = job;
+    }
+  }
+
+  static class ClusterStats {
+    private ClusterMetrics status = null;
+    private static ClusterStats stats = new ClusterStats();
+    private List<Job> runningWaitingJobs;
+
+    private ClusterStats() {
+
+    }
+
+    /**
+     * @return stats
+     */
+    static ClusterStats getClusterStats() {
+      return stats;
+    }
+
+    /**
+     * @param metrics
+     */
+    void setClusterMetric(ClusterMetrics metrics) {
+      this.status = metrics;
+    }
+
+    /**
+     * @return metrics
+     */
+    public ClusterMetrics getStatus() {
+      return status;
+    }
+
+    /**
+     * @return runningWaitingJobs
+     */
+    public List<Job> getRunningWaitingJobs() {
+      return runningWaitingJobs;
+    }
+
+    public void setRunningWaitingJobs(List<Job> runningWaitingJobs) {
+      this.runningWaitingJobs = runningWaitingJobs;
+    }
+
+  }
+}

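[Editor's note: how Gridmix.startThreads() above routes a factory to the two
listener lists, restated in isolation (package access assumed): SERIAL cares
about individual job completions, while REPLAY and STRESS observe
cluster-level snapshots.]

    if (policy.name().equals(GridmixJobSubmissionPolicy.SERIAL.name())) {
      statistics.addJobStatsListeners(factory);     // JobStats per completion
    } else {
      statistics.addClusterStatsObservers(factory); // ClusterStats snapshots
    }
    statistics.start(); // StatCollector begins polling once the latch opens
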
Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java?rev=919277&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java Fri Mar  5 03:11:56 2010
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.Condition;
+
+public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
+  public static final Log LOG = LogFactory.getLog(StressJobFactory.class);
+
+  private LoadStatus loadStatus = new LoadStatus();
+  private List<Job> runningWaitingJobs;
+  private final Condition overloaded = this.lock.newCondition();
+  /**
+   * The minimum ratio between pending+running map tasks (aka. incomplete map
+   * tasks) and cluster map slot capacity for us to consider the cluster is
+   * overloaded. For running maps, we only count them partially. Namely, a 40%
+   * completed map is counted as 0.6 map tasks in our calculation.
+   */
+  static final float OVERLOAD_MAPTASK_MAPSLOT_RATIO = 2.0f;
+
+  /**
+   * Creating a new instance does not start the thread.
+   *
+   * @param submitter   Component to which deserialized jobs are passed
+   * @param jobProducer Stream of job traces with which to construct a
+   *                    {@link org.apache.hadoop.tools.rumen.ZombieJobProducer}
+   * @param scratch     Directory into which to write output from simulated jobs
+   * @param conf        Config passed to all jobs to be submitted
+   * @param startFlag   Latch released from main to start pipeline
+   * @throws java.io.IOException
+   */
+  public StressJobFactory(
+    JobSubmitter submitter, JobStoryProducer jobProducer, Path scratch,
+    Configuration conf, CountDownLatch startFlag) throws IOException {
+    super(
+      submitter, jobProducer, scratch, conf, startFlag);
+
+    // Start in the overloaded state so the factory waits for at least the
+    // first set of ClusterStats before deciding how many jobs to submit.
+    this.loadStatus.isOverloaded = true;
+  }
+
+  @Override
+  public Thread createReaderThread() {
+    return new StressReaderThread("StressJobFactory");
+  }
+
+  /*
+   * Worker thread responsible for reading descriptions, assigning sequence
+   * numbers, and normalizing time.
+   */
+  private class StressReaderThread extends Thread {
+
+    public StressReaderThread(String name) {
+      super(name);
+    }
+
+    /**
+     * STRESS: submit jobs in STRESS mode.
+     * <pre>
+     * while (JT is overloaded) { wait; }
+     * </pre>
+     * Once the JT is no longer overloaded, read the number of available
+     * slots and keep submitting jobs until the total demand is sufficient
+     * to load the JT, i.e. until the sum of maps across submitted jobs
+     * exceeds twice the number of available slots.
+     */
+    @Override
+    public void run() {
+      try {
+        startFlag.await();
+        if (Thread.currentThread().isInterrupted()) {
+          return;
+        }
+        LOG.info("START STRESS @ " + System.currentTimeMillis());
+        while (!Thread.currentThread().isInterrupted()) {
+          lock.lock();
+          try {
+            while (loadStatus.isOverloaded) {
+              //Wait while JT is overloaded.
+              try {
+                overloaded.await();
+              } catch (InterruptedException ie) {
+                return;
+              }
+            }
+
+            int noOfSlotsAvailable = loadStatus.numSlotsBackfill;
+            LOG.info(" No of slots to be backfilled are " + noOfSlotsAvailable);
+
+            for (int i = 0; i < noOfSlotsAvailable; i++) {
+              try {
+                final JobStory job = getNextJobFiltered();
+                if (null == job) {
+                  return;
+                }
+                //TODO: We need to take care of scenario when one map takes more
+                //than 1 slot.
+                i += job.getNumberMaps();
+
+                submitter.add(
+                  new GridmixJob(
+                    conf, 0L, job, scratch, sequence.getAndIncrement()));
+              } catch (IOException e) {
+                LOG.error(" EXCEPTOIN in availableSlots ", e);
+                error = e;
+                return;
+              }
+
+            }
+          } finally {
+            lock.unlock();
+          }
+        }
+      } catch (InterruptedException e) {
+        return;
+      } finally {
+        IOUtils.cleanup(null, jobProducer);
+      }
+    }
+  }
+
+  /**
+   * STRESS: on each notification from the StatsCollector, read the cluster
+   * metrics and update the current loadStatus with the JT's new load.
+   *
+   * @param item current cluster statistics
+   */
+  @Override
+  public void update(Statistics.ClusterStats item) {
+    lock.lock();
+    try {
+      ClusterMetrics clusterMetrics = item.getStatus();
+      LoadStatus newStatus;
+      runningWaitingJobs = item.getRunningWaitingJobs();
+      newStatus = checkLoadAndGetSlotsToBackfill(clusterMetrics);
+      loadStatus.isOverloaded = newStatus.isOverloaded;
+      loadStatus.numSlotsBackfill = newStatus.numSlotsBackfill;
+      overloaded.signalAll();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * We try to use some light-weight mechanism to determine cluster load.
+   *
+   * @param clusterStatus current cluster metrics
+   * @return Whether, from job client perspective, the cluster is overloaded.
+   */
+  private LoadStatus checkLoadAndGetSlotsToBackfill(
+    ClusterMetrics clusterStatus) {
+    LoadStatus loadStatus = new LoadStatus();
+    // If there are more jobs than number of task trackers, we assume the
+    // cluster is overloaded. This is to bound the memory usage of the
+    // simulator job tracker, in situations where we have jobs with small
+    // number of map tasks and large number of reduce tasks.
+    if (runningWaitingJobs.size() >= clusterStatus.getTaskTrackerCount()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          System.currentTimeMillis() + " Overloaded is " +
+            Boolean.TRUE.toString() + " #runningJobs >= taskTrackerCount (" +
+            runningWaitingJobs.size() + " >= " +
+            clusterStatus.getTaskTrackerCount() + " )\n");
+      }
+      loadStatus.isOverloaded = true;
+      loadStatus.numSlotsBackfill = 0;
+      return loadStatus;
+    }
+
+    float incompleteMapTasks = 0; // include pending & running map tasks.
+    for (Job job : runningWaitingJobs) {
+      try {
+        incompleteMapTasks += (1 - Math.min(
+          job.getStatus().getMapProgress(), 1.0)) *
+          ((JobConf) job.getConfiguration()).getNumMapTasks();
+      } catch (IOException io) {
+        // There might be an issue with this job; skip it and try the others.
+        LOG.error("Error while calculating load", io);
+        continue;
+      } catch (InterruptedException ie) {
+        // There might be an issue with this job; skip it and try the others.
+        LOG.error("Error while calculating load", ie);
+        continue;
+      }
+    }
+
+    float overloadedThreshold =
+      OVERLOAD_MAPTASK_MAPSLOT_RATIO * clusterStatus.getMapSlotCapacity();
+    boolean overloaded = incompleteMapTasks > overloadedThreshold;
+    String relOp = (overloaded) ? ">" : "<=";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+        System.currentTimeMillis() + " Overloaded is " + Boolean.toString(
+          overloaded) + " incompleteMapTasks " + relOp + " " +
+          OVERLOAD_MAPTASK_MAPSLOT_RATIO + "*mapSlotCapacity" + "(" +
+          incompleteMapTasks + " " + relOp + " " +
+          OVERLOAD_MAPTASK_MAPSLOT_RATIO + "*" +
+          clusterStatus.getMapSlotCapacity() + ")");
+    }
+    if (overloaded) {
+      loadStatus.isOverloaded = true;
+      loadStatus.numSlotsBackfill = 0;
+    } else {
+      loadStatus.isOverloaded = false;
+      loadStatus.numSlotsBackfill =
+        (int) (overloadedThreshold - incompleteMapTasks);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Current load Status is " + loadStatus);
+    }
+    return loadStatus;
+  }
+
+  static class LoadStatus {
+    volatile boolean isOverloaded = false;
+    volatile int numSlotsBackfill = -1;
+
+    @Override
+    public String toString() {
+      return "overloaded: " + isOverloaded +
+        ", slots to backfill: " + numSlotsBackfill;
+    }
+  }
+
+  /**
+   * Start the reader thread, wait for latch if necessary.
+   */
+  @Override
+  public void start() {
+    LOG.info(" Starting Stress submission ");
+    this.rThread.start();
+  }
+
+}

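[Editor's note: checkLoadAndGetSlotsToBackfill's arithmetic with sample
numbers (constant spelled as corrected above). With a map slot capacity of
100, the overload threshold is 2.0 * 100 = 200 incomplete map tasks: a 500-map
job at 40% progress contributes (1 - 0.4) * 500 = 300 and marks the cluster
overloaded, while a fresh 50-map job leaves 200 - 50 = 150 slots to backfill.]

    public class OverloadArithmetic {
      static final float OVERLOAD_MAPTASK_MAPSLOT_RATIO = 2.0f;

      public static void main(String[] args) {
        int mapSlotCapacity = 100;
        float threshold =
            OVERLOAD_MAPTASK_MAPSLOT_RATIO * mapSlotCapacity;   // 200.0
        float incomplete = (1 - 0.4f) * 500;  // 300 incomplete map tasks
        System.out.println(incomplete > threshold);         // true: overloaded
        incomplete = (1 - 0.0f) * 50;         // 50 incomplete map tasks
        System.out.println((int) (threshold - incomplete)); // 150 to backfill
      }
    }
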
Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=919277&r1=919276&r2=919277&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Fri Mar  5 03:11:56 2010
@@ -17,261 +17,90 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.tools.rumen.JobStory;
-import org.apache.hadoop.tools.rumen.JobStoryProducer;
-import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
-import org.apache.hadoop.tools.rumen.TaskInfo;
-import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
 
 
 /**
  * Component generating random job traces for testing on a single node.
  */
-class DebugJobFactory extends JobFactory {
-
-  public DebugJobFactory(JobSubmitter submitter, Path scratch, int numJobs,
-      Configuration conf, CountDownLatch startFlag) throws IOException {
-    super(submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
-        startFlag);
-  }
+class DebugJobFactory {
 
-  ArrayList<JobStory> getSubmitted() {
-    return ((DebugJobProducer)jobProducer).submitted;
+  interface Debuggable {
+    ArrayList<JobStory> getSubmitted();
   }
 
-  private static class DebugJobProducer implements JobStoryProducer {
-    final ArrayList<JobStory> submitted;
-    private final Configuration conf;
-    private final AtomicInteger numJobs;
-
-    public DebugJobProducer(int numJobs, Configuration conf) {
-      super();
-      this.conf = conf;
-      this.numJobs = new AtomicInteger(numJobs);
-      this.submitted = new ArrayList<JobStory>();
-    }
+  public static JobFactory getFactory(
+    JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+    CountDownLatch startFlag) throws IOException {
+    GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
+      conf, GridmixJobSubmissionPolicy.STRESS);
+    if (policy == GridmixJobSubmissionPolicy.REPLAY) {
+      return new DebugReplayJobFactory(
+        submitter, scratch, numJobs, conf, startFlag);
+    } else if (policy == GridmixJobSubmissionPolicy.STRESS) {
+      return new DebugStressJobFactory(
+        submitter, scratch, numJobs, conf, startFlag);
+    } else if (policy == GridmixJobSubmissionPolicy.SERIAL) {
+      return new DebugSerialJobFactory(
+        submitter, scratch, numJobs, conf, startFlag);
 
-    @Override
-    public JobStory getNextJob() throws IOException {
-      if (numJobs.getAndDecrement() > 0) {
-        final MockJob ret = new MockJob(conf);
-        submitted.add(ret);
-        return ret;
-      }
-      return null;
     }
-
-    @Override
-    public void close() { }
-  }
-
-  static double[] getDistr(Random r, double mindist, int size) {
-    assert 0.0 <= mindist && mindist <= 1.0;
-    final double min = mindist / size;
-    final double rem = 1.0 - min * size;
-    final double[] tmp = new double[size];
-    for (int i = 0; i < tmp.length - 1; ++i) {
-      tmp[i] = r.nextDouble() * rem;
-    }
-    tmp[tmp.length - 1] = rem;
-    Arrays.sort(tmp);
-
-    final double[] ret = new double[size];
-    ret[0] = tmp[0] + min;
-    for (int i = 1; i < size; ++i) {
-      ret[i] = tmp[i] - tmp[i-1] + min;
-    }
-    return ret;
+    return null;
   }
 
-  /**
-   * Generate random task data for a synthetic job.
-   */
-  static class MockJob implements JobStory {
-
-    static final int MIN_REC = 1 << 14;
-    static final int MIN_BYTES = 1 << 20;
-    static final int VAR_REC = 1 << 14;
-    static final int VAR_BYTES = 4 << 20;
-    static final int MAX_MAP = 5;
-    static final int MAX_RED = 3;
-
-    static void initDist(Random r, double min, int[] recs, long[] bytes,
-        long tot_recs, long tot_bytes) {
-      final double[] recs_dist = getDistr(r, min, recs.length);
-      final double[] bytes_dist = getDistr(r, min, recs.length);
-      long totalbytes = 0L;
-      int totalrecs = 0;
-      for (int i = 0; i < recs.length; ++i) {
-        recs[i] = (int) Math.round(tot_recs * recs_dist[i]);
-        bytes[i] = Math.round(tot_bytes * bytes_dist[i]);
-        totalrecs += recs[i];
-        totalbytes += bytes[i];
-      }
-      // Add/remove excess
-      recs[0] += totalrecs - tot_recs;
-      bytes[0] += totalbytes - tot_bytes;
-      if (LOG.isInfoEnabled()) {
-        LOG.info("DIST: " + Arrays.toString(recs) + " " +
-            tot_recs + "/" + totalrecs + " " +
-            Arrays.toString(bytes) + " " + tot_bytes + "/" + totalbytes);
-      }
-    }
-
-    private static final AtomicInteger seq = new AtomicInteger(0);
-    // set timestamps in the past
-    private static final AtomicLong timestamp =
-      new AtomicLong(System.currentTimeMillis() -
-        TimeUnit.MILLISECONDS.convert(60, TimeUnit.DAYS));
-
-    private final int id;
-    private final String name;
-    private final int[] m_recsIn, m_recsOut, r_recsIn, r_recsOut;
-    private final long[] m_bytesIn, m_bytesOut, r_bytesIn, r_bytesOut;
-    private final long submitTime;
-
-    public MockJob(Configuration conf) {
-      final Random r = new Random();
-      final long seed = r.nextLong();
-      r.setSeed(seed);
-      id = seq.getAndIncrement();
-      name = String.format("MOCKJOB%05d", id);
-      LOG.info(name + " (" + seed + ")");
-      submitTime = timestamp.addAndGet(TimeUnit.MILLISECONDS.convert(
-            r.nextInt(10), TimeUnit.SECONDS));
-
-      m_recsIn = new int[r.nextInt(MAX_MAP) + 1];
-      m_bytesIn = new long[m_recsIn.length];
-      m_recsOut = new int[m_recsIn.length];
-      m_bytesOut = new long[m_recsIn.length];
-
-      r_recsIn = new int[r.nextInt(MAX_RED) + 1];
-      r_bytesIn = new long[r_recsIn.length];
-      r_recsOut = new int[r_recsIn.length];
-      r_bytesOut = new long[r_recsIn.length];
-
-      // map input
-      final long map_recs = r.nextInt(VAR_REC) + MIN_REC;
-      final long map_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
-      initDist(r, 0.5, m_recsIn, m_bytesIn, map_recs, map_bytes);
-
-      // shuffle
-      final long shuffle_recs = r.nextInt(VAR_REC) + MIN_REC;
-      final long shuffle_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
-      initDist(r, 0.4, m_recsOut, m_bytesOut, shuffle_recs, shuffle_bytes);
-      initDist(r, 0.8, r_recsIn, r_bytesIn, shuffle_recs, shuffle_bytes);
-
-      // reduce output
-      final long red_recs = r.nextInt(VAR_REC) + MIN_REC;
-      final long red_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
-      initDist(r, 0.4, r_recsOut, r_bytesOut, red_recs, red_bytes);
-
-      if (LOG.isDebugEnabled()) {
-        int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
-        int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
-        for (int i = 0; i < m_recsIn.length; ++i) {
-          iMapRTotal += m_recsIn[i];
-          iMapBTotal += m_bytesIn[i];
-          oMapRTotal += m_recsOut[i];
-          oMapBTotal += m_bytesOut[i];
-        }
-        for (int i = 0; i < r_recsIn.length; ++i) {
-          iRedRTotal += r_recsIn[i];
-          iRedBTotal += r_bytesIn[i];
-          oRedRTotal += r_recsOut[i];
-          oRedBTotal += r_bytesOut[i];
-        }
-        LOG.debug(String.format("%s: M (%03d) %6d/%10d -> %6d/%10d" +
-                                   " R (%03d) %6d/%10d -> %6d/%10d @%d", name,
-            m_bytesIn.length, iMapRTotal, iMapBTotal, oMapRTotal, oMapBTotal,
-            r_bytesIn.length, iRedRTotal, iRedBTotal, oRedRTotal, oRedBTotal,
-            submitTime));
-      }
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public String getUser() {
-      return "FOOBAR";
-    }
-
-    @Override
-    public JobID getJobID() {
-      return new JobID("job_mock_" + name, id);
-    }
-
-    @Override
-    public Values getOutcome() {
-      return Values.SUCCESS;
-    }
-
-    @Override
-    public long getSubmissionTime() {
-      return submitTime;
-    }
-
-    @Override
-    public int getNumberMaps() {
-      return m_bytesIn.length;
+  static class DebugReplayJobFactory extends ReplayJobFactory
+    implements Debuggable {
+    public DebugReplayJobFactory(
+      JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+      CountDownLatch startFlag) throws IOException {
+      super(
+        submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
+        startFlag);
     }
 
     @Override
-    public int getNumberReduces() {
-      return r_bytesIn.length;
+    public ArrayList<JobStory> getSubmitted() {
+      return ((DebugJobProducer) jobProducer).submitted;
     }
 
-    @Override
-    public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
-      switch (taskType) {
-        case MAP:
-          return new TaskInfo(m_bytesIn[taskNumber], m_recsIn[taskNumber],
-              m_bytesOut[taskNumber], m_recsOut[taskNumber], -1);
-        case REDUCE:
-          return new TaskInfo(r_bytesIn[taskNumber], r_recsIn[taskNumber],
-              r_bytesOut[taskNumber], r_recsOut[taskNumber], -1);
-        default:
-          throw new IllegalArgumentException("Not interested");
-      }
-    }
+  }
 
-    @Override
-    public InputSplit[] getInputSplits() {
-      throw new UnsupportedOperationException();
+  static class DebugSerialJobFactory extends SerialJobFactory
+    implements Debuggable {
+    public DebugSerialJobFactory(
+      JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+      CountDownLatch startFlag) throws IOException {
+      super(
+        submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
+        startFlag);
     }
 
     @Override
-    public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType,
-        int taskNumber, int taskAttemptNumber) {
-      throw new UnsupportedOperationException();
+    public ArrayList<JobStory> getSubmitted() {
+      return ((DebugJobProducer) jobProducer).submitted;
     }
+  }
 
-    @Override
-    public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
-        int taskAttemptNumber, int locality) {
-      throw new UnsupportedOperationException();
+  static class DebugStressJobFactory extends StressJobFactory
+    implements Debuggable {
+    public DebugStressJobFactory(
+      JobSubmitter submitter, Path scratch, int numJobs, Configuration conf,
+      CountDownLatch startFlag) throws IOException {
+      super(
+        submitter, new DebugJobProducer(numJobs, conf), scratch, conf,
+        startFlag);
     }
 
     @Override
-    public org.apache.hadoop.mapred.JobConf getJobConf() {
-      throw new UnsupportedOperationException();
+    public ArrayList<JobStory> getSubmitted() {
+      return ((DebugJobProducer) jobProducer).submitted;
     }
   }
+
 }
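
After this rewrite DebugJobFactory is a pure dispatcher: GridmixJobSubmissionPolicy.getPolicy picks the configured policy (defaulting to STRESS) and the matching Debug*JobFactory is returned, each exposing its synthetic jobs through the Debuggable interface. A hedged usage sketch, assuming it sits in the same org.apache.hadoop.mapred.gridmix package (the helper name is illustrative):

    // Illustrative helper, not part of the patch: build the debug factory
    // for a named policy and expose the jobs it has submitted so far.
    static ArrayList<JobStory> submittedUnder(String policyName,
        JobSubmitter submitter, Path scratch, int numJobs,
        Configuration conf, CountDownLatch startFlag) throws IOException {
      conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policyName);
      JobFactory factory = DebugJobFactory.getFactory(
          submitter, scratch, numJobs, conf, startFlag);
      // Every Debug*JobFactory implements Debuggable.
      return ((DebugJobFactory.Debuggable) factory).getSubmitted();
    }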

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java?rev=919277&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java Fri Mar  5 03:11:56 2010
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class DebugJobProducer implements JobStoryProducer {
+  public static final Log LOG = LogFactory.getLog(DebugJobProducer.class);
+  final ArrayList<JobStory> submitted;
+  private final Configuration conf;
+  private final AtomicInteger numJobs;
+
+  public DebugJobProducer(int numJobs, Configuration conf) {
+    super();
+    MockJob.reset();
+    this.conf = conf;
+    this.numJobs = new AtomicInteger(numJobs);
+    this.submitted = new ArrayList<JobStory>();
+  }
+
+  @Override
+  public JobStory getNextJob() throws IOException {
+    if (numJobs.getAndDecrement() > 0) {
+      final MockJob ret = new MockJob(conf);
+      submitted.add(ret);
+      return ret;
+    }
+    return null;
+  }
+
+  @Override
+  public void close() {
+  }
+
+
+  static double[] getDistr(Random r, double mindist, int size) {
+    assert 0.0 <= mindist && mindist <= 1.0;
+    final double min = mindist / size;
+    final double rem = 1.0 - min * size;
+    final double[] tmp = new double[size];
+    for (int i = 0; i < tmp.length - 1; ++i) {
+      tmp[i] = r.nextDouble() * rem;
+    }
+    tmp[tmp.length - 1] = rem;
+    Arrays.sort(tmp);
+
+    final double[] ret = new double[size];
+    ret[0] = tmp[0] + min;
+    for (int i = 1; i < size; ++i) {
+      ret[i] = tmp[i] - tmp[i - 1] + min;
+    }
+    return ret;
+  }
+
+
+  /**
+   * Generate random task data for a synthetic job.
+   */
+  static class MockJob implements JobStory {
+
+    static final int MIN_REC = 1 << 14;
+    static final int MIN_BYTES = 1 << 20;
+    static final int VAR_REC = 1 << 14;
+    static final int VAR_BYTES = 4 << 20;
+    static final int MAX_MAP = 5;
+    static final int MAX_RED = 3;
+
+    static void initDist(
+      Random r, double min, int[] recs, long[] bytes, long tot_recs,
+      long tot_bytes) {
+      final double[] recs_dist = getDistr(r, min, recs.length);
+      final double[] bytes_dist = getDistr(r, min, recs.length);
+      long totalbytes = 0L;
+      int totalrecs = 0;
+      for (int i = 0; i < recs.length; ++i) {
+        recs[i] = (int) Math.round(tot_recs * recs_dist[i]);
+        bytes[i] = Math.round(tot_bytes * bytes_dist[i]);
+        totalrecs += recs[i];
+        totalbytes += bytes[i];
+      }
+      // Add/remove excess
+      recs[0] += totalrecs - tot_recs;
+      bytes[0] += totalbytes - tot_bytes;
+      if (LOG.isInfoEnabled()) {
+        LOG.info(
+          "DIST: " + Arrays.toString(recs) + " " + tot_recs + "/" + totalrecs +
+            " " + Arrays.toString(bytes) + " " + tot_bytes + "/" + totalbytes);
+      }
+    }
+
+    private static final AtomicInteger seq = new AtomicInteger(0);
+    // set timestamp in the past
+    private static final AtomicLong timestamp = new AtomicLong(
+      System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(
+        60, TimeUnit.DAYS));
+
+    private final int id;
+    private final String name;
+    private final int[] m_recsIn, m_recsOut, r_recsIn, r_recsOut;
+    private final long[] m_bytesIn, m_bytesOut, r_bytesIn, r_bytesOut;
+    private final long submitTime;
+
+    public MockJob(Configuration conf) {
+      final Random r = new Random();
+      final long seed = r.nextLong();
+      r.setSeed(seed);
+      id = seq.getAndIncrement();
+      name = String.format("MOCKJOB%05d", id);
+
+      LOG.info(name + " (" + seed + ")");
+      submitTime = timestamp.addAndGet(
+        TimeUnit.MILLISECONDS.convert(
+          r.nextInt(10), TimeUnit.SECONDS));
+
+      m_recsIn = new int[r.nextInt(MAX_MAP) + 1];
+      m_bytesIn = new long[m_recsIn.length];
+      m_recsOut = new int[m_recsIn.length];
+      m_bytesOut = new long[m_recsIn.length];
+
+      r_recsIn = new int[r.nextInt(MAX_RED) + 1];
+      r_bytesIn = new long[r_recsIn.length];
+      r_recsOut = new int[r_recsIn.length];
+      r_bytesOut = new long[r_recsIn.length];
+
+      // map input
+      final long map_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long map_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.5, m_recsIn, m_bytesIn, map_recs, map_bytes);
+
+      // shuffle
+      final long shuffle_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long shuffle_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.5, m_recsOut, m_bytesOut, shuffle_recs, shuffle_bytes);
+      initDist(r, 0.8, r_recsIn, r_bytesIn, shuffle_recs, shuffle_bytes);
+
+      // reduce output
+      final long red_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long red_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.5, r_recsOut, r_bytesOut, red_recs, red_bytes);
+
+      if (LOG.isDebugEnabled()) {
+        int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
+        int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
+        for (int i = 0; i < m_recsIn.length; ++i) {
+          iMapRTotal += m_recsIn[i];
+          iMapBTotal += m_bytesIn[i];
+          oMapRTotal += m_recsOut[i];
+          oMapBTotal += m_bytesOut[i];
+        }
+        for (int i = 0; i < r_recsIn.length; ++i) {
+          iRedRTotal += r_recsIn[i];
+          iRedBTotal += r_bytesIn[i];
+          oRedRTotal += r_recsOut[i];
+          oRedBTotal += r_bytesOut[i];
+        }
+        LOG.debug(
+          String.format(
+            "%s: M (%03d) %6d/%10d -> %6d/%10d" +
+              " R (%03d) %6d/%10d -> %6d/%10d @%d", name, m_bytesIn.length,
+            iMapRTotal, iMapBTotal, oMapRTotal, oMapBTotal, r_bytesIn.length,
+            iRedRTotal, iRedBTotal, oRedRTotal, oRedBTotal, submitTime));
+      }
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public String getUser() {
+      return "FOOBAR";
+    }
+
+    @Override
+    public JobID getJobID() {
+      return new JobID("job_mock_" + name, id);
+    }
+
+    @Override
+    public Values getOutcome() {
+      return Values.SUCCESS;
+    }
+
+    @Override
+    public long getSubmissionTime() {
+      return submitTime;
+    }
+
+    @Override
+    public int getNumberMaps() {
+      return m_bytesIn.length;
+    }
+
+    @Override
+    public int getNumberReduces() {
+      return r_bytesIn.length;
+    }
+
+    @Override
+    public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+      switch (taskType) {
+        case MAP:
+          return new TaskInfo(m_bytesIn[taskNumber], m_recsIn[taskNumber],
+              m_bytesOut[taskNumber], m_recsOut[taskNumber], -1);
+        case REDUCE:
+          return new TaskInfo(r_bytesIn[taskNumber], r_recsIn[taskNumber],
+              r_bytesOut[taskNumber], r_recsOut[taskNumber], -1);
+        default:
+          throw new IllegalArgumentException("Not interested");
+      }
+    }
+
+    @Override
+    public InputSplit[] getInputSplits() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TaskAttemptInfo getTaskAttemptInfo(
+      TaskType taskType, int taskNumber, int taskAttemptNumber) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(
+      int taskNumber, int taskAttemptNumber, int locality) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.JobConf getJobConf() {
+      throw new UnsupportedOperationException();
+    }
+
+    public static void reset() {
+      seq.set(0);
+      timestamp.set(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(
+        60, TimeUnit.DAYS));
+    }
+  }
+}
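
getDistr above underpins the synthetic data generation: it returns `size` positive weights that sum to 1.0, each at least mindist/size, which initDist then scales into per-task record and byte counts. A standalone check of that contract (illustrative, not part of the patch; it must sit in the org.apache.hadoop.mapred.gridmix package because getDistr is package-private, and assertions need java -ea):

    package org.apache.hadoop.mapred.gridmix;

    import java.util.Random;

    // Illustrative contract check for DebugJobProducer.getDistr.
    public class DistrContractCheck {
      public static void main(String[] args) {
        final int size = 5;
        final double mindist = 0.5;
        double[] d = DebugJobProducer.getDistr(new Random(42L), mindist, size);
        double sum = 0.0;
        for (double v : d) {
          assert v >= mindist / size;  // each weight keeps its minimum share
          sum += v;
        }
        assert Math.abs(sum - 1.0) < 1e-9;  // weights partition the unit mass
        System.out.println("sum = " + sum);
      }
    }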

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=919277&r1=919276&r2=919277&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Fri Mar  5 03:11:56 2010
@@ -54,6 +54,8 @@
 import org.apache.log4j.Level;
 
 public class TestGridmixSubmission {
+  static GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.REPLAY;
+
   {
     ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.mapred.gridmix")
         ).getLogger().setLevel(Level.DEBUG);
@@ -96,8 +98,8 @@
     private final int expected;
     private final BlockingQueue<Job> retiredJobs;
 
-    public TestMonitor(int expected) {
-      super();
+    public TestMonitor(int expected, Statistics stats) {
+      super(stats);
       this.expected = expected;
       retiredJobs = new LinkedBlockingQueue<Job>();
     }
@@ -276,16 +278,18 @@
 
   static class DebugGridmix extends Gridmix {
 
-    private DebugJobFactory factory;
+    private JobFactory factory;
     private TestMonitor monitor;
 
     public void checkMonitor() throws Exception {
-      monitor.verify(factory.getSubmitted());
+      monitor.verify(((DebugJobFactory.Debuggable)factory).getSubmitted());
     }
 
     @Override
-    protected JobMonitor createJobMonitor() {
-      monitor = new TestMonitor(NJOBS + 1); // include data generation job
+    protected JobMonitor createJobMonitor(Statistics stats) {
+      monitor = new TestMonitor(NJOBS + 1, stats); // include data generation job
       return monitor;
     }
 
@@ -293,16 +297,42 @@
     protected JobFactory createJobFactory(JobSubmitter submitter,
         String traceIn, Path scratchDir, Configuration conf,
         CountDownLatch startFlag) throws IOException {
-      factory =
-        new DebugJobFactory(submitter, scratchDir, NJOBS, conf, startFlag);
+      factory = DebugJobFactory.getFactory(
+        submitter, scratchDir, NJOBS, conf, startFlag);
       return factory;
     }
   }
 
   @Test
-  public void testSubmit() throws Exception {
+  public void testReplaySubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.REPLAY;
+    System.out.println("Replay started at " + System.currentTimeMillis());
+    doSubmission();
+    System.out.println("Replay ended at " + System.currentTimeMillis());
+  }
+
+  @Test
+  public void testStressSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.STRESS;
+    System.out.println("Stress started at " + System.currentTimeMillis());
+    doSubmission();
+    System.out.println("Stress ended at " + System.currentTimeMillis());
+  }
+
+  @Test
+  public void testSerialSubmit() throws Exception {
+    policy = GridmixJobSubmissionPolicy.SERIAL;
+    System.out.println("Serial started at " + System.currentTimeMillis());
+    doSubmission();
+    System.out.println("Serial ended at " + System.currentTimeMillis());
+  }
+
+  public void doSubmission() throws Exception {
     final Path in = new Path("foo").makeQualified(dfs);
     final Path out = new Path("/gridmix").makeQualified(dfs);
+    final Path root = new Path("/user");
+    Configuration conf = null;
+    try {
     final String[] argv = {
       "-D" + FilePool.GRIDMIX_MIN_FILE + "=0",
       "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out,
@@ -311,11 +341,19 @@
       "-" // ignored by DebugGridmix
     };
     DebugGridmix client = new DebugGridmix();
-    final Configuration conf = mrCluster.createJobConf();
+    conf = mrCluster.createJobConf();
+    conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy.name());
     //conf.setInt(Gridmix.GRIDMIX_KEY_LEN, 2);
     int res = ToolRunner.run(conf, client, argv);
     assertEquals("Client exited with nonzero status", 0, res);
     client.checkMonitor();
+    } catch (Exception e) {
+      // Rethrow so the test fails instead of silently passing.
+      throw e;
+    } finally {
+      in.getFileSystem(conf).delete(in, true);
+      out.getFileSystem(conf).delete(out, true);
+      root.getFileSystem(conf).delete(root, true);
+    }
   }
 
 }
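
The three @Test methods above differ only in the policy they assign before calling doSubmission; the same configuration key selects the policy for a real Gridmix run. A hedged sketch (the argv array with trace and output settings is assumed to be built as in the test):

    // Illustrative: run Gridmix under the SERIAL submission policy.
    Configuration conf = new Configuration();
    conf.set(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY,
        GridmixJobSubmissionPolicy.SERIAL.name());
    int res = ToolRunner.run(conf, new Gridmix(), argv);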


