hadoop-common-commits mailing list archives

From: cdoug...@apache.org
Subject: svn commit: r727144 - in /hadoop/core/trunk: ./ src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date: Tue, 16 Dec 2008 20:52:46 GMT
Author: cdouglas
Date: Tue Dec 16 12:52:45 2008
New Revision: 727144

URL: http://svn.apache.org/viewvc?rev=727144&view=rev
Log:
HADOOP-4753. Refactor gridmix2 to reduce code duplication.
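
The refactored GridMixRunner (diff below) collapses the hand-written per-size helpers (addStreamSortSmall/Medium/Large and their javaSort, webdataScan, combiner, and monsterQuery counterparts) into a Size enum carrying per-size defaults plus a GridMixJob enum whose constants each override an abstract addJob(). For illustration only, and not part of this commit, a minimal self-contained sketch of that enum-as-strategy shape (hypothetical names throughout):

    // Minimal sketch (hypothetical names, not the committed code) of the
    // enum-as-strategy pattern this change applies.
    public class EnumStrategySketch {
      enum Size {
        SMALL("small", 15), MEDIUM("medium", 170), LARGE("large", 370);
        private final String str;
        private final int reducers;
        Size(String str, int reducers) { this.str = str; this.reducers = reducers; }
        public int defaultNumReducers() { return reducers; }
        @Override public String toString() { return str; }
      }
      enum JobType {
        STREAMSORT { void addJob(Size size) { submit("streamSort", size); } },
        WEBDATASCAN { void addJob(Size size) { submit("webdataScan", size); } };
        abstract void addJob(Size size);
        static void submit(String name, Size size) {
          // Stand-in for building a JobConf and handing it to JobControl.
          System.out.println(name + "." + size + " -> "
              + size.defaultNumReducers() + " reducers");
        }
      }
      public static void main(String[] args) {
        // One nested loop replaces the per-type, per-size helper methods.
        for (JobType type : JobType.values())
          for (Size size : Size.values())
            type.addJob(size);
      }
    }

Adding a new job type then means adding one enum constant rather than three near-identical methods plus a dispatch block.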

Removed:
    hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GridMixConfig.java
    hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/SortJobCreator.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/CombinerJobCreator.java
    hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GenericMRLoadJobCreator.java
    hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GridMixRunner.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=727144&r1=727143&r2=727144&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Dec 16 12:52:45 2008
@@ -294,6 +294,8 @@
     HADOOP-4545. Add example and test case of secondary sort for the reduce.
     (omalley)
 
+    HADOOP-4753. Refactor gridmix2 to reduce code duplication. (cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to provide locations for splits

Modified: hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/CombinerJobCreator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/CombinerJobCreator.java?rev=727144&r1=727143&r2=727144&view=diff
==============================================================================
--- hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/CombinerJobCreator.java (original)
+++ hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/CombinerJobCreator.java Tue Dec 16 12:52:45 2008
@@ -25,7 +25,7 @@
 
 public class CombinerJobCreator extends WordCount {
 
-  public JobConf createJob(String[] args) throws Exception {
+  public static JobConf createJob(String[] args) throws Exception {
     JobConf conf = new JobConf(WordCount.class);
     conf.setJobName("GridmixCombinerJob");
 

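With createJob now static, the refactored GridMixRunner no longer constructs a CombinerJobCreator before asking for a JobConf (the old code built one with "new CombinerJobCreator()" and called the instance method). A sketch of the call-site change, grounded in the GridMixRunner diff below:

    // Before: JobConf jobconf = new CombinerJobCreator().createJob(args);
    // After (static factory, no instance needed):
    JobConf jobconf = CombinerJobCreator.createJob(args);
    jobconf.setJobName("GridmixCombinerJob." + size);
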
Modified: hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GenericMRLoadJobCreator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GenericMRLoadJobCreator.java?rev=727144&r1=727143&r2=727144&view=diff
==============================================================================
--- hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GenericMRLoadJobCreator.java (original)
+++ hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GenericMRLoadJobCreator.java Tue Dec 16 12:52:45 2008
@@ -33,7 +33,7 @@
 
 public class GenericMRLoadJobCreator extends GenericMRLoadGenerator {
 
-  public JobConf createJob(String[] argv, boolean mapoutputCompressed,
+  public static JobConf createJob(String[] argv, boolean mapoutputCompressed,
       boolean outputCompressed) throws Exception {
 
     JobConf job = new JobConf();

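GenericMRLoadJobCreator.createJob gets the same treatment, keeping its two compression flags, so the WEBDATASCAN, MONSTERQUERY, and WEBDATASORT enum constants in the refactored GridMixRunner below can call the static factory directly:

    // Call shape used by the refactored runner; the two flags control
    // map-output compression and job-output compression respectively.
    JobConf jobconf = GenericMRLoadJobCreator.createJob(
        args, mapoutputCompressed, outputCompressed);
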
Modified: hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GridMixRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GridMixRunner.java?rev=727144&r1=727143&r2=727144&view=diff
==============================================================================
--- hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GridMixRunner.java (original)
+++ hadoop/core/trunk/src/benchmarks/gridmix2/src/java/org/apache/hadoop/mapred/GridMixRunner.java Tue Dec 16 12:52:45 2008
@@ -18,1323 +18,439 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Date;
-import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.Iterator;
-import java.util.TreeMap;
 import java.util.Map.Entry;
-import java.io.IOException;
+import java.util.TreeMap;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.Sort;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.jobcontrol.*;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.streaming.StreamJob;
-import org.apache.hadoop.mapred.GridMixConfig;
 
 public class GridMixRunner {
 
-  private static int NUM_OF_LARGE_JOBS_PER_CLASS = 0;
-
-  private static int NUM_OF_MEDIUM_JOBS_PER_CLASS = 0;
-
-  private static int NUM_OF_SMALL_JOBS_PER_CLASS = 0;
-
-  private static int NUM_OF_REDUCERS_FOR_LARGE_JOB = 370;
-
-  private static int NUM_OF_REDUCERS_FOR_MEDIUM_JOB = 170;
-
-  private static int NUM_OF_REDUCERS_FOR_SMALL_JOB = 15;
-
-  private static String GRID_MIX_DATA = "/gridmix/data";
-
-  private static String VARCOMPSEQ = GRID_MIX_DATA
-      + "/WebSimulationBlockCompressed";
-
-  private static String FIXCOMPSEQ = GRID_MIX_DATA
-      + "/MonsterQueryBlockCompressed";
-
-  private static String VARINFLTEXT = GRID_MIX_DATA + "/SortUncompressed";
-
-  private JobControl gridmix;
-
-  private FileSystem fs;
-
-  private GridMixConfig config;
+  private static final int NUM_OF_LARGE_JOBS_PER_CLASS = 0;
+  private static final int NUM_OF_MEDIUM_JOBS_PER_CLASS = 0;
+  private static final int NUM_OF_SMALL_JOBS_PER_CLASS = 0;
+
+  private static final int NUM_OF_REDUCERS_FOR_SMALL_JOB = 15;
+  private static final int NUM_OF_REDUCERS_FOR_MEDIUM_JOB = 170;
+  private static final int NUM_OF_REDUCERS_FOR_LARGE_JOB = 370;
+
+  private static final String GRID_MIX_DATA = "/gridmix/data";
+  private static final String VARCOMPSEQ =
+    GRID_MIX_DATA + "/WebSimulationBlockCompressed";
+  private static final String FIXCOMPSEQ =
+    GRID_MIX_DATA + "/MonsterQueryBlockCompressed";
+  private static final String VARINFLTEXT =
+    GRID_MIX_DATA + "/SortUncompressed";
 
   private static final String GRIDMIXCONFIG = "gridmix_config.xml";
 
+  private static final Configuration config = initConfig();
+  private static final FileSystem fs = initFs();
+  private final JobControl gridmix;
   private int numOfJobs = 0;
 
-  private void initConfig() {
-    String configFile = System.getenv("GRIDMIXCONFIG");
-    if (configFile == null) {
-      String configDir = System.getProperty("user.dir");
-      if (configDir == null) {
-        configDir = ".";
-      }
-      configFile = configDir + "/" + GRIDMIXCONFIG;
-    }
-
-    if (config == null) {
-      try {
-        Path fileResource = new Path(configFile);
-        config = new GridMixConfig();
-        config.addResource(fileResource);
-      } catch (Exception e) {
-        System.out.println("Error reading configuration file:" + configFile);
-      }
-    }
-  }
-
-  public GridMixRunner() throws IOException {
-    gridmix = new JobControl("GridMix");
-    Configuration conf = new Configuration();
-    try {
-      fs = FileSystem.get(conf);
-    } catch (IOException ex) {
-      System.out.println("fs initation error:" + ex.getMessage());
-      throw ex;
-    }
-    initConfig();
-  }
-
-  private void addStreamSort(int num_of_reducers, boolean mapoutputCompressed,
-      boolean outputCompressed, String size) {
-
-    String defaultIndir = VARINFLTEXT + "/{part-00000,part-00001,part-00002}";
-    String indir = getInputDirsFor("streamSort.smallJobs.inputFiles",
-        defaultIndir);
-    String outdir = addTSSuffix("perf-out/stream-out-dir-small_");
-    if ("medium".compareToIgnoreCase(size) == 0) {
-      defaultIndir = VARINFLTEXT + "/{part-000*0,part-000*1,part-000*2}";
-      indir = getInputDirsFor("streamSort.mediumJobs.inputFiles", defaultIndir);
-      outdir = addTSSuffix("perf-out/stream-out-dir-medium_");
-    } else if ("large".compareToIgnoreCase(size) == 0) {
-      defaultIndir = VARINFLTEXT;
-      indir = getInputDirsFor("streamSort.largeJobs.inputFiles", defaultIndir);
-      outdir = addTSSuffix("perf-out/stream-out-dir-large_");
-    }
-
-    StringBuffer sb = new StringBuffer();
-
-    sb.append("-input ").append(indir).append(" ");
-    sb.append("-output ").append(outdir).append(" ");
-    sb.append("-mapper cat ");
-    sb.append("-reducer cat ");
-    sb.append("-numReduceTasks ").append(num_of_reducers);
-
-    String[] args = sb.toString().split(" ");
-
-    clearDir(outdir);
-    try {
-      JobConf jobconf = StreamJob.createJob(args);
-      jobconf.setJobName("GridmixStreamingSorter." + size);
-      jobconf.setCompressMapOutput(mapoutputCompressed);
-      jobconf.setBoolean("mapred.output.compress", outputCompressed);
-
-      Job job = new Job(jobconf);
-      gridmix.addJob(job);
-      numOfJobs++;
-    } catch (Exception ex) {
-      ex.printStackTrace();
-      System.out.println(ex.toString());
-    }
-
-  }
-
-  private String getInputDirsFor(String jobType, String defaultIndir) {
-    String inputFile[] = config.getStrings(jobType, defaultIndir);
-    StringBuffer indirBuffer = new StringBuffer();
-    for (int i = 0; i < inputFile.length; i++) {
-      indirBuffer = indirBuffer.append(inputFile[i]).append(",");
-    }
-    return indirBuffer.substring(0, indirBuffer.length() - 1);
-  }
-
-  private void addStreamSortSmall(int NUM_OF_REDUCERS,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addStreamSort(NUM_OF_REDUCERS, mapoutputCompressed, outputCompressed,
-        "small");
-  }
-
-  private void addStreamSortMedium(int NUM_OF_REDUCERS,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addStreamSort(NUM_OF_REDUCERS, mapoutputCompressed, outputCompressed,
-        "medium");
-  }
-
-  private void addStreamSortLarge(int NUM_OF_REDUCERS,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addStreamSort(NUM_OF_REDUCERS, mapoutputCompressed, outputCompressed,
-        "large");
-  }
-
-  private void clearDir(String dir) {
-    try {
-      Path outfile = new Path(dir);
-      fs.delete(outfile);
-    } catch (IOException ex) {
-      ex.printStackTrace();
-      System.out.println("delete file error:");
-      System.out.println(ex.toString());
-    }
-  }
-
-  private void addJavaSort(int num_of_reducers, boolean mapoutputCompressed,
-      boolean outputCompressed, String size) {
-
-    String defaultIndir = VARINFLTEXT + "/{part-00000,part-00001,part-00002}";
-    String indir = getInputDirsFor("javaSort.smallJobs.inputFiles",
-        defaultIndir);
-    String outdir = addTSSuffix("perf-out/sort-out-dir-small_");
-    if ("medium".compareToIgnoreCase(size) == 0) {
-      defaultIndir = VARINFLTEXT + "/{part-000*0,part-000*1,part-000*2}";
-      indir = getInputDirsFor("javaSort.mediumJobs.inputFiles", defaultIndir);
-      outdir = addTSSuffix("perf-out/sort-out-dir-medium_");
-    } else if ("large".compareToIgnoreCase(size) == 0) {
-      defaultIndir = VARINFLTEXT;
-      indir = getInputDirsFor("javaSort.largeJobs.inputFiles", defaultIndir);
-      outdir = addTSSuffix("perf-out/sort-out-dir-large_");
-    }
-
-    clearDir(outdir);
-
-    try {
-      JobConf jobConf = new JobConf();
-      jobConf.setJarByClass(Sort.class);
-      jobConf.setJobName("GridmixJavaSorter." + size);
-      jobConf.setMapperClass(IdentityMapper.class);
-      jobConf.setReducerClass(IdentityReducer.class);
-
-      jobConf.setNumReduceTasks(num_of_reducers);
-      jobConf
-          .setInputFormat(org.apache.hadoop.mapred.KeyValueTextInputFormat.class);
-      jobConf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);
-
-      jobConf.setOutputKeyClass(org.apache.hadoop.io.Text.class);
-      jobConf.setOutputValueClass(org.apache.hadoop.io.Text.class);
-      jobConf.setCompressMapOutput(mapoutputCompressed);
-      jobConf.setBoolean("mapred.output.compress", outputCompressed);
-
-      FileInputFormat.addInputPaths(jobConf, indir);
-
-      FileOutputFormat.setOutputPath(jobConf, new Path(outdir));
-
-      Job job = new Job(jobConf);
-
-      gridmix.addJob(job);
-      numOfJobs++;
-
-    } catch (Exception ex) {
-      ex.printStackTrace();
+  private enum Size {
+    SMALL("small",                               // name
+          "/{part-00000,part-00001,part-00002}", // default input subset
+          NUM_OF_SMALL_JOBS_PER_CLASS,           // default num jobs
+          NUM_OF_REDUCERS_FOR_SMALL_JOB),        // default num reducers
+    MEDIUM("medium",                             // name
+          "/{part-000*0,part-000*1,part-000*2}", // default input subset
+          NUM_OF_MEDIUM_JOBS_PER_CLASS,          // default num jobs
+          NUM_OF_REDUCERS_FOR_MEDIUM_JOB),       // default num reducers
+    LARGE("large",                               // name
+          "",                                    // default input subset
+          NUM_OF_LARGE_JOBS_PER_CLASS,           // default num jobs
+          NUM_OF_REDUCERS_FOR_LARGE_JOB);        // default num reducers
+
+    private final String str;
+    private final String path;
+    private final int numJobs;
+    private final int numReducers;
+    Size(String str, String path, int numJobs, int numReducers) {
+      this.str = str;
+      this.path = path;
+      this.numJobs = numJobs;
+      this.numReducers = numReducers;
+    }
+    public String defaultPath(String base) {
+      return base + path;
+    }
+    public int defaultNumJobs() {
+      return numJobs;
+    }
+    public int defaultNumReducers() {
+      return numReducers;
+    }
+    public String toString() {
+      return str;
     }
-
   }
 
-  private void addJavaSortSmall(int NUM_OF_REDUCERS,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addJavaSort(NUM_OF_REDUCERS, mapoutputCompressed, outputCompressed, "small");
-  }
+  private enum GridMixJob {
+    STREAMSORT("streamSort") {
+    public void addJob(int numReducers, boolean mapoutputCompressed,
+        boolean outputCompressed, Size size, JobControl gridmix) {
+      final String prop = String.format("streamSort.%sJobs.inputFiles", size);
+      final String indir = getInputDirsFor(prop, size.defaultPath(VARINFLTEXT));
+      final String outdir = addTSSuffix("perf-out/stream-out-dir-" + size);
+
+      StringBuffer sb = new StringBuffer();
+      sb.append("-input ").append(indir).append(" ");
+      sb.append("-output ").append(outdir).append(" ");
+      sb.append("-mapper cat ");
+      sb.append("-reducer cat ");
+      sb.append("-numReduceTasks ").append(numReducers);
+      String[] args = sb.toString().split(" ");
 
-  private void addJavaSortMedium(int NUM_OF_REDUCERS,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addJavaSort(NUM_OF_REDUCERS, mapoutputCompressed, outputCompressed,
-        "medium");
-  }
-
-  private void addJavaSortLarge(int NUM_OF_REDUCERS,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addJavaSort(NUM_OF_REDUCERS, mapoutputCompressed, outputCompressed, "large");
-  }
-
-  private boolean select(int total, int selected, int index) {
-    int step;
-    if (selected > 0 && selected < total) {
-      step = total / selected;
-    } else if (selected <= 0) {
-      return false;
-    } else {
-      return true;
-    }
-
-    int effectiveTotal = total - total % selected;
-
-    if (index <= effectiveTotal - 1 && (index % step == 0)) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  private void addTextSortJobs() {
-
-    int[] nums_of_small_streamsort_job = config.getInts(
-        "streamSort.smallJobs.numOfJobs", NUM_OF_SMALL_JOBS_PER_CLASS);
-    int[] nums_of_small_javasort_job = config.getInts(
-        "javaSort.smallJobs.numOfJobs", NUM_OF_SMALL_JOBS_PER_CLASS);
-
-    int num_of_small_streamsort_job_mapoutputCompressed = config.getInt(
-        "streamSort.smallJobs.numOfMapoutputCompressed", 0);
-    int num_of_small_javasort_job_mapoutputCompressed = config.getInt(
-        "javaSort.smallJobs.numOfMapoutputCompressed", 0);
-
-    int num_of_small_streamsort_job_outputCompressed = config.getInt(
-        "streamSort.smallJobs.numOfOutputCompressed",
-        NUM_OF_SMALL_JOBS_PER_CLASS);
-    int num_of_small_javasort_job_outputCompressed = config
-        .getInt("javaSort.smallJobs.numOfOutputCompressed",
-            NUM_OF_SMALL_JOBS_PER_CLASS);
-
-    int[] streamsort_smallJobs_numsOfReduces = config.getInts(
-        "streamSort.smallJobs.numOfReduces", NUM_OF_REDUCERS_FOR_SMALL_JOB);
-    int[] javasort_smallJobs_numsOfReduces = config.getInts(
-        "javaSort.smallJobs.numOfReduces", NUM_OF_REDUCERS_FOR_SMALL_JOB);
-
-    int len1, len2;
-
-    len1 = nums_of_small_streamsort_job.length;
-    len2 = streamsort_smallJobs_numsOfReduces.length;
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: "
-              + "streamSort.smallJobs.numOfJobs and streamSort.smallJobs.numOfReduces must have the same number of items");
-
-    }
-    int totalNum = 0;
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_small_streamsort_job[i];
-    }
-    int currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_small_streamsort_job = nums_of_small_streamsort_job[index];
-      int streamsort_smallJobs_numOfReduces = streamsort_smallJobs_numsOfReduces[index];
-      for (int i = 0; i < num_of_small_streamsort_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_small_streamsort_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_small_streamsort_job_outputCompressed, currentIndex);
-        addStreamSortSmall(streamsort_smallJobs_numOfReduces,
-            mapoutputCompressed, outputCompressed);
-        currentIndex = currentIndex + 1;
+      clearDir(outdir);
+      try {
+        JobConf jobconf = StreamJob.createJob(args);
+        jobconf.setJobName("GridmixStreamingSorter." + size);
+        jobconf.setCompressMapOutput(mapoutputCompressed);
+        jobconf.setBoolean("mapred.output.compress", outputCompressed);
+        Job job = new Job(jobconf);
+        gridmix.addJob(job);
+      } catch (Exception ex) {
+        ex.printStackTrace();
       }
     }
+    },
 
-    len1 = nums_of_small_javasort_job.length;
-    len2 = javasort_smallJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_small_javasort_job[i];
-    }
+    JAVASORT("javaSort") {
+    public void addJob(int numReducers, boolean mapoutputCompressed,
+        boolean outputCompressed, Size size, JobControl gridmix) {
+      final String prop = String.format("javaSort.%sJobs.inputFiles", size);
+      final String indir = getInputDirsFor(prop, size.defaultPath(VARINFLTEXT));
+      final String outdir = addTSSuffix("perf-out/sort-out-dir-" + size);
 
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: javaSort.smallJobs.numOfJobs, "
-              + "javaSort.smallJobs.numOfReduces must have the same number of items");
+      clearDir(outdir);
 
-    }
+      try {
+        JobConf jobConf = new JobConf();
+        jobConf.setJarByClass(Sort.class);
+        jobConf.setJobName("GridmixJavaSorter." + size);
+        jobConf.setMapperClass(IdentityMapper.class);
+        jobConf.setReducerClass(IdentityReducer.class);
+
+        jobConf.setNumReduceTasks(numReducers);
+        jobConf.setInputFormat(org.apache.hadoop.mapred.KeyValueTextInputFormat.class);
+        jobConf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);
+
+        jobConf.setOutputKeyClass(org.apache.hadoop.io.Text.class);
+        jobConf.setOutputValueClass(org.apache.hadoop.io.Text.class);
+        jobConf.setCompressMapOutput(mapoutputCompressed);
+        jobConf.setBoolean("mapred.output.compress", outputCompressed);
 
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_small_javasort_job = nums_of_small_javasort_job[index];
-      int javasort_smallJobs_numOfReduces = javasort_smallJobs_numsOfReduces[index];
-      for (int i = 0; i < num_of_small_javasort_job; i++) {
+        FileInputFormat.addInputPaths(jobConf, indir);
+        FileOutputFormat.setOutputPath(jobConf, new Path(outdir));
 
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_small_javasort_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_small_javasort_job_outputCompressed, currentIndex);
+        Job job = new Job(jobConf);
+        gridmix.addJob(job);
 
-        addJavaSortSmall(javasort_smallJobs_numOfReduces, mapoutputCompressed,
-            outputCompressed);
-        currentIndex = currentIndex + 1;
+      } catch (Exception ex) {
+        ex.printStackTrace();
       }
     }
+    },
 
-    int[] nums_of_medium_streamsort_job = config.getInts(
-        "streamSort.mediumJobs.numOfJobs", NUM_OF_MEDIUM_JOBS_PER_CLASS);
-    int[] nums_of_medium_javasort_job = config.getInts(
-        "javaSort.mediumJobs.numOfJobs", NUM_OF_MEDIUM_JOBS_PER_CLASS);
-
-    int num_of_medium_streamsort_job_mapoutputCompressed = config.getInt(
-        "streamSort.mediumJobs.numOfMapoutputCompressed", 0);
-    int num_of_medium_javasort_job_mapoutputCompressed = config.getInt(
-        "javaSort.mediumJobs.numOfMapoutputCompressed", 0);
-
-    int num_of_medium_streamsort_job_outputCompressed = config.getInt(
-        "streamSort.mediumJobs.numOfOutputCompressed",
-        NUM_OF_MEDIUM_JOBS_PER_CLASS);
-    int num_of_medium_javasort_job_outputCompressed = config.getInt(
-        "javaSort.mediumJobs.numOfOutputCompressed",
-        NUM_OF_MEDIUM_JOBS_PER_CLASS);
-
-    int[] streamsort_mediumJobs_numsOfReduces = config.getInts(
-        "streamSort.mediumJobs.numOfReduces", NUM_OF_REDUCERS_FOR_MEDIUM_JOB);
-    int[] javasort_mediumJobs_numsOfReduces = config.getInts(
-        "javaSort.mediumJobs.numOfReduces", NUM_OF_REDUCERS_FOR_MEDIUM_JOB);
-
-    len1 = nums_of_medium_streamsort_job.length;
-    len2 = streamsort_mediumJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_medium_streamsort_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: streamSort.mediumJobs.numOfJobs, "
-              + "streamSort.mediumJobs.numOfReduces must have the same number of items");
+    WEBDATASCAN("webdataScan") {
+    public void addJob(int numReducers, boolean mapoutputCompressed,
+        boolean outputCompressed, Size size, JobControl gridmix) {
+      final String prop = String.format("webdataScan.%sJobs.inputFiles", size);
+      final String indir = getInputDirsFor(prop, size.defaultPath(VARCOMPSEQ));
+      final String outdir = addTSSuffix("perf-out/webdata-scan-out-dir-" + size);
+      StringBuffer sb = new StringBuffer();
+      sb.append("-keepmap 0.2 ");
+      sb.append("-keepred 5 ");
+      sb.append("-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat ");
+      sb.append("-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat ");
+      sb.append("-outKey org.apache.hadoop.io.Text ");
+      sb.append("-outValue org.apache.hadoop.io.Text ");
+      sb.append("-indir ").append(indir).append(" ");
+      sb.append("-outdir ").append(outdir).append(" ");
+      sb.append("-r ").append(numReducers);
 
+      String[] args = sb.toString().split(" ");
+      clearDir(outdir);
+      try {
+        JobConf jobconf = GenericMRLoadJobCreator.createJob(
+            args, mapoutputCompressed, outputCompressed);
+        jobconf.setJobName("GridmixWebdatascan." + size);
+        Job job = new Job(jobconf);
+        gridmix.addJob(job);
+      } catch (Exception ex) {
+        ex.printStackTrace();
+      }
     }
+    },
 
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_medium_streamsort_job = nums_of_medium_streamsort_job[index];
-      int streamsort_mediumJobs_numOfReduces = streamsort_mediumJobs_numsOfReduces[index];
-
-      for (int i = 0; i < num_of_medium_streamsort_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_medium_streamsort_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_medium_streamsort_job_outputCompressed, currentIndex);
-
-        addStreamSortMedium(streamsort_mediumJobs_numOfReduces,
-            mapoutputCompressed, outputCompressed);
-        currentIndex = currentIndex + 1;
+    COMBINER("combiner") {
+    public void addJob(int numReducers, boolean mapoutputCompressed,
+        boolean outputCompressed, Size size, JobControl gridmix) {
+      final String prop = String.format("combiner.%sJobs.inputFiles", size);
+      final String indir = getInputDirsFor(prop, size.defaultPath(VARCOMPSEQ));
+      final String outdir = addTSSuffix("perf-out/combiner-out-dir-" + size);
+
+      StringBuffer sb = new StringBuffer();
+      sb.append("-r ").append(numReducers).append(" ");
+      sb.append("-indir ").append(indir).append(" ");
+      sb.append("-outdir ").append(outdir);
+      sb.append("-mapoutputCompressed ").append(mapoutputCompressed).append(" ");
+      sb.append("-outputCompressed ").append(outputCompressed);
 
+      String[] args = sb.toString().split(" ");
+      clearDir(outdir);
+      try {
+        JobConf jobconf = CombinerJobCreator.createJob(args);
+        jobconf.setJobName("GridmixCombinerJob." + size);
+        Job job = new Job(jobconf);
+        gridmix.addJob(job);
+      } catch (Exception ex) {
+        ex.printStackTrace();
       }
     }
+    },
 
-    len1 = nums_of_medium_javasort_job.length;
-    len2 = javasort_mediumJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_medium_javasort_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: javaSort.mediumJobs.numOfJobs, "
-              + "javaSort.mediumJobs.numOfReduces must have the same number of items");
-
-    }
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_medium_javasort_job = nums_of_medium_javasort_job[index];
-      int javasort_mediumJobs_numOfReduces = javasort_mediumJobs_numsOfReduces[index];
-      for (int i = 0; i < num_of_medium_javasort_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_medium_javasort_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_medium_javasort_job_outputCompressed, currentIndex);
-
-        addJavaSortMedium(javasort_mediumJobs_numOfReduces,
-            mapoutputCompressed, outputCompressed);
-        currentIndex = currentIndex + 1;
+    MONSTERQUERY("monsterQuery") {
+    public void addJob(int numReducers, boolean mapoutputCompressed,
+        boolean outputCompressed, Size size, JobControl gridmix) {
+      final String prop = String.format("monsterQuery.%sJobs.inputFiles", size);
+      final String indir = getInputDirsFor(prop, size.defaultPath(FIXCOMPSEQ));
+      final String outdir = addTSSuffix("perf-out/mq-out-dir-" + size);
+      int iter = 3;
+      try {
+        Job pjob = null;
+        Job job = null;
+        for (int i = 0; i < iter; i++) {
+          String outdirfull = outdir + "." + i;
+          String indirfull = (0 == i) ? indir : outdir + "." + (i - 1);
+          Path outfile = new Path(outdirfull);
+
+          StringBuffer sb = new StringBuffer();
+          sb.append("-keepmap 10 ");
+          sb.append("-keepred 40 ");
+          sb.append("-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat ");
+          sb.append("-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat ");
+          sb.append("-outKey org.apache.hadoop.io.Text ");
+          sb.append("-outValue org.apache.hadoop.io.Text ");
+          sb.append("-indir ").append(indirfull).append(" ");
+          sb.append("-outdir ").append(outdirfull).append(" ");
+          sb.append("-r ").append(numReducers);
+          String[] args = sb.toString().split(" ");
+
+          try {
+            fs.delete(outfile);
+          } catch (IOException ex) {
+            System.out.println(ex.toString());
+          }
+
+          JobConf jobconf = GenericMRLoadJobCreator.createJob(
+              args, mapoutputCompressed, outputCompressed);
+          jobconf.setJobName("GridmixMonsterQuery." + size);
+          job = new Job(jobconf);
+          if (pjob != null) {
+            job.addDependingJob(pjob);
+          }
+          gridmix.addJob(job);
+          pjob = job;
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
       }
     }
+    },
 
-    int[] nums_of_large_streamsort_job = config.getInts(
-        "streamSort.largeJobs.numOfJobs", NUM_OF_LARGE_JOBS_PER_CLASS);
-    int[] nums_of_large_javasort_job = config.getInts(
-        "javaSort.largeJobs.numOfJobs", NUM_OF_LARGE_JOBS_PER_CLASS);
-
-    int num_of_large_streamsort_job_mapoutputCompressed = config.getInt(
-        "streamSort.largeJobs.numOfMapoutputCompressed", 0);
-    int num_of_large_javasort_job_mapoutputCompressed = config.getInt(
-        "javaSort.largeJobs.numOfMapoutputCompressed", 0);
-
-    int num_of_large_streamsort_job_outputCompressed = config.getInt(
-        "streamSort.largeJobs.numOfOutputCompressed",
-        NUM_OF_LARGE_JOBS_PER_CLASS);
-    int num_of_large_javasort_job_outputCompressed = config
-        .getInt("javaSort.largeJobs.numOfOutputCompressed",
-            NUM_OF_LARGE_JOBS_PER_CLASS);
-
-    int[] streamsort_largeJobs_numsOfReduces = config.getInts(
-        "streamSort.largeJobs.numOfReduces", NUM_OF_REDUCERS_FOR_LARGE_JOB);
-    int[] javasort_largeJobs_numsOfReduces = config.getInts(
-        "javaSort.largeJobs.numOfReduces", NUM_OF_REDUCERS_FOR_LARGE_JOB);
-
-    len1 = nums_of_large_streamsort_job.length;
-    len2 = streamsort_largeJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_large_streamsort_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: streamSort.largeJobs.numOfJobs, "
-              + "streamSort.largeJobs.numOfReduces must have the same number of items");
+    WEBDATASORT("webdataSort") {
+    public void addJob(int numReducers, boolean mapoutputCompressed,
+        boolean outputCompressed, Size size, JobControl gridmix) {
+      final String prop = String.format("webdataSort.%sJobs.inputFiles", size);
+      final String indir = getInputDirsFor(prop, size.defaultPath(VARCOMPSEQ));
+      final String outdir = addTSSuffix("perf-out/webdata-sort-out-dir-" + size);
+
+      StringBuffer sb = new StringBuffer();
+      sb.append("-keepmap 100 ");
+      sb.append("-keepred 100 ");
+      sb.append("-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat ");
+      sb.append("-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat ");
+      sb.append("-outKey org.apache.hadoop.io.Text ");
+      sb.append("-outValue org.apache.hadoop.io.Text ");
+      sb.append("-indir ").append(indir).append(" ");
+      sb.append("-outdir ").append(outdir).append(" ");
+      sb.append("-r ").append(numReducers);
 
-    }
-
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_large_streamsort_job = nums_of_large_streamsort_job[index];
-      int streamsort_largeJobs_numOfReduces = streamsort_largeJobs_numsOfReduces[index];
-      for (int i = 0; i < num_of_large_streamsort_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_large_streamsort_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_large_streamsort_job_outputCompressed, currentIndex);
-        addStreamSortLarge(streamsort_largeJobs_numOfReduces,
-            mapoutputCompressed, outputCompressed);
-        currentIndex = currentIndex + 1;
+      String[] args = sb.toString().split(" ");
+      clearDir(outdir);
+      try {
+        JobConf jobconf = GenericMRLoadJobCreator.createJob(
+            args, mapoutputCompressed, outputCompressed);
+        jobconf.setJobName("GridmixWebdataSort." + size);
+        Job job = new Job(jobconf);
+        gridmix.addJob(job);
+      } catch (Exception ex) {
+        ex.printStackTrace();
       }
     }
+    };
 
-    len1 = nums_of_large_javasort_job.length;
-    len2 = javasort_largeJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_large_javasort_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: javaSort.largeJobs.numOfJobs, "
-              + "javaSort.largeJobs.numOfReduces must have the same number of items");
-
-    }
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_large_javasort_job = nums_of_large_javasort_job[index];
-      int javasort_largeJobs_numOfReduces = javasort_largeJobs_numsOfReduces[index];
-      for (int i = 0; i < num_of_large_javasort_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_large_javasort_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_large_javasort_job_outputCompressed, currentIndex);
-
-        addJavaSortLarge(javasort_largeJobs_numOfReduces, mapoutputCompressed,
-            outputCompressed);
-        currentIndex = currentIndex + 1;
-      }
+    private final String name;
+    GridMixJob(String name) {
+      this.name = name;
     }
-
-  }
-
-  private void addWebdataScan(int num_of_reducers, boolean mapoutputCompressed,
-      boolean outputCompressed, String size) {
-    String defaultIndir = VARCOMPSEQ + "/{part-00000,part-00001,part-00002}";
-    String indir = getInputDirsFor("webdataScan.smallJobs.inputFiles",
-        defaultIndir);
-    String outdir = addTSSuffix("perf-out/webdata-scan-out-dir-small_");
-    if ("medium".compareToIgnoreCase(size) == 0) {
-      defaultIndir = VARCOMPSEQ + "/{part-000*0,part-000*1,part-000*2}";
-      indir = getInputDirsFor("webdataScan.mediumJobs.inputFiles", defaultIndir);
-      outdir = addTSSuffix("perf-out/webdata-scan-out-dir-medium_");
-    } else if ("large".compareToIgnoreCase(size) == 0) {
-      defaultIndir = VARCOMPSEQ;
-      indir = getInputDirsFor("webdataScan.largeJobs.inputFiles", defaultIndir);
-      outdir = addTSSuffix("perf-out/webdata-scan-dir-large_");
-    }
-
-    GenericMRLoadJobCreator jobcreator = new GenericMRLoadJobCreator();
-    StringBuffer sb = new StringBuffer();
-    sb.append("-keepmap 0.2 ");
-    sb.append("-keepred 5 ");
-    sb.append("-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat ");
-    sb.append("-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat ");
-    sb.append("-outKey org.apache.hadoop.io.Text ");
-    sb.append("-outValue org.apache.hadoop.io.Text ");
-    sb.append("-indir ").append(indir).append(" ");
-    sb.append("-outdir ").append(outdir).append(" ");
-    sb.append("-r ").append(num_of_reducers);
-
-    String[] args = sb.toString().split(" ");
-    clearDir(outdir);
-    try {
-      JobConf jobconf = jobcreator.createJob(args, mapoutputCompressed,
-          outputCompressed);
-      jobconf.setJobName("GridmixWebdatascan." + size);
-      Job job = new Job(jobconf);
-      gridmix.addJob(job);
-      numOfJobs++;
-    } catch (Exception ex) {
-      System.out.println(ex.getStackTrace());
+    public String getName() {
+      return name;
     }
-
-  }
-
-  private void addWebdataScanSmall(int num_of_reducers,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addWebdataScan(num_of_reducers, mapoutputCompressed, outputCompressed,
-        "small");
-  }
-
-  private void addWebdataScanMedium(int num_of_reducers,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addWebdataScan(num_of_reducers, mapoutputCompressed, outputCompressed,
-        "medium");
-  }
-
-  private void addWebdataScanLarge(int num_of_reducers,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addWebdataScan(num_of_reducers, mapoutputCompressed, outputCompressed,
-        "large");
+    public abstract void addJob(int numReducers, boolean mapComp,
+        boolean outComp, Size size, JobControl gridmix);
   }
 
-  private void addWebdataScanJobs() {
-
-    int[] nums_of_small_webdatascan_job = config.getInts(
-        "webdataScan.smallJobs.numOfJobs", NUM_OF_SMALL_JOBS_PER_CLASS);
-    int num_of_small_webdatascan_job_mapoutputCompressed = config.getInt(
-        "webdataScan.smallJobs.numOfMapoutputCompressed", 0);
-    int num_of_small_webdatascan_job_outputCompressed = config.getInt(
-        "webdataScan.smallJobs.numOfOutputCompressed",
-        NUM_OF_REDUCERS_FOR_SMALL_JOB);
-
-    int[] webdatascan_smallJobs_numsOfReduces = config.getInts(
-        "webdataScan.smallJobs.numOfReduces", NUM_OF_REDUCERS_FOR_SMALL_JOB);
-    int len1, len2, totalNum, currentIndex;
-
-    len1 = nums_of_small_webdatascan_job.length;
-    len2 = webdatascan_smallJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_small_webdatascan_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: webdataScan.smallJobs.numOfJobs, "
-              + "webdataScan.smallJobs.numOfReduces must have the same number of items");
-    }
-
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_small_webdatascan_job = nums_of_small_webdatascan_job[index];
-      int webdatascan_smallJobs_numOfReduces = webdatascan_smallJobs_numsOfReduces[index];
-
-      for (int i = 0; i < num_of_small_webdatascan_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_small_webdatascan_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_small_webdatascan_job_outputCompressed, currentIndex);
-        addWebdataScanSmall(webdatascan_smallJobs_numOfReduces,
-            mapoutputCompressed, outputCompressed);
-        currentIndex = currentIndex + 1;
-      }
-    }
-
-    int[] nums_of_medium_webdatascan_job = config.getInts(
-        "webdataScan.mediumJobs.numOfJobs", NUM_OF_MEDIUM_JOBS_PER_CLASS);
-    int num_of_medium_webdatascan_job_mapoutputCompressed = config.getInt(
-        "webdataScan.mediumJobs.numOfMapoutputCompressed", 0);
-    int num_of_medium_webdatascan_job_outputCompressed = config.getInt(
-        "webdataScan.mediumJobs.numOfOutputCompressed",
-        NUM_OF_REDUCERS_FOR_MEDIUM_JOB);
-
-    int[] webdatascan_mediumJobs_numsOfReduces = config.getInts(
-        "webdataScan.mediumJobs.numOfReduces", NUM_OF_REDUCERS_FOR_MEDIUM_JOB);
-
-    len1 = nums_of_medium_webdatascan_job.length;
-    len2 = webdatascan_mediumJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_medium_webdatascan_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: webdataScan.mediumJobs.numOfJobs, "
-              + "webdataScan.mediumJobs.numOfReduces must have the same number of items");
-    }
-
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_medium_webdatascan_job = nums_of_medium_webdatascan_job[index];
-      int webdatascan_mediumJobs_numOfReduces = webdatascan_mediumJobs_numsOfReduces[index];
-
-      for (int i = 0; i < num_of_medium_webdatascan_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_medium_webdatascan_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_medium_webdatascan_job_outputCompressed, currentIndex);
-        addWebdataScanMedium(webdatascan_mediumJobs_numOfReduces,
-            mapoutputCompressed, outputCompressed);
-        currentIndex = currentIndex + 1;
-      }
-    }
-
-    int[] nums_of_large_webdatascan_job = config.getInts(
-        "webdataScan.largeJobs.numOfJobs", NUM_OF_LARGE_JOBS_PER_CLASS);
-    int num_of_large_webdatascan_job_mapoutputCompressed = config.getInt(
-        "webdataScan.largeJobs.numOfMapoutputCompressed", 0);
-    int num_of_large_webdatascan_job_outputCompressed = config.getInt(
-        "webdataScan.largeJobs.numOfOutputCompressed",
-        NUM_OF_REDUCERS_FOR_LARGE_JOB);
-
-    int[] webdatascan_largeJobs_numsOfReduces = config.getInts(
-        "webdataScan.largeJobs.numOfReduces", NUM_OF_REDUCERS_FOR_LARGE_JOB);
-
-    len1 = nums_of_large_webdatascan_job.length;
-    len2 = webdatascan_largeJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_large_webdatascan_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: webdataScan.largeJobs.numOfJobs, "
-              + "webdataScan.largeJobs.numOfReduces must have the same number of items");
-    }
-
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_large_webdatascan_job = nums_of_large_webdatascan_job[index];
-      int webdatascan_largeJobs_numOfReduces = webdatascan_largeJobs_numsOfReduces[index];
-
-      for (int i = 0; i < num_of_large_webdatascan_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_large_webdatascan_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_large_webdatascan_job_outputCompressed, currentIndex);
-        addWebdataScanLarge(webdatascan_largeJobs_numOfReduces,
-            mapoutputCompressed, outputCompressed);
-        currentIndex = currentIndex + 1;
-      }
+  public GridMixRunner() throws IOException {
+    gridmix = new JobControl("GridMix");
+    if (null == config || null == fs) {
+      throw new IOException("Bad configuration. Cannot continue.");
     }
-
   }
 
-  private void addCombiner(int num_of_reducers, boolean mapoutputCompressed,
-      boolean outputCompressed, String size) {
-
-    String defaultIndir = VARCOMPSEQ + "/{part-00000,part-00001,part-00002}";
-    String indir = getInputDirsFor("combiner.smallJobs.inputFiles",
-        defaultIndir);
-    String outdir = addTSSuffix("perf-out/combiner-out-dir-small_");
-    if ("medium".compareToIgnoreCase(size) == 0) {
-      defaultIndir = VARCOMPSEQ + "/{part-000*0,part-000*1,part-000*2}";
-      indir = getInputDirsFor("combiner.mediumJobs.inputFiles", defaultIndir);
-      outdir = addTSSuffix("perf-out/combiner-out-dir-medium_");
-    } else if ("large".compareToIgnoreCase(size) == 0) {
-      defaultIndir = VARCOMPSEQ;
-      indir = getInputDirsFor("combiner.largeJobs.inputFiles", defaultIndir);
-      outdir = addTSSuffix("perf-out/combiner-dir-large_");
-    }
-
-    CombinerJobCreator jobcreator = new CombinerJobCreator();
-    StringBuffer sb = new StringBuffer();
-    sb.append("-r ").append(num_of_reducers).append(" ");
-    sb.append("-indir ").append(indir).append(" ");
-    sb.append("-outdir ").append(outdir);
-    sb.append("-mapoutputCompressed ").append(mapoutputCompressed).append(" ");
-    sb.append("-outputCompressed ").append(outputCompressed);
-
-    String[] args = sb.toString().split(" ");
-    clearDir(outdir);
+  private static FileSystem initFs() {
     try {
-      JobConf jobconf = jobcreator.createJob(args);
-      jobconf.setJobName("GridmixCombinerJob." + size);
-      Job job = new Job(jobconf);
-      gridmix.addJob(job);
-      numOfJobs++;
-    } catch (Exception ex) {
-      ex.printStackTrace();
+      return FileSystem.get(config);
+    } catch (Exception e) {
+      System.out.println("fs initation error: " + e.getMessage());
     }
-
+    return null;
   }
 
-  private void addCombinerSmall(int NUM_OF_REDUCERS,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addCombiner(NUM_OF_REDUCERS, mapoutputCompressed, outputCompressed, "small");
-  }
-
-  private void addCombinerMedium(int NUM_OF_REDUCERS,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addCombiner(NUM_OF_REDUCERS, mapoutputCompressed, outputCompressed, "medium");
-  }
-
-  private void addCombinerLarge(int NUM_OF_REDUCERS,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addCombiner(NUM_OF_REDUCERS, mapoutputCompressed, outputCompressed, "large");
-  }
-
-  private void addCombinerJobs() {
-    int[] nums_of_small_combiner_job = config.getInts(
-        "combiner.smallJobs.numOfJobs", NUM_OF_SMALL_JOBS_PER_CLASS);
-    int num_of_small_combiner_job_mapoutputCompressed = config.getInt(
-        "combiner.smallJobs.numOfMapoutputCompressed", 0);
-    int num_of_small_combiner_job_outputCompressed = config
-        .getInt("combiner.smallJobs.numOfOutputCompressed",
-            NUM_OF_SMALL_JOBS_PER_CLASS);
-    int[] combiner_smallJobs_numsOfReduces = config.getInts(
-        "combiner.smallJobs.numOfReduces", NUM_OF_REDUCERS_FOR_SMALL_JOB);
-    int len1, len2, totalNum, currentIndex;
-
-    len1 = nums_of_small_combiner_job.length;
-    len2 = combiner_smallJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_small_combiner_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: combiner.smallJobs.numOfJobs, "
-              + "combiner.smallJobs.numOfReduces must have the same number of items");
-    }
-
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_small_combiner_job = nums_of_small_combiner_job[index];
-      int combiner_smallJobs_numOfReduces = combiner_smallJobs_numsOfReduces[index];
-
-      for (int i = 0; i < num_of_small_combiner_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_small_combiner_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_small_combiner_job_outputCompressed, currentIndex);
-        addCombinerSmall(combiner_smallJobs_numOfReduces, mapoutputCompressed,
-            outputCompressed);
-        currentIndex = currentIndex + 1;
-      }
-    }
-
-    int[] nums_of_medium_combiner_job = config.getInts(
-        "combiner.mediumJobs.numOfJobs", NUM_OF_MEDIUM_JOBS_PER_CLASS);
-    int num_of_medium_combiner_job_mapoutputCompressed = config.getInt(
-        "combiner.mediumJobs.numOfMapoutputCompressed", 0);
-    int num_of_medium_combiner_job_outputCompressed = config.getInt(
-        "combiner.mediumJobs.numOfOutputCompressed",
-        NUM_OF_MEDIUM_JOBS_PER_CLASS);
-    int[] combiner_mediumJobs_numsOfReduces = config.getInts(
-        "combiner.mediumJobs.numOfReduces", NUM_OF_REDUCERS_FOR_MEDIUM_JOB);
-
-    len1 = nums_of_medium_combiner_job.length;
-    len2 = combiner_mediumJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_medium_combiner_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: combiner.mediumJobs.numOfJobs, "
-              + "combiner.mediumJobs.numOfReduces must have the same number of items");
-    }
-
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_medium_combiner_job = nums_of_medium_combiner_job[index];
-      int combiner_mediumJobs_numOfReduces = combiner_mediumJobs_numsOfReduces[index];
-      for (int i = 0; i < num_of_medium_combiner_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_medium_combiner_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_medium_combiner_job_outputCompressed, currentIndex);
-
-        addCombinerMedium(combiner_mediumJobs_numOfReduces,
-            mapoutputCompressed, outputCompressed);
-        currentIndex = currentIndex + 1;
-      }
-    }
-
-    int[] nums_of_large_combiner_job = config.getInts(
-        "combiner.largeJobs.numOfJobs", NUM_OF_LARGE_JOBS_PER_CLASS);
-    int num_of_large_combiner_job_mapoutputCompressed = config.getInt(
-        "combiner.largeJobs.numOfMapoutputCompressed", 0);
-    int num_of_large_combiner_job_outputCompressed = config
-        .getInt("combiner.largeJobs.numOfOutputCompressed",
-            NUM_OF_LARGE_JOBS_PER_CLASS);
-    int[] combiner_largeJobs_numsOfReduces = config.getInts(
-        "combiner.largeJobs.numOfReduces", NUM_OF_REDUCERS_FOR_LARGE_JOB);
-
-    len1 = nums_of_large_combiner_job.length;
-    len2 = combiner_largeJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_large_combiner_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: combiner.largeJobs.numOfJobs, "
-              + "combiner.largeJobs.numOfReduces must have the same number of items");
-    }
-
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_large_combiner_job = nums_of_large_combiner_job[index];
-      int combiner_largeJobs_numOfReduces = combiner_largeJobs_numsOfReduces[index];
-      for (int i = 0; i < num_of_large_combiner_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_large_combiner_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_large_combiner_job_outputCompressed, currentIndex);
-
-        addCombinerLarge(combiner_largeJobs_numOfReduces, mapoutputCompressed,
-            outputCompressed);
-        currentIndex = currentIndex + 1;
+  private static Configuration initConfig() {
+    Configuration conf = new Configuration();
+    String configFile = System.getenv("GRIDMIXCONFIG");
+    if (configFile == null) {
+      String configDir = System.getProperty("user.dir");
+      if (configDir == null) {
+        configDir = ".";
       }
+      configFile = configDir + "/" + GRIDMIXCONFIG;
     }
-
-  }
-
-  private void addMonsterQuery(int num_of_reducer, boolean mapoutputCompressed,
-      boolean outputCompressed, String size) {
-    GenericMRLoadJobCreator jobcreator = new GenericMRLoadJobCreator();
-    String defaultIndir = FIXCOMPSEQ + "/{part-00000,part-00001,part-00002}";
-    String indir = getInputDirsFor("monsterQuery.smallJobs.inputFiles",
-        defaultIndir);
-    String outdir = addTSSuffix("perf-out/mq-out-dir-small_");
-
-    if ("medium".compareToIgnoreCase(size) == 0) {
-      defaultIndir = FIXCOMPSEQ + "/{part-000*0,part-000*1,part-000*2}";
-      indir = getInputDirsFor("monsterQuery.mediumJobs.inputFiles",
-          defaultIndir);
-      outdir = addTSSuffix("perf-out/mq-out-dir-medium_");
-    } else if ("large".compareToIgnoreCase(size) == 0) {
-      defaultIndir = FIXCOMPSEQ;
-      indir = getInputDirsFor("monsterQuery.largeJobs.inputFiles", defaultIndir);
-      outdir = addTSSuffix("perf-out/mq-out-dir-large_");
-    }
-
-    int iter = 3;
     try {
-
-      Job pjob = null;
-      Job job = null;
-      for (int i = 0; i < iter; i++) {
-        String outdirfull = outdir + "." + i;
-        String indirfull;
-        if (i == 0) {
-          indirfull = indir;
-        } else {
-          indirfull = outdir + "." + (i - 1);
-        }
-        Path outfile = new Path(outdirfull);
-
-        StringBuffer sb = new StringBuffer();
-
-        sb.append("-keepmap 10 ");
-        sb.append("-keepred 40 ");
-        sb
-            .append("-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat ");
-        sb
-            .append("-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat ");
-        sb.append("-outKey org.apache.hadoop.io.Text ");
-        sb.append("-outValue org.apache.hadoop.io.Text ");
-
-        sb.append("-indir ").append(indirfull).append(" ");
-
-        sb.append("-outdir ").append(outdirfull).append(" ");
-        sb.append("-r ").append(num_of_reducer);
-
-        String[] args = sb.toString().split(" ");
-
-        try {
-          fs.delete(outfile);
-        } catch (IOException ex) {
-          System.out.println(ex.toString());
-        }
-
-        JobConf jobconf = jobcreator.createJob(args, mapoutputCompressed,
-            outputCompressed);
-        jobconf.setJobName("GridmixMonsterQuery." + size);
-        job = new Job(jobconf);
-        if (pjob != null) {
-          job.addDependingJob(pjob);
-        }
-        gridmix.addJob(job);
-        numOfJobs++;
-        pjob = job;
-
-      }
-
+      Path fileResource = new Path(configFile);
+      conf.addResource(fileResource);
     } catch (Exception e) {
-      System.out.println(e.getStackTrace());
+      System.err.println("Error reading config file " + configFile + ": " +
+          e.getMessage());
+      return null;
     }
+    return conf;
   }
 
-  private void addMonsterQuerySmall(int NUM_OF_REDUCERS,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addMonsterQuery(NUM_OF_REDUCERS, mapoutputCompressed, outputCompressed,
-        "small");
-  }
-
-  private void addMonsterQueryMedium(int NUM_OF_REDUCERS,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addMonsterQuery(NUM_OF_REDUCERS, mapoutputCompressed, outputCompressed,
-        "medium");
-  }
-
-  private void addMonsterQueryLarge(int NUM_OF_REDUCERS,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addMonsterQuery(NUM_OF_REDUCERS, mapoutputCompressed, outputCompressed,
-        "large");
-  }
-
-  private void addMonsterQueryJobs() {
-    int[] nums_of_small_monsterquery_job = config.getInts(
-        "monsterQuery.smallJobs.numOfJobs", NUM_OF_SMALL_JOBS_PER_CLASS);
-    int num_of_small_monsterquery_job_mapoutputCompressed = config.getInt(
-        "monsterQuery.smallJobs.numOfMapoutputCompressed", 0);
-    int num_of_small_monsterquery_job_outputCompressed = config.getInt(
-        "monsterQuery.smallJobs.numOfOutputCompressed",
-        NUM_OF_SMALL_JOBS_PER_CLASS);
-    int[] monsterquery_smallJobs_numsOfReduces = config.getInts(
-        "monsterQuery.smallJobs.numOfReduces", NUM_OF_REDUCERS_FOR_SMALL_JOB);
-    int len1, len2, totalNum, currentIndex;
-
-    len1 = nums_of_small_monsterquery_job.length;
-    len2 = monsterquery_smallJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_small_monsterquery_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: monseterquery.smallJobs.numOfJobs, "
-              + "monsterquery.smallJobs.numOfReduces must have the same number of items");
-    }
-
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_small_monsterquery_job = nums_of_small_monsterquery_job[index];
-      int monsterquery_smallJobs_numOfReduces = monsterquery_smallJobs_numsOfReduces[index];
-      for (int i = 0; i < num_of_small_monsterquery_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_small_monsterquery_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_small_monsterquery_job_outputCompressed, currentIndex);
-
-        addMonsterQuerySmall(monsterquery_smallJobs_numOfReduces,
-            mapoutputCompressed, outputCompressed);
-        currentIndex = currentIndex + 1;
-      }
-    }
-
-    int[] nums_of_medium_monsterquery_job = config.getInts(
-        "monsterQuery.mediumJobs.numOfJobs", NUM_OF_MEDIUM_JOBS_PER_CLASS);
-    int num_of_medium_monsterquery_job_mapoutputCompressed = config.getInt(
-        "monsterQuery.mediumJobs.numOfMapoutputCompressed", 0);
-    int num_of_medium_monsterquery_job_outputCompressed = config.getInt(
-        "monsterQuery.mediumJobs.numOfOutputCompressed",
-        NUM_OF_MEDIUM_JOBS_PER_CLASS);
-    int[] monsterquery_mediumJobs_numsOfReduces = config.getInts(
-        "monsterQuery.mediumJobs.numOfReduces", NUM_OF_REDUCERS_FOR_MEDIUM_JOB);
-    len1 = nums_of_medium_monsterquery_job.length;
-    len2 = monsterquery_mediumJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_medium_monsterquery_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: monseterquery.mediumJobs.numOfJobs, "
-              + "monsterquery.mediumJobs.numOfReduces must have the same number of items");
-    }
-
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_medium_monsterquery_job = nums_of_medium_monsterquery_job[index];
-      int monsterquery_mediumJobs_numOfReduces = monsterquery_mediumJobs_numsOfReduces[index];
-      for (int i = 0; i < num_of_medium_monsterquery_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_medium_monsterquery_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_medium_monsterquery_job_outputCompressed, currentIndex);
-
-        addMonsterQueryMedium(monsterquery_mediumJobs_numOfReduces,
-            mapoutputCompressed, outputCompressed);
-        currentIndex = currentIndex + 1;
-      }
-    }
-
-    int[] nums_of_large_monsterquery_job = config.getInts(
-        "monsterQuery.largeJobs.numOfJobs", NUM_OF_LARGE_JOBS_PER_CLASS);
-    int num_of_large_monsterquery_job_mapoutputCompressed = config.getInt(
-        "monsterQuery.largeJobs.numOfMapoutputCompressed", 0);
-    int num_of_large_monsterquery_job_outputCompressed = config.getInt(
-        "monsterQuery.largeJobs.numOfOutputCompressed",
-        NUM_OF_LARGE_JOBS_PER_CLASS);
-    int[] monsterquery_largeJobs_numsOfReduces = config.getInts(
-        "monsterQuery.largeJobs.numOfReduces", NUM_OF_REDUCERS_FOR_LARGE_JOB);
-
-    len1 = nums_of_large_monsterquery_job.length;
-    len2 = monsterquery_largeJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_large_monsterquery_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: monseterquery.largeJobs.numOfJobs, "
-              + "monsterquery.largeJobs.numOfReduces must have the same number of items");
-    }
-
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_large_monsterquery_job = nums_of_large_monsterquery_job[index];
-      int monsterquery_largeJobs_numOfReduces = monsterquery_largeJobs_numsOfReduces[index];
-      for (int i = 0; i < num_of_large_monsterquery_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_large_monsterquery_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_large_monsterquery_job_outputCompressed, currentIndex);
-
-        addMonsterQueryLarge(monsterquery_largeJobs_numOfReduces,
-            mapoutputCompressed, outputCompressed);
-        currentIndex = currentIndex + 1;
-      }
+  private static int[] getInts(Configuration conf, String name, int defaultV) {
+    String[] vals = conf.getStrings(name, String.valueOf(defaultV));
+    int[] results = new int[vals.length];
+    for (int i = 0; i < vals.length; ++i) {
+      results[i] = Integer.parseInt(vals[i]);
     }
+    return results;
   }
 
-  private String addTSSuffix(String s) {
-    Date date = Calendar.getInstance().getTime();
-    String ts = String.valueOf(date.getTime());
-    return s + ts;
+  private static String getInputDirsFor(String jobType, String defaultIndir) {
+    String inputFile[] = config.getStrings(jobType, defaultIndir);
+    StringBuffer indirBuffer = new StringBuffer();
+    for (int i = 0; i < inputFile.length; i++) {
+      indirBuffer = indirBuffer.append(inputFile[i]).append(",");
+    }
+    return indirBuffer.substring(0, indirBuffer.length() - 1);
   }
 
-  private void addWebdataSort(int num_of_reducers, boolean mapoutputCompressed,
-      boolean outputCompressed, String size) {
-    String defaultIndir = VARCOMPSEQ + "/{part-00000,part-00001,part-00002}";
-    String indir = getInputDirsFor("webdataSort.smallJobs.inputFiles",
-        defaultIndir);
-
-    String outdir = addTSSuffix("perf-out/webdata-sort-out-dir-small_");
-    if ("medium".compareToIgnoreCase(size) == 0) {
-      defaultIndir = VARCOMPSEQ + "/{part-000*0,part-000*1,part-000*2}";
-      indir = getInputDirsFor("webdataSort.mediumJobs.inputFiles", defaultIndir);
-      outdir = addTSSuffix("perf-out/webdata-sort-out-dir-medium_");
-    } else if ("large".compareToIgnoreCase(size) == 0) {
-      defaultIndir = VARCOMPSEQ;
-      indir = getInputDirsFor("webdataSort.largeJobs.inputFiles", defaultIndir);
-
-      outdir = addTSSuffix("perf-out/webdata-sort-dir-large_");
-    }
-    GenericMRLoadJobCreator jobcreator = new GenericMRLoadJobCreator();
-    StringBuffer sb = new StringBuffer();
-    sb.append("-keepmap 100 ");
-    sb.append("-keepred 100 ");
-    sb.append("-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat ");
-    sb.append("-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat ");
-    sb.append("-outKey org.apache.hadoop.io.Text ");
-    sb.append("-outValue org.apache.hadoop.io.Text ");
-    sb.append("-indir ").append(indir).append(" ");
-    sb.append("-outdir ").append(outdir).append(" ");
-    sb.append("-r ").append(num_of_reducers);
-
-    String[] args = sb.toString().split(" ");
-    clearDir(outdir);
+  private static void clearDir(String dir) {
     try {
-      JobConf jobconf = jobcreator.createJob(args, mapoutputCompressed,
-          outputCompressed);
-      jobconf.setJobName("GridmixWebdataSort." + size);
-      Job job = new Job(jobconf);
-      gridmix.addJob(job);
-      numOfJobs++;
-    } catch (Exception ex) {
-      System.out.println(ex.getStackTrace());
+      Path outfile = new Path(dir);
+      fs.delete(outfile);
+    } catch (IOException ex) {
+      ex.printStackTrace();
+      System.out.println("delete file error:");
+      System.out.println(ex.toString());
     }
-
-  }
-
-  private void addWebdataSortSmall(int NUM_OF_REDUCERS,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-    addWebdataSort(NUM_OF_REDUCERS, mapoutputCompressed, outputCompressed,
-        "small");
   }
 
-  private void addWebdataSortMedium(int NUM_OF_REDUCERS,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-
-    addWebdataSort(NUM_OF_REDUCERS, mapoutputCompressed, outputCompressed,
-        "medium");
+  private boolean select(int total, int selected, int index) {
+    if (selected <= 0 || selected >= total) {
+      return selected > 0;
+    }
+    int step = total / selected;
+    int effectiveTotal = total - total % selected;
+    return (index <= effectiveTotal - 1 && (index % step == 0));
   }
 
-  private void addWebdataSortLarge(int NUM_OF_REDUCERS,
-      boolean mapoutputCompressed, boolean outputCompressed) {
-
-    addWebdataSort(NUM_OF_REDUCERS, mapoutputCompressed, outputCompressed,
-        "large");
+  private static String addTSSuffix(String s) {
+    Date date = Calendar.getInstance().getTime();
+    String ts = String.valueOf(date.getTime());
+    return s + "_" + ts;
   }
 
-  private void addWebdataSortJobs() {
-    int[] nums_of_small_webdatasort_job = config.getInts(
-        "webdataSort.smallJobs.numOfJobs", NUM_OF_SMALL_JOBS_PER_CLASS);
-    int num_of_small_webdatasort_job_mapoutputCompressed = config.getInt(
-        "webdataSort.smallJobs.numOfMapoutputCompressed", 0);
-    int num_of_small_webdatasort_job_outputCompressed = config.getInt(
-        "webdataSort.smallJobs.numOfOutputCompressed",
-        NUM_OF_SMALL_JOBS_PER_CLASS);
-    int[] webdatasort_smallJobs_numsOfReduces = config.getInts(
-        "webdataSort.smallJobs.numOfReduces", NUM_OF_REDUCERS_FOR_SMALL_JOB);
-
-    int len1, len2, totalNum, currentIndex;
-
-    len1 = nums_of_small_webdatasort_job.length;
-    len2 = webdatasort_smallJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_small_webdatasort_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: webdatasort.smallJobs.numOfJobs, "
-              + "webdatasort.smallJobs.numOfReduces must have the same number of items");
-    }
-
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_small_webdatasort_job = nums_of_small_webdatasort_job[index];
-      int webdatasort_smallJobs_numOfReduces = webdatasort_smallJobs_numsOfReduces[index];
-
-      for (int i = 0; i < num_of_small_webdatasort_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_small_webdatasort_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_small_webdatasort_job_outputCompressed, currentIndex);
-
-        addWebdataSortSmall(webdatasort_smallJobs_numOfReduces,
-            mapoutputCompressed, outputCompressed);
-        currentIndex = currentIndex + 1;
-      }
+  private void addJobs(GridMixJob job, Size size) throws IOException {
+    final String prefix = String.format("%s.%sJobs", job.getName(), size);
+    int[] numJobs = getInts(config, prefix + ".numOfJobs",
+        size.defaultNumJobs());
+    int[] numReduces = getInts(config, prefix + ".numOfReduces",
+        size.defaultNumReducers());
+    if (numJobs.length != numReduces.length) {
+      throw new IOException("Configuration error: " +
+          prefix + ".numOfJobs must match " +
+          prefix + ".numOfReduces");
+    }
+    int numMapoutputCompressed = config.getInt(
+        prefix + ".numOfMapoutputCompressed", 0);
+    int numOutputCompressed = config.getInt(
+        prefix + ".numOfOutputCompressed", size.defaultNumJobs());
+    int totalJobs = 0;
+    for (int nJob : numJobs) {
+      totalJobs += nJob;
     }
-
-    int[] nums_of_medium_webdatasort_job = config.getInts(
-        "webdataSort.mediumJobs.numOfJobs", NUM_OF_MEDIUM_JOBS_PER_CLASS);
-    int num_of_medium_webdatasort_job_mapoutputCompressed = config.getInt(
-        "webdataSort.mediumJobs.numOfMapoutputCompressed", 0);
-    int num_of_medium_webdatasort_job_outputCompressed = config.getInt(
-        "webdataSort.mediumJobs.numOfOutputCompressed",
-        NUM_OF_MEDIUM_JOBS_PER_CLASS);
-    int[] webdatasort_mediumJobs_numsOfReduces = config.getInts(
-        "webdataSort.mediumJobs.numOfReduces", NUM_OF_REDUCERS_FOR_MEDIUM_JOB);
-
-    len1 = nums_of_medium_webdatasort_job.length;
-    len2 = webdatasort_mediumJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_medium_webdatasort_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: webdatasort.mediumJobs.numOfJobs, "
-              + "webdatasort.mediumJobs.numOfReduces must have the same number of items");
-    }
-
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_medium_webdatasort_job = nums_of_medium_webdatasort_job[index];
-      int webdatasort_mediumJobs_numOfReduces = webdatasort_mediumJobs_numsOfReduces[index];
-      for (int i = 0; i < num_of_medium_webdatasort_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_medium_webdatasort_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_medium_webdatasort_job_outputCompressed, currentIndex);
-
-        addWebdataSortMedium(webdatasort_mediumJobs_numOfReduces,
-            mapoutputCompressed, outputCompressed);
-        currentIndex = currentIndex + 1;
+    int currentIndex = 0;
+    for (int i = 0; i < numJobs.length; ++i) {
+      for (int j = 0; j < numJobs[i]; ++j) {
+        boolean mapoutputComp =
+          select(totalJobs, numMapoutputCompressed, currentIndex);
+        boolean outputComp =
+          select(totalJobs, numOutputCompressed, currentIndex);
+        job.addJob(numReduces[i], mapoutputComp, outputComp, size, gridmix);
+        ++numOfJobs;
+        ++currentIndex;
       }
     }
+  }
 
-    int[] nums_of_large_webdatasort_job = config.getInts(
-        "webdataSort.largeJobs.numOfJobs", NUM_OF_LARGE_JOBS_PER_CLASS);
-    int num_of_large_webdatasort_job_mapoutputCompressed = config.getInt(
-        "webdataSort.largeJobs.numOfMapoutputCompressed", 0);
-    int num_of_large_webdatasort_job_outputCompressed = config.getInt(
-        "webdataSort.largeJobs.numOfOutputCompressed",
-        NUM_OF_LARGE_JOBS_PER_CLASS);
-    int[] webdatasort_largeJobs_numsOfReduces = config.getInts(
-        "webdataSort.largeJobs.numOfReduces", NUM_OF_REDUCERS_FOR_LARGE_JOB);
-
-    len1 = nums_of_large_webdatasort_job.length;
-    len2 = webdatasort_largeJobs_numsOfReduces.length;
-    totalNum = 0;
-
-    for (int i = 0; i < len1; i++) {
-      totalNum = totalNum + nums_of_large_webdatasort_job[i];
-    }
-
-    if (len1 != len2) {
-      System.out
-          .println(" Configuration error: webdatasort.largeJobs.numOfJobs, "
-              + "webdatasort.largeJobs.numOfReduces must have the same number of items");
-    }
-
-    currentIndex = 0;
-    for (int index = 0; index < len1; index++) {
-      int num_of_large_webdatasort_job = nums_of_large_webdatasort_job[index];
-      int webdatasort_largeJobs_numOfReduces = webdatasort_largeJobs_numsOfReduces[index];
-      for (int i = 0; i < num_of_large_webdatasort_job; i++) {
-        boolean mapoutputCompressed = select(totalNum,
-            num_of_large_webdatasort_job_mapoutputCompressed, currentIndex);
-        boolean outputCompressed = select(totalNum,
-            num_of_large_webdatasort_job_outputCompressed, currentIndex);
-
-        addWebdataSortLarge(webdatasort_largeJobs_numOfReduces,
-            mapoutputCompressed, outputCompressed);
-        currentIndex = currentIndex + 1;
-      }
+  private void addAllJobs(GridMixJob job) throws IOException {
+    for (Size size : EnumSet.allOf(Size.class)) {
+      addJobs(job, size);
     }
-
   }
 
-  public void addjobs() {
-
-    addTextSortJobs();
-
-    addCombinerJobs();
-
-    addMonsterQueryJobs();
-
-    addWebdataScanJobs();
-
-    addWebdataSortJobs();
-
+  public void addjobs() throws IOException {
+    for (GridMixJob jobtype : EnumSet.allOf(GridMixJob.class)) {
+      addAllJobs(jobtype);
+    }
     System.out.println("total " + gridmix.getWaitingJobs().size() + " jobs");
   }
 
   class SimpleStats {
     long minValue;
-
     long maxValue;
-
     long averageValue;
-
     long mediumValue;
-
     int n;
 
     SimpleStats(long[] data) {
@@ -1534,15 +650,10 @@
     gridmix.stop();
   }
 
-  public static void main(String argv[]) {
-
-    try {
-      GridMixRunner gridmixRunner = new GridMixRunner();
-      gridmixRunner.addjobs();
-      gridmixRunner.run();
-    } catch (Exception ex) {
-      System.out.println(ex.getMessage());
-    }
+  public static void main(String argv[]) throws Exception {
+    GridMixRunner gridmixRunner = new GridMixRunner();
+    gridmixRunner.addjobs();
+    gridmixRunner.run();
   }
 
 }
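
A note on the patch above: it collapses the per-size wrapper methods (addMonsterQuerySmall, addWebdataSortMedium, and so on) into a single addJobs(GridMixJob, Size) loop driven by two enums, with the per-size job and reducer defaults supplied by the Size constants (size.defaultNumJobs(), size.defaultNumReducers()). The one subtle piece is select(total, selected, index), which spreads the "selected" compressed jobs at an even stride across all "total" jobs of a class instead of clustering them at the front. Below is a minimal, self-contained sketch of that logic; the select() body is copied from the patch, while the demo class around it is illustrative only and not part of the commit.

  // Standalone illustration of GridMixRunner.select(): pick "selected"
  // of "total" indices at an even stride. The select() body is copied
  // from the patch above; the surrounding demo class is not.
  public class SelectDemo {

    static boolean select(int total, int selected, int index) {
      if (selected <= 0 || selected >= total) {
        return selected > 0;            // select none, or select all
      }
      int step = total / selected;      // stride between selected indices
      int effectiveTotal = total - total % selected;
      return (index <= effectiveTotal - 1 && (index % step == 0));
    }

    public static void main(String[] args) {
      // With 10 jobs of which 3 should be compressed, indices 0, 3
      // and 6 come back true.
      for (int index = 0; index < 10; index++) {
        System.out.println("job " + index + " compressed: "
            + select(10, 3, index));
      }
    }
  }

The effectiveTotal clamp drops the trailing total % selected indices, so the stride can never pick more jobs than the requested count.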

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java?rev=727144&r1=727143&r2=727144&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java Tue Dec 16 12:52:45 2008
@@ -61,7 +61,7 @@
   /**
    * Configure a job given argv.
    */
-  public boolean parseArgs(String[] argv, JobConf job) throws IOException {
+  public static boolean parseArgs(String[] argv, JobConf job) throws IOException {
     if (argv.length < 1) {
       return 0 == printUsage();
     }
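
The companion change to GenericMRLoadGenerator makes parseArgs static, matching the now-static createJob methods in the gridmix2 job creators: a JobConf can be populated from an argv-style array without instantiating the generator. A sketch of the resulting call pattern follows; the wrapper class and its null-on-failure convention are illustrative assumptions, not part of the patch.

  import java.io.IOException;

  import org.apache.hadoop.mapred.GenericMRLoadGenerator;
  import org.apache.hadoop.mapred.JobConf;

  // Hypothetical caller: configure a load-generator job directly from
  // string arguments, as the refactored gridmix2 creators now can.
  public class LoadJobSketch {
    public static JobConf createLoadJob(String[] argv) throws IOException {
      JobConf conf = new JobConf(GenericMRLoadGenerator.class);
      if (!GenericMRLoadGenerator.parseArgs(argv, conf)) {
        return null;  // parseArgs rejected the arguments
      }
      return conf;
    }
  }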


