hadoop-common-commits mailing list archives

From ama...@apache.org
Subject svn commit: r1185694 [2/7] - in /hadoop/common/branches/branch-0.20-security: ./ src/contrib/ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/ src/contrib/gridmix/sr...
Date Tue, 18 Oct 2011 14:45:51 GMT
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Tue Oct 18 14:45:48 2011
@@ -30,8 +30,10 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -41,6 +43,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -52,6 +55,7 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
 
 // TODO can replace with form of GridmixJob
 class GenerateData extends GridmixJob {
@@ -86,14 +90,103 @@ class GenerateData extends GridmixJob {
    * Replication of generated data.
    */
   public static final String GRIDMIX_GEN_REPLICATION = "gridmix.gen.replicas";
+  static final String JOB_NAME = "GRIDMIX_GENERATE_INPUT_DATA";
 
   public GenerateData(Configuration conf, Path outdir, long genbytes)
       throws IOException {
-    super(conf, 0L, "GRIDMIX_GENDATA");
+    super(conf, 0L, JOB_NAME);
     job.getConfiguration().setLong(GRIDMIX_GEN_BYTES, genbytes);
     FileOutputFormat.setOutputPath(job, outdir);
   }
 
+  /**
+   * Represents the input data characteristics.
+   */
+  static class DataStatistics {
+    private long dataSize;
+    private long numFiles;
+    private boolean isDataCompressed;
+    
+    DataStatistics(long dataSize, long numFiles, boolean isCompressed) {
+      this.dataSize = dataSize;
+      this.numFiles = numFiles;
+      this.isDataCompressed = isCompressed;
+    }
+    
+    long getDataSize() {
+      return dataSize;
+    }
+    
+    long getNumFiles() {
+      return numFiles;
+    }
+    
+    boolean isDataCompressed() {
+      return isDataCompressed;
+    }
+  }
+  
+  /**
+   * Publish the data statistics.
+   */
+  static DataStatistics publishDataStatistics(Path inputDir, long genBytes, 
+                                              Configuration conf) 
+  throws IOException {
+    if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)) {
+      return CompressionEmulationUtil.publishCompressedDataStatistics(inputDir, 
+                                        conf, genBytes);
+    } else {
+      return publishPlainDataStatistics(conf, inputDir);
+    }
+  }
+
+  /**
+   * Recursively list files and collect their statuses.
+   * @param path the file/directory to list recursively
+   * @param fs the FileSystem of the path
+   * @param filter the user-supplied path filter
+   * @return the statuses of all files found under the given path
+   */
+  private static List<FileStatus> listFiles(Path path, FileSystem fs,
+      PathFilter filter) throws IOException {
+    List<FileStatus> list = new ArrayList<FileStatus>();
+    FileStatus[] statuses = fs.listStatus(path, filter);
+    if (statuses != null) {
+      for (FileStatus status : statuses) {
+        if (status.isDir()) {
+          list.addAll(listFiles(status.getPath(), fs, filter));
+        } else {
+          list.add(status);
+        }
+      }
+    }
+    return list;
+  }
+
+  static DataStatistics publishPlainDataStatistics(Configuration conf, 
+                                                   Path inputDir) 
+  throws IOException {
+    FileSystem fs = inputDir.getFileSystem(conf);
+
+    // obtain input data file statuses
+    long dataSize = 0;
+    long fileCount = 0;
+    PathFilter filter = new Utils.OutputFileUtils.OutputFilesFilter();
+    List<FileStatus> statuses = listFiles(inputDir, fs, filter);
+
+    for (FileStatus fStat : statuses) {
+      dataSize += fStat.getLen();
+    }
+    fileCount = statuses.size();
+
+    // publish the plain data statistics
+    LOG.info("Total size of input data : " 
+             + StringUtils.humanReadableInt(dataSize));
+    LOG.info("Total number of input data files : " + fileCount);
+    
+    return new DataStatistics(dataSize, fileCount, false);
+  }
+  
   @Override
   public Job call() throws IOException, InterruptedException,
                            ClassNotFoundException {
@@ -101,6 +194,18 @@ class GenerateData extends GridmixJob {
     ugi.doAs( new PrivilegedExceptionAction <Job>() {
        public Job run() throws IOException, ClassNotFoundException,
                                InterruptedException {
+         // check if compression emulation is enabled
+         if (CompressionEmulationUtil
+             .isCompressionEmulationEnabled(job.getConfiguration())) {
+           CompressionEmulationUtil.configure(job);
+         } else {
+           configureRandomBytesDataGenerator();
+         }
+         job.submit();
+         return job;
+       }
+       
+       private void configureRandomBytesDataGenerator() {
         job.setMapperClass(GenDataMapper.class);
         job.setNumReduceTasks(0);
         job.setMapOutputKeyClass(NullWritable.class);
@@ -113,12 +218,15 @@ class GenerateData extends GridmixJob {
         } catch (IOException e) {
           LOG.error("Error  while adding input path ", e);
         }
-        job.submit();
-        return job;
       }
     });
     return job;
   }
+  
+  @Override
+  protected boolean canEmulateCompression() {
+    return false;
+  }
 
   public static class GenDataMapper
       extends Mapper<NullWritable,LongWritable,NullWritable,BytesWritable> {

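For context on the statistics plumbing above: publishDataStatistics() dispatches to the compressed or the plain variant depending on whether compression emulation is enabled, and publishPlainDataStatistics() recursively lists the input directory and sums file lengths. A minimal caller sketch, assuming it lives in the org.apache.hadoop.mapred.gridmix package (GenerateData is package-private) and using an illustrative input path:

    package org.apache.hadoop.mapred.gridmix;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;

    public class DataStatsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path inputDir = new Path("/user/foo/gridmix-io/input"); // illustrative

        // Picks the compressed or the plain statistics publisher depending
        // on whether compression emulation is enabled in conf.
        DataStatistics stats =
            GenerateData.publishDataStatistics(inputDir, 100L << 20, conf);

        System.out.println(stats.getDataSize() + " bytes in "
            + stats.getNumFiles() + " files (compressed="
            + stats.isDataCompressed() + ")");
      }
    }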
Added: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * GridmixJob that generates distributed cache files.
+ * {@link GenerateDistCacheData} expects a list of distributed cache files to be
+ * generated as input. This list is expected to be stored as a sequence file
+ * and the filename is expected to be configured using
+ * {@code gridmix.distcache.file.list}.
+ * The input file lists the distributed cache files along with their sizes.
+ * For each record (i.e. file size and file path) in this input file,
+ * a file of the specified size is created at the specified path.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class GenerateDistCacheData extends GridmixJob {
+
+  /**
+   * Number of distributed cache files to be created by gridmix.
+   */
+  static final String GRIDMIX_DISTCACHE_FILE_COUNT =
+      "gridmix.distcache.file.count";
+  /**
+   * Total number of bytes to be written to the distributed cache files by
+   * gridmix, i.e. the sum of the sizes of all unique distributed cache files
+   * to be created by gridmix.
+   */
+  static final String GRIDMIX_DISTCACHE_BYTE_COUNT =
+      "gridmix.distcache.byte.count";
+  /**
+   * The special file created (and used) by gridmix that contains the list of
+   * unique distributed cache files to be created, along with their sizes.
+   */
+  static final String GRIDMIX_DISTCACHE_FILE_LIST =
+      "gridmix.distcache.file.list";
+  static final String JOB_NAME = "GRIDMIX_GENERATE_DISTCACHE_DATA";
+
+  public GenerateDistCacheData(Configuration conf) throws IOException {
+    super(conf, 0L, JOB_NAME);
+  }
+
+  @Override
+  public Job call() throws IOException, InterruptedException,
+                           ClassNotFoundException {
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    ugi.doAs( new PrivilegedExceptionAction <Job>() {
+       public Job run() throws IOException, ClassNotFoundException,
+                               InterruptedException {
+        job.setMapperClass(GenDCDataMapper.class);
+        job.setNumReduceTasks(0);
+        job.setMapOutputKeyClass(NullWritable.class);
+        job.setMapOutputValueClass(BytesWritable.class);
+        job.setInputFormatClass(GenDCDataFormat.class);
+        job.setOutputFormatClass(NullOutputFormat.class);
+        job.setJarByClass(GenerateDistCacheData.class);
+        try {
+          FileInputFormat.addInputPath(job, new Path("ignored"));
+        } catch (IOException e) {
+          LOG.error("Error while adding input path ", e);
+        }
+        job.submit();
+        return job;
+      }
+    });
+    return job;
+  }
+
+  @Override
+  protected boolean canEmulateCompression() {
+    return false;
+  }
+
+  public static class GenDCDataMapper
+      extends Mapper<LongWritable, BytesWritable, NullWritable, BytesWritable> {
+
+    private BytesWritable val;
+    private final Random r = new Random();
+    private FileSystem fs;
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      val = new BytesWritable(new byte[context.getConfiguration().getInt(
+              GenerateData.GRIDMIX_VAL_BYTES, 1024 * 1024)]);
+      fs = FileSystem.get(context.getConfiguration());
+    }
+
+    // Create one distributed cache file of the needed file size.
+    // The key is the distributed cache file size and
+    // the value is the distributed cache file path.
+    @Override
+    public void map(LongWritable key, BytesWritable value, Context context)
+        throws IOException, InterruptedException {
+
+      String fileName = new String(value.getBytes(), 0, value.getLength());
+      Path path = new Path(fileName);
+
+      /**
+       * Create the distributed cache file with permissions 0755.
+       * Since the private distributed cache directory doesn't have execute
+       * permission for others, it is OK to give the files under that
+       * directory read permission for others: they will still become
+       * 'private' distributed cache files on the simulated cluster.
+       */
+      FSDataOutputStream dos =
+          FileSystem.create(fs, path, new FsPermission((short)0755));
+
+      for (long bytes = key.get(); bytes > 0; bytes -= val.getLength()) {
+        r.nextBytes(val.getBytes());
+        val.setSize((int)Math.min(val.getLength(), bytes));
+        dos.write(val.getBytes(), 0, val.getLength());// Write to distCache file
+      }
+      dos.close();
+    }
+  }
+
+  /**
+   * InputFormat for GenerateDistCacheData.
+   * Input to GenerateDistCacheData is the special file (in SequenceFile format)
+   * that contains the list of distributed cache files to be generated along
+   * with their file sizes.
+   */
+  static class GenDCDataFormat
+      extends InputFormat<LongWritable, BytesWritable> {
+
+    // Split the special file that contains the list of distributed cache file
+    // paths and their file sizes such that each split corresponds to
+    // approximately the same amount of distributed cache data to be generated.
+    // Use numTaskTrackers * numMapSlotsPerTracker as the number of maps
+    // for this job if there is a lot of data to be generated.
+    @Override
+    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+      final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
+      final JobClient client = new JobClient(jobConf);
+      ClusterStatus stat = client.getClusterStatus(true);
+      int numTrackers = stat.getTaskTrackers();
+      final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);
+
+      // Total size of distributed cache files to be generated
+      final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
+      // Get the path of the special file
+      String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
+      if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
+        throw new RuntimeException("Invalid metadata: #files (" + fileCount
+            + "), total_size (" + totalSize + "), filelisturi ("
+            + distCacheFileList + ")");
+      }
+
+      Path sequenceFile = new Path(distCacheFileList);
+      FileSystem fs = sequenceFile.getFileSystem(jobConf);
+      FileStatus srcst = fs.getFileStatus(sequenceFile);
+      // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
+      int numMapSlotsPerTracker =
+          jobConf.getInt("mapred.tasktracker.map.tasks.maximum", 2);
+      int numSplits = numTrackers * numMapSlotsPerTracker;
+
+      List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+      LongWritable key = new LongWritable();
+      BytesWritable value = new BytesWritable();
+
+      // Average size of data to be generated by each map task
+      final long targetSize = Math.max(totalSize / numSplits,
+                                DistributedCacheEmulator.AVG_BYTES_PER_MAP);
+      long splitStartPosition = 0L;
+      long splitEndPosition = 0L;
+      long acc = 0L;
+      long bytesRemaining = srcst.getLen();
+      SequenceFile.Reader reader = null;
+      try {
+        reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
+        while (reader.next(key, value)) {
+
+          // If adding this file would put this split past the target size,
+          // cut the last split and put this file in the next split.
+          if (acc + key.get() > targetSize && acc != 0) {
+            long splitSize = splitEndPosition - splitStartPosition;
+            splits.add(new FileSplit(
+                sequenceFile, splitStartPosition, splitSize, (String[])null));
+            bytesRemaining -= splitSize;
+            splitStartPosition = splitEndPosition;
+            acc = 0L;
+          }
+          acc += key.get();
+          splitEndPosition = reader.getPosition();
+        }
+      } finally {
+        if (reader != null) {
+          reader.close();
+        }
+      }
+      if (bytesRemaining != 0) {
+        splits.add(new FileSplit(
+            sequenceFile, splitStartPosition, bytesRemaining, (String[])null));
+      }
+
+      return splits;
+    }
+
+    /**
+     * Returns a reader for this split of the distributed cache file list.
+     */
+    @Override
+    public RecordReader<LongWritable, BytesWritable> createRecordReader(
+        InputSplit split, final TaskAttemptContext taskContext)
+        throws IOException, InterruptedException {
+      return new SequenceFileRecordReader<LongWritable, BytesWritable>();
+    }
+  }
+}

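GenDCDataMapper above decodes each record as (size, path), and GenDCDataFormat splits the list file by reader position, so the list itself is just a SequenceFile of LongWritable keys and BytesWritable values. A sketch of producing such a list by hand, with illustrative paths and sizes (Gridmix itself builds this file during distributed cache emulation setup):

    package org.apache.hadoop.mapred.gridmix;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.SequenceFile;

    public class DistCacheListSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // illustrative location under <ioPath>/distributedCache/
        Path listFile =
            new Path("/user/foo/gridmix-io/distributedCache/_files");

        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
            listFile, LongWritable.class, BytesWritable.class);
        try {
          String file = "/user/foo/gridmix-io/distributedCache/file-000000";
          byte[] pathBytes = file.getBytes("UTF-8");
          // key = file size in bytes, value = file path,
          // exactly as GenDCDataMapper.map() decodes them
          writer.append(new LongWritable(2L * 1024 * 1024),
                        new BytesWritable(pathBytes));
        } finally {
          writer.close();
        }
      }
    }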
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Tue Oct 18 14:45:48 2011
@@ -33,12 +33,14 @@ import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
 import org.apache.hadoop.tools.rumen.ZombieJobProducer;
 
 import org.apache.commons.logging.Log;
@@ -92,62 +94,143 @@ public class Gridmix extends Configured 
    */
   public static final String GRIDMIX_USR_RSV = "gridmix.user.resolve.class";
 
+  /**
+   * Configuration property set in the simulated job's configuration, whose
+   * value is the corresponding original job's name. This is not configurable
+   * by the gridmix user.
+   */
+  public static final String ORIGINAL_JOB_NAME =
+      "gridmix.job.original-job-name";
+  /**
+   * Configuration property set in the simulated job's configuration, whose
+   * value is the corresponding original job's id. This is not configurable
+   * by the gridmix user.
+   */
+  public static final String ORIGINAL_JOB_ID = "gridmix.job.original-job-id";
+
+  private DistributedCacheEmulator distCacheEmulator;
+
   // Submit data structures
   private JobFactory factory;
   private JobSubmitter submitter;
   private JobMonitor monitor;
   private Statistics statistics;
+  private Summarizer summarizer;
 
   // Shutdown hook
   private final Shutdown sdh = new Shutdown();
 
+  Gridmix(String[] args) {
+    summarizer = new Summarizer(args);
+  }
+  
+  Gridmix() {
+    summarizer = new Summarizer();
+  }
+  
+  // Get the input data directory for Gridmix. Input directory is 
+  // <io-path>/input
+  static Path getGridmixInputDataPath(Path ioPath) {
+    return new Path(ioPath, "input");
+  }
+  
   /**
-   * Write random bytes at the path provided.
+   * Write random bytes at the path &lt;inputDir&gt;.
    * @see org.apache.hadoop.mapred.gridmix.GenerateData
    */
-  protected void writeInputData(long genbytes, Path ioPath)
+  protected void writeInputData(long genbytes, Path inputDir)
       throws IOException, InterruptedException {
     final Configuration conf = getConf();
-    final GridmixJob genData = new GenerateData(conf, ioPath, genbytes);
-    submitter.add(genData);
+    
+    // configure the compression ratio if needed
+    CompressionEmulationUtil.setupDataGeneratorConfig(conf);
+    
+    final GenerateData genData = new GenerateData(conf, inputDir, genbytes);
     LOG.info("Generating " + StringUtils.humanReadableInt(genbytes) +
         " of test data...");
-    // TODO add listeners, use for job dependencies
-    TimeUnit.SECONDS.sleep(10);
-    try {
-      genData.getJob().waitForCompletion(false);
-    } catch (ClassNotFoundException e) {
-      throw new IOException("Internal error", e);
-    }
-    if (!genData.getJob().isSuccessful()) {
-      throw new IOException("Data generation failed!");
-    }
-
+    launchGridmixJob(genData);
+    
     FsShell shell = new FsShell(conf);
     try {
-      LOG.info("Changing the permissions for inputPath " + ioPath.toString());
-      shell.run(new String[] {"-chmod","-R","777", ioPath.toString()});
+      LOG.info("Changing the permissions for inputPath " + inputDir.toString());
+      shell.run(new String[] {"-chmod","-R","777", inputDir.toString()});
     } catch (Exception e) {
       LOG.error("Couldnt change the file permissions " , e);
       throw new IOException(e);
     }
-    LOG.info("Done.");
+    
+    LOG.info("Input data generation successful.");
   }
 
-  protected InputStream createInputStream(String in) throws IOException {
-    if ("-".equals(in)) {
-      return System.in;
+  /**
+   * Write random bytes to the distributed cache files that will be used by
+   * all simulated jobs of the current gridmix run, if files are to be
+   * generated. This is done by the MapReduce job
+   * {@link GenerateDistCacheData#JOB_NAME}.
+   * @see org.apache.hadoop.mapred.gridmix.GenerateDistCacheData
+   */
+  protected void writeDistCacheData(Configuration conf)
+      throws IOException, InterruptedException {
+    int fileCount =
+        conf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1);
+    if (fileCount > 0) {// generate distributed cache files
+      final GridmixJob genDistCacheData = new GenerateDistCacheData(conf);
+      LOG.info("Generating distributed cache data of size " + conf.getLong(
+          GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1));
+      launchGridmixJob(genDistCacheData);
+    }
+  }
+
+  // Launch Input/DistCache Data Generation job and wait for completion
+  void launchGridmixJob(GridmixJob job)
+      throws IOException, InterruptedException {
+    submitter.add(job);
+
+    // TODO add listeners, use for job dependencies
+    TimeUnit.SECONDS.sleep(10);
+    try {
+      job.getJob().waitForCompletion(false);
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Internal error", e);
     }
-    final Path pin = new Path(in);
-    return pin.getFileSystem(getConf()).open(pin);
+    if (!job.getJob().isSuccessful()) {
+      throw new IOException(job.getJob().getJobName() + " job failed!");
+    }
+  }
+
+  /**
+   * Create an appropriate {@code JobStoryProducer} object for the
+   * given trace.
+   * 
+   * @param traceIn the path to the trace file. The special path
+   * &quot;-&quot; denotes the standard input stream.
+   *
+   * @param conf the configuration to be used.
+   *
+   * @throws IOException if there was an error.
+   */
+  protected JobStoryProducer createJobStoryProducer(String traceIn,
+      Configuration conf) throws IOException {
+    if ("-".equals(traceIn)) {
+      return new ZombieJobProducer(System.in, null);
+    }
+    return new ZombieJobProducer(new Path(traceIn), null, conf);
   }
 
+  // get the gridmix job submission policy
+  protected static GridmixJobSubmissionPolicy getJobSubmissionPolicy(
+                                                Configuration conf) {
+    return GridmixJobSubmissionPolicy.getPolicy(conf, 
+                                        GridmixJobSubmissionPolicy.STRESS);
+  }
+  
   /**
    * Create each component in the pipeline and start it.
    * @param conf Configuration data, no keys specific to this context
    * @param traceIn Either a Path to the trace data or &quot;-&quot; for
    *                stdin
-   * @param ioPath Path from which input data is read
+   * @param ioPath &lt;ioPath&gt;/input/ is the dir from which input data is
+   *               read and &lt;ioPath&gt;/distributedCache/ is the gridmix
+   *               distributed cache directory.
    * @param scratchDir Path into which job output is written
    * @param startFlag Semaphore for starting job trace pipeline
    */
@@ -155,8 +238,8 @@ public class Gridmix extends Configured 
       Path scratchDir, CountDownLatch startFlag, UserResolver userResolver)
       throws IOException {
     try {
-      GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
-        conf, GridmixJobSubmissionPolicy.STRESS);
+      Path inputDir = getGridmixInputDataPath(ioPath);
+      GridmixJobSubmissionPolicy policy = getJobSubmissionPolicy(conf);
       LOG.info(" Submission policy is " + policy.name());
       statistics = new Statistics(conf, policy.getPollingInterval(), startFlag);
       monitor = createJobMonitor(statistics);
@@ -167,16 +250,24 @@ public class Gridmix extends Configured 
         monitor, conf.getInt(
           GRIDMIX_SUB_THR, noOfSubmitterThreads), conf.getInt(
           GRIDMIX_QUE_DEP, 5), new FilePool(
-          conf, ioPath), userResolver,statistics);
+          conf, inputDir), userResolver,statistics);
       
-      factory = createJobFactory(
-        submitter, traceIn, scratchDir, conf, startFlag, userResolver);
+      distCacheEmulator = new DistributedCacheEmulator(conf, ioPath);
+
+      factory = createJobFactory(submitter, traceIn, scratchDir, conf,
+                                 startFlag, userResolver);
+      factory.jobCreator.setDistCacheEmulator(distCacheEmulator);
+
       if (policy==GridmixJobSubmissionPolicy.SERIAL) {
         statistics.addJobStatsListeners(factory);
       } else {
         statistics.addClusterStatsObservers(factory);
       }
-      
+
+      // add the gridmix run summarizer to the statistics
+      statistics.addJobStatsListeners(summarizer.getExecutionSummarizer());
+      statistics.addClusterStatsObservers(summarizer.getClusterSummarizer());
+
       monitor.start();
       submitter.start();
     }catch(Exception e) {
@@ -201,9 +292,8 @@ public class Gridmix extends Configured 
     throws IOException {
     return GridmixJobSubmissionPolicy.getPolicy(
       conf, GridmixJobSubmissionPolicy.STRESS).createJobFactory(
-      submitter, new ZombieJobProducer(
-        createInputStream(
-          traceIn), null), scratchDir, conf, startFlag, resolver);
+      submitter, createJobStoryProducer(traceIn, conf), scratchDir, conf,
+      startFlag, resolver);
   }
 
   public int run(final String[] argv) throws IOException, InterruptedException {
@@ -217,6 +307,10 @@ public class Gridmix extends Configured 
         return runJob(conf,argv);
       }
     });
+    
+    // print the run summary
+    System.out.print("\n\n");
+    System.out.println(summarizer.toString());
     return val; 
   }
 
@@ -232,6 +326,9 @@ public class Gridmix extends Configured 
       printUsage(System.err);
       return 1;
     }
+    
+    // Should gridmix generate distributed cache data?
+    boolean generate = false;
     long genbytes = -1L;
     String traceIn = null;
     Path ioPath = null;
@@ -243,6 +340,7 @@ public class Gridmix extends Configured 
       for (int i = 0; i < argv.length - 2; ++i) {
         if ("-generate".equals(argv[i])) {
           genbytes = StringUtils.TraditionalBinaryPrefix.string2long(argv[++i]);
+          generate = true;
         } else if ("-users".equals(argv[i])) {
           userRsrc = new URI(argv[++i]);
         } else {
@@ -250,9 +348,22 @@ public class Gridmix extends Configured 
           return 1;
         }
       }
-      if (!userResolver.setTargetUsers(userRsrc, conf)) {
-        LOG.warn("Resource " + userRsrc + " ignored");
+
+      if (userResolver.needsTargetUsersList()) {
+        if (userRsrc != null) {
+          if (!userResolver.setTargetUsers(userRsrc, conf)) {
+            LOG.warn("Ignoring the user resource '" + userRsrc + "'.");
+          }
+        } else {
+          System.err.println("\n\n" + userResolver.getClass()
+              + " needs target user list. Use -users option." + "\n\n");
+          printUsage(System.err);
+          return 1;
+        }
+      } else if (userRsrc != null) {
+        LOG.warn("Ignoring the user resource '" + userRsrc + "'.");
       }
+
       ioPath = new Path(argv[argv.length - 2]);
       traceIn = argv[argv.length - 1];
     } catch (Exception e) {
@@ -260,17 +371,46 @@ public class Gridmix extends Configured 
       printUsage(System.err);
       return 1;
     }
-    return start(conf, traceIn, ioPath, genbytes, userResolver);
+    return start(conf, traceIn, ioPath, genbytes, userResolver, generate);
   }
 
+  /**
+   * 
+   * @param conf gridmix configuration
+   * @param traceIn trace file path (if it is '-', the trace is read from
+   *                stdin)
+   * @param ioPath Working directory for gridmix. GenerateData job
+   *               will generate data in the directory &lt;ioPath&gt;/input/ and
+   *               distributed cache data is generated in the directory
+   *               &lt;ioPath&gt;/distributedCache/, if -generate option is
+   *               specified.
+   * @param genbytes size of input data to be generated under the directory
+   *                 &lt;ioPath&gt;/input/
+   * @param userResolver gridmix user resolver
+   * @param generate true if -generate option was specified
+   * @return exit code
+   * @throws IOException
+   * @throws InterruptedException
+   */
   int start(Configuration conf, String traceIn, Path ioPath, long genbytes,
-      UserResolver userResolver) throws IOException, InterruptedException {
+      UserResolver userResolver, boolean generate)
+      throws IOException, InterruptedException {
+    DataStatistics stats = null;
     InputStream trace = null;
+    final FileSystem inputFs = ioPath.getFileSystem(conf);
+    ioPath = ioPath.makeQualified(inputFs);
+
     try {
+      // Create <ioPath> with 777 permissions
+      boolean succeeded = FileSystem.mkdirs(inputFs, ioPath,
+                                            new FsPermission((short) 0777));
+      if (!succeeded) {
+        throw new IOException("Creation of <ioPath> directory "
+                              + ioPath.toUri().toString() + " failed.");
+      }
+
       Path scratchDir = new Path(ioPath, conf.get(GRIDMIX_OUT_DIR, "gridmix"));
-      final FileSystem scratchFs = scratchDir.getFileSystem(conf);
-      scratchFs.mkdirs(scratchDir, new FsPermission((short) 0777));
-      scratchFs.setPermission(scratchDir, new FsPermission((short) 0777));
+
       // add shutdown hook for SIGINT, etc.
       Runtime.getRuntime().addShutdownHook(sdh);
       CountDownLatch startFlag = new CountDownLatch(1);
@@ -278,12 +418,30 @@ public class Gridmix extends Configured 
         // Create, start job submission threads
         startThreads(conf, traceIn, ioPath, scratchDir, startFlag,
             userResolver);
+
+        Path inputDir = getGridmixInputDataPath(ioPath);
+
         // Write input data if specified
         if (genbytes > 0) {
-          writeInputData(genbytes, ioPath);
+          writeInputData(genbytes, inputDir);
         }
+
+        // publish the data statistics
+        stats = GenerateData.publishDataStatistics(inputDir, genbytes, conf);
+
         // scan input dir contents
         submitter.refreshFilePool();
+
+        // set up the needed things for emulation of various loads
+        int exitCode = setupEmulation(conf, traceIn, scratchDir, ioPath,
+                                      generate);
+        if (exitCode != 0) {
+          return exitCode;
+        }
+
+        // start the summarizer
+        summarizer.start(conf);
+        
         factory.start();
         statistics.start();
       } catch (Throwable e) {
@@ -313,12 +471,74 @@ public class Gridmix extends Configured 
 
       }
     } finally {
+      if (factory != null) {
+        summarizer.finalize(factory, traceIn, genbytes, userResolver, stats, 
+                            conf);
+      }
       IOUtils.cleanup(LOG, trace);
     }
     return 0;
   }
 
   /**
+   * Create gridmix output directory. Setup things for emulation of
+   * various loads, if needed.
+   * @param conf gridmix configuration
+   * @param traceIn trace file path (if it is '-', the trace is read from
+   *                stdin)
+   * @param scratchDir gridmix output directory
+   * @param ioPath Working directory for gridmix.
+   * @param generate true if -generate option was specified
+   * @return exit code
+   * @throws IOException
+   * @throws InterruptedException 
+   */
+  private int setupEmulation(Configuration conf, String traceIn,
+      Path scratchDir, Path ioPath, boolean generate)
+      throws IOException, InterruptedException {
+    // create the scratch directory (the output directory of gridmix)
+    final FileSystem scratchFs = scratchDir.getFileSystem(conf);
+    FileSystem.mkdirs(scratchFs, scratchDir, new FsPermission((short) 0777));
+
+    // Setup things needed for emulation of distributed cache load
+    return setupDistCacheEmulation(conf, traceIn, ioPath, generate);
+    // Setup emulation of other loads like CPU load, Memory load
+  }
+
+  /**
+   * Setup gridmix for emulation of distributed cache load. This includes
+   * generation of distributed cache files, if needed.
+   * @param conf gridmix configuration
+   * @param traceIn trace file path (if it is '-', the trace is read from
+   *                stdin)
+   * @param ioPath &lt;ioPath&gt;/input/ is the dir where input data (a) exists
+   *               or (b) is generated. &lt;ioPath&gt;/distributedCache/ is the
+   *               folder where distributed cache data (a) exists or (b) is to be
+   *               generated by gridmix.
+   * @param generate true if -generate option was specified
+   * @return exit code
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private int setupDistCacheEmulation(Configuration conf, String traceIn,
+      Path ioPath, boolean generate) throws IOException, InterruptedException {
+    distCacheEmulator.init(traceIn, factory.jobCreator, generate);
+    int exitCode = 0;
+    if (distCacheEmulator.shouldGenerateDistCacheData() ||
+        distCacheEmulator.shouldEmulateDistCacheLoad()) {
+
+      JobStoryProducer jsp = createJobStoryProducer(traceIn, conf);
+      exitCode = distCacheEmulator.setupGenerateDistCacheData(jsp);
+      if (exitCode == 0) {
+        // If there are files to be generated, run a MapReduce job to generate
+        // the distributed cache files for all the simulated jobs of this trace.
+        writeDistCacheData(conf);
+      }
+    }
+    return exitCode;
+  }
+
+  /**
    * Handles orderly shutdown by requesting that each component in the
    * pipeline abort its progress, waiting for each to exit and killing
    * any jobs still running on the cluster.
@@ -387,7 +607,7 @@ public class Gridmix extends Configured 
   public static void main(String[] argv) throws Exception {
     int res = -1;
     try {
-      res = ToolRunner.run(new Configuration(), new Gridmix(), argv);
+      res = ToolRunner.run(new Configuration(), new Gridmix(argv), argv);
     } finally {
       System.exit(res);
     }
@@ -416,6 +636,11 @@ public class Gridmix extends Configured 
     ToolRunner.printGenericCommandUsage(out);
     out.println("Usage: gridmix [-generate <MiB>] [-users URI] [-Dname=value ...] <iopath> <trace>");
     out.println("  e.g. gridmix -generate 100m foo -");
+    out.println("Options:");
+    out.println("   -generate <MiB> : Generate input data of size MiB under "
+        + "<iopath>/input/ and generate\n\t\t     distributed cache data under "
+        + "<iopath>/distributedCache/.");
+    out.println("   -users <usersResourceURI> : URI that contains the users list.");
     out.println("Configuration parameters:");
     out.println("   General parameters:");
     out.printf("       %-48s : Output directory\n", GRIDMIX_OUT_DIR);
@@ -493,3 +718,4 @@ public class Gridmix extends Configured 
   }
 
 }
+

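The main() change above simply threads argv into the new Gridmix(String[]) constructor so the Summarizer can record the command line. A sketch of an equivalent programmatic launch, with illustrative paths (the last two arguments are <iopath> and <trace>, and "-" reads the trace from stdin); it assumes the caller sits in the gridmix package, since the constructor is package-private:

    package org.apache.hadoop.mapred.gridmix;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class GridmixLaunchSketch {
      public static void main(String[] args) throws Exception {
        String[] argv = {
            "-generate", "100m",       // data under <iopath>/input/ plus
                                       // distributed cache files
            "/user/foo/gridmix-io",    // <iopath>
            "/user/foo/trace.json"     // Rumen job trace, or "-" for stdin
        };
        // Passing argv to the constructor feeds the run summarizer.
        int res = ToolRunner.run(new Configuration(), new Gridmix(argv), argv);
        System.exit(res);
      }
    }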
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Tue Oct 18 14:45:48 2011
@@ -17,24 +17,27 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Formatter;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -53,16 +56,17 @@ import org.apache.commons.logging.LogFac
  */
 abstract class GridmixJob implements Callable<Job>, Delayed {
 
-  public static final String JOBNAME = "GRIDMIX";
-  public static final String ORIGNAME = "gridmix.job.name.original";
+  // Gridmix job name format is GRIDMIX<6-digit sequence number>
+  public static final String JOB_NAME_PREFIX = "GRIDMIX";
   public static final Log LOG = LogFactory.getLog(GridmixJob.class);
 
   private static final ThreadLocal<Formatter> nameFormat =
     new ThreadLocal<Formatter>() {
       @Override
       protected Formatter initialValue() {
-        final StringBuilder sb = new StringBuilder(JOBNAME.length() + 5);
-        sb.append(JOBNAME);
+        final StringBuilder sb =
+            new StringBuilder(JOB_NAME_PREFIX.length() + 6);
+        sb.append(JOB_NAME_PREFIX);
         return new Formatter(sb);
       }
     };
@@ -80,6 +84,14 @@ abstract class GridmixJob implements Cal
       "gridmix.job-submission.use-queue-in-trace";
   protected static final String GRIDMIX_DEFAULT_QUEUE = 
       "gridmix.job-submission.default-queue";
+  // configuration key to enable/disable High-Ram feature emulation
+  static final String GRIDMIX_HIGHRAM_EMULATION_ENABLE = 
+    "gridmix.highram-emulation.enable";
+  // configuration key to enable/disable task jvm options
+  static final String GRIDMIX_TASK_JVM_OPTIONS_ENABLE = 
+    "gridmix.task.jvm-options.enable";
+  private static final Pattern maxHeapPattern = 
+    Pattern.compile("-Xmx[0-9]+[kKmMgGtT]?+");
 
   private static void setJobQueue(Job job, String queue) {
     if (queue != null)
@@ -93,22 +105,56 @@ abstract class GridmixJob implements Cal
     this.jobdesc = jobdesc;
     this.seq = seq;
 
-    ((StringBuilder)nameFormat.get().out()).setLength(JOBNAME.length());
+    ((StringBuilder)nameFormat.get().out()).setLength(JOB_NAME_PREFIX.length());
     try {
       job = this.ugi.doAs(new PrivilegedExceptionAction<Job>() {
         public Job run() throws IOException {
-          Job ret = new Job(conf, nameFormat.get().format("%05d", seq)
-              .toString());
+
+          String jobId = null == jobdesc.getJobID()
+                         ? "<unknown>"
+                         : jobdesc.getJobID().toString();
+          Job ret = new Job(conf,
+                            nameFormat.get().format("%06d", seq).toString());
           ret.getConfiguration().setInt(GRIDMIX_JOB_SEQ, seq);
-          ret.getConfiguration().set(ORIGNAME,
-              null == jobdesc.getJobID() ? "<unknown>" : jobdesc.getJobID()
-                  .toString());
+
+          ret.getConfiguration().set(Gridmix.ORIGINAL_JOB_ID, jobId);
+          ret.getConfiguration().set(Gridmix.ORIGINAL_JOB_NAME,
+                                     jobdesc.getName());
           if (conf.getBoolean(GRIDMIX_USE_QUEUE_IN_TRACE, false)) {
             setJobQueue(ret, jobdesc.getQueueName());
           } else {
             setJobQueue(ret, conf.get(GRIDMIX_DEFAULT_QUEUE));
           }
 
+          // check if the job can emulate compression
+          if (canEmulateCompression()) {
+            // set the compression related configs if compression emulation is
+            // enabled
+            if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)) {
+              CompressionEmulationUtil.configureCompressionEmulation(
+                  jobdesc.getJobConf(), ret.getConfiguration());
+            }
+          }
+          
+          // configure high ram properties if enabled
+          if (conf.getBoolean(GRIDMIX_HIGHRAM_EMULATION_ENABLE, true)) {
+            configureHighRamProperties(jobdesc.getJobConf(), 
+                                       ret.getConfiguration());
+          }
+          
+          // configure task jvm options if enabled
+          // this knob can be turned off if there is a mismatch between the
+          // target (simulation) cluster and the original cluster. Such a 
+          // mismatch can result in job failures (due to memory issues) on the 
+          // target (simulated) cluster.
+          //
+          // TODO If configured, scale the original task's JVM (heap related)
+          //      options to suit the target (simulation) cluster
+          if (conf.getBoolean(GRIDMIX_TASK_JVM_OPTIONS_ENABLE, true)) {
+            configureTaskJVMOptions(jobdesc.getJobConf(), 
+                                    ret.getConfiguration());
+          }
+          
           return ret;
         }
       });
@@ -120,6 +166,185 @@ abstract class GridmixJob implements Cal
         submissionMillis, TimeUnit.MILLISECONDS);
     outdir = new Path(outRoot, "" + seq);
   }
+  
+  @SuppressWarnings("deprecation")
+  protected static void configureTaskJVMOptions(Configuration originalJobConf,
+                                                Configuration simulatedJobConf){
+    // Get the heap-related java opts used for the original job and set the
+    // same for the simulated job.
+    // set generic task heap options
+    configureTaskJVMMaxHeapOptions(originalJobConf, simulatedJobConf, 
+                                   JobConf.MAPRED_TASK_JAVA_OPTS);
+    //  set map task heap options
+    configureTaskJVMMaxHeapOptions(originalJobConf, simulatedJobConf, 
+                                   JobConf.MAPRED_MAP_TASK_JAVA_OPTS);
+
+    //  set reduce task heap options
+    configureTaskJVMMaxHeapOptions(originalJobConf, simulatedJobConf, 
+                                   JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS);
+  }
+  
+  // Configures the task's max heap options using the specified key
+  private static void configureTaskJVMMaxHeapOptions(Configuration srcConf, 
+                                                     Configuration destConf,
+                                                     String key) {
+    String srcHeapOpts = srcConf.get(key);
+    if (srcHeapOpts != null) {
+      List<String> srcMaxOptsList = new ArrayList<String>();
+      // extract the max heap options and ignore the rest
+      extractMaxHeapOpts(srcHeapOpts, srcMaxOptsList, 
+                         new ArrayList<String>());
+      if (srcMaxOptsList.size() > 0) {
+        List<String> destOtherOptsList = new ArrayList<String>();
+        // extract the other heap options and ignore the max options in the 
+        // destination configuration
+        String destHeapOpts = destConf.get(key);
+        if (destHeapOpts != null) {
+          extractMaxHeapOpts(destHeapOpts, new ArrayList<String>(), 
+                             destOtherOptsList);
+        }
+        
+        // the destination configuration might have its own task-level max
+        // heap opts set; drop those and replace them with the max heap
+        // options taken from the original (source) configuration
+        StringBuilder newHeapOpts = new StringBuilder();
+        
+        for (String otherOpt : destOtherOptsList) {
+          newHeapOpts.append(otherOpt).append(" ");
+        }
+        
+        for (String opts : srcMaxOptsList) {
+          newHeapOpts.append(opts).append(" ");
+        }
+        
+        // set the final heap opts 
+        destConf.set(key, newHeapOpts.toString().trim());
+      }
+    }
+  }
+  
+  private static void extractMaxHeapOpts(String javaOptions,  
+      List<String> maxOpts,  List<String> others) {
+    for (String opt : javaOptions.split(" ")) {
+      Matcher matcher = maxHeapPattern.matcher(opt);
+      if (matcher.find()) {
+        maxOpts.add(opt);
+      } else {
+        others.add(opt);
+      }
+    }
+  }
+
+  // Scales the desired job-level configuration parameter. This API makes sure 
+  // that the ratio of the job level configuration parameter to the cluster 
+  // level configuration parameter is maintained in the simulated run. Hence 
+  // the values are scaled from the original cluster's configuration to the 
+  // simulated cluster's configuration for higher emulation accuracy.
+  // This kind of scaling is useful for memory parameters.
+  private static void scaleConfigParameter(Configuration sourceConf, 
+                        Configuration destConf, String clusterValueKey, 
+                        String jobValueKey, long defaultValue) {
+    long simulatedClusterDefaultValue = 
+           destConf.getLong(clusterValueKey, defaultValue);
+    
+    long originalClusterDefaultValue = 
+           sourceConf.getLong(clusterValueKey, defaultValue);
+    
+    long originalJobValue = 
+           sourceConf.getLong(jobValueKey, defaultValue);
+    
+    double scaleFactor = (double)originalJobValue/originalClusterDefaultValue;
+    
+    long simulatedJobValue = (long)(scaleFactor * simulatedClusterDefaultValue);
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("For the job configuration parameter '" + jobValueKey 
+                + "' and the cluster configuration parameter '" 
+                + clusterValueKey + "', the original job's configuration value"
+                + " is scaled from '" + originalJobValue + "' to '" 
+                + simulatedJobValue + "' using the default (unit) value of "
+                + "'" + originalClusterDefaultValue + "' for the original "
+                + " cluster and '" + simulatedClusterDefaultValue + "' for the"
+                + " simulated cluster.");
+    }
+    
+    destConf.setLong(jobValueKey, simulatedJobValue);
+  }
+  
+  // Checks if the scaling of original job's memory parameter value is 
+  // valid
+  @SuppressWarnings("deprecation")
+  private static boolean checkMemoryUpperLimits(String jobKey, String limitKey,  
+                                                Configuration conf, 
+                                                boolean convertLimitToMB) {
+    if (conf.get(limitKey) != null) {
+      long limit = conf.getLong(limitKey, JobConf.DISABLED_MEMORY_LIMIT);
+      // scale only if the max memory limit is set.
+      if (limit >= 0) {
+        if (convertLimitToMB) {
+          limit /= (1024 * 1024); // convert bytes to MB
+        }
+        
+        long scaledConfigValue = 
+               conf.getLong(jobKey, JobConf.DISABLED_MEMORY_LIMIT);
+        
+        // check now
+        if (scaledConfigValue > limit) {
+          throw new RuntimeException("Simulated job's configuration" 
+              + " parameter '" + jobKey + "' got scaled to a value '" 
+              + scaledConfigValue + "' which exceeds the upper limit of '" 
+              + limit + "' defined for the simulated cluster by the key '" 
+              + limitKey + "'. To disable High-Ram feature emulation, set '" 
+              + GRIDMIX_HIGHRAM_EMULATION_ENABLE + "' to 'false'.");
+        }
+        return true;
+      }
+    }
+    return false;
+  }
+  
+  // Check if the parameter scaling does not exceed the cluster limits.
+  @SuppressWarnings("deprecation")
+  private static void validateTaskMemoryLimits(Configuration conf, 
+                        String jobKey, String clusterMaxKey) {
+    if (!checkMemoryUpperLimits(jobKey, 
+        JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, conf, true)) {
+      checkMemoryUpperLimits(jobKey, clusterMaxKey, conf, false);
+    }
+  }
+
+  /**
+   * Sets the high ram job properties in the simulated job's configuration.
+   */
+  @SuppressWarnings("deprecation")
+  static void configureHighRamProperties(Configuration sourceConf, 
+                                         Configuration destConf) {
+    // set the memory per map task
+    scaleConfigParameter(sourceConf, destConf, 
+                         JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+                         JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, 
+                         JobConf.DISABLED_MEMORY_LIMIT);
+    
+    // validate and fail early
+    validateTaskMemoryLimits(destConf,
+        JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, 
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY);
+    
+    // set the memory per reduce task
+    scaleConfigParameter(sourceConf, destConf, 
+                         JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+                         JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
+                         JobConf.DISABLED_MEMORY_LIMIT);
+    // validate and fail early
+    validateTaskMemoryLimits(destConf,
+        JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, 
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY);
+  }
+
+  /**
+   * Indicates whether this {@link GridmixJob} supports compression emulation.
+   */
+  protected abstract boolean canEmulateCompression();
 
   protected GridmixJob(
     final Configuration conf, long submissionMillis, final String name)
@@ -289,13 +514,18 @@ abstract class GridmixJob implements Cal
         TaskAttemptContext job) throws IOException {
 
       Path file = getDefaultWorkFile(job, "");
-      FileSystem fs = file.getFileSystem(job.getConfiguration());
-      final FSDataOutputStream fileOut = fs.create(file, false);
+      final DataOutputStream fileOut;
+
+      fileOut = 
+        new DataOutputStream(CompressionEmulationUtil
+            .getPossiblyCompressedOutputStream(file, job.getConfiguration()));
+
       return new RecordWriter<K,GridmixRecord>() {
         @Override
         public void write(K ignored, GridmixRecord value)
             throws IOException {
-          value.writeRandom(fileOut, value.getSize());
+          // Let the Gridmix record fill itself.
+          value.write(fileOut);
         }
         @Override
         public void close(TaskAttemptContext ctxt) throws IOException {

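Two of the GridmixJob additions above benefit from a worked example. scaleConfigParameter() preserves the job-to-cluster ratio: if the original job asked for 2048 MB per map on a cluster whose default was 1024 MB (ratio 2.0), a simulated cluster with a 1536 MB default yields 2.0 * 1536 = 3072 MB. And the -Xmx handling hinges on maxHeapPattern; a self-contained sketch of the same extraction logic as extractMaxHeapOpts(), with an illustrative opts string:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.regex.Pattern;

    public class MaxHeapOptsSketch {
      // same pattern as GridmixJob.maxHeapPattern
      private static final Pattern MAX_HEAP =
          Pattern.compile("-Xmx[0-9]+[kKmMgGtT]?+");

      public static void main(String[] args) {
        String javaOpts = "-server -Xmx2048m -Djava.net.preferIPv4Stack=true";
        List<String> maxOpts = new ArrayList<String>();
        List<String> others = new ArrayList<String>();
        for (String opt : javaOpts.split(" ")) {
          if (MAX_HEAP.matcher(opt).find()) {
            maxOpts.add(opt);   // [-Xmx2048m]
          } else {
            others.add(opt);    // [-server, -Djava.net.preferIPv4Stack=true]
          }
        }
        // configureTaskJVMMaxHeapOptions keeps the destination's "others"
        // and appends the source job's max-heap opts.
        System.out.println(maxOpts + " / " + others);
      }
    }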
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixKey.java Tue Oct 18 14:45:48 2011
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.DataInputBuf
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
 
 class GridmixKey extends GridmixRecord {
   static final byte REDUCE_SPEC = 0;
@@ -115,6 +116,22 @@ class GridmixKey extends GridmixRecord {
     setSize(origSize);
   }
 
+  /**
+   * Get the {@link ResourceUsageMetrics} stored in the key.
+   */
+  public ResourceUsageMetrics getReduceResourceUsageMetrics() {
+    assert REDUCE_SPEC == getType();
+    return spec.metrics;
+  }
+  
+  /**
+   * Store the {@link ResourceUsageMetrics} in the key.
+   */
+  public void setReduceResourceUsageMetrics(ResourceUsageMetrics metrics) {
+    assert REDUCE_SPEC == getType();
+    spec.setResourceUsageSpecification(metrics);
+  }
+  
   public byte getType() {
     return type;
   }
@@ -195,18 +212,35 @@ class GridmixKey extends GridmixRecord {
     long rec_in;
     long rec_out;
     long bytes_out;
+    private ResourceUsageMetrics metrics = null;
+    private int sizeOfResourceUsageMetrics = 0;
     public Spec() { }
 
     public void set(Spec other) {
       rec_in = other.rec_in;
       bytes_out = other.bytes_out;
       rec_out = other.rec_out;
+      setResourceUsageSpecification(other.metrics);
     }
 
+    /**
+     * Sets the {@link ResourceUsageMetrics} for this {@link Spec}.
+     */
+    public void setResourceUsageSpecification(ResourceUsageMetrics metrics) {
+      this.metrics = metrics;
+      if (metrics != null) {
+        this.sizeOfResourceUsageMetrics = metrics.size();
+      } else {
+        this.sizeOfResourceUsageMetrics = 0;
+      }
+    }
+    
     public int getSize() {
       return WritableUtils.getVIntSize(rec_in) +
              WritableUtils.getVIntSize(rec_out) +
-             WritableUtils.getVIntSize(bytes_out);
+             WritableUtils.getVIntSize(bytes_out) +
+             WritableUtils.getVIntSize(sizeOfResourceUsageMetrics) +
+             sizeOfResourceUsageMetrics;
     }
 
     @Override
@@ -214,6 +248,11 @@ class GridmixKey extends GridmixRecord {
       rec_in = WritableUtils.readVLong(in);
       rec_out = WritableUtils.readVLong(in);
       bytes_out = WritableUtils.readVLong(in);
+      sizeOfResourceUsageMetrics =  WritableUtils.readVInt(in);
+      if (sizeOfResourceUsageMetrics > 0) {
+        metrics = new ResourceUsageMetrics();
+        metrics.readFields(in);
+      }
     }
 
     @Override
@@ -221,6 +260,10 @@ class GridmixKey extends GridmixRecord {
       WritableUtils.writeVLong(out, rec_in);
       WritableUtils.writeVLong(out, rec_out);
       WritableUtils.writeVLong(out, bytes_out);
+      WritableUtils.writeVInt(out, sizeOfResourceUsageMetrics);
+      if (sizeOfResourceUsageMetrics > 0) {
+        metrics.write(out);
+      }
     }
   }
 

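The Spec serialization above follows a common Writable idiom: the optional
ResourceUsageMetrics field travels behind a vInt size prefix, with zero
meaning "absent", so keys written without metrics stay readable by the same
readFields(). A minimal standalone sketch of that idiom (not Gridmix code;
the class name and payload string are hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.WritableUtils;

    public class OptionalFieldDemo {
      public static void main(String[] args) throws IOException {
        // Write: vInt size prefix, then the optional body (0 = absent).
        DataOutputBuffer out = new DataOutputBuffer();
        byte[] payload = "cpu=1200ms".getBytes("UTF-8"); // stand-in payload
        WritableUtils.writeVInt(out, payload.length);
        out.write(payload);

        // Read: honour the prefix and skip the body when the size is 0.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        int size = WritableUtils.readVInt(in);
        if (size > 0) {
          byte[] body = new byte[size];
          in.readFully(body);
          System.out.println(new String(body, "UTF-8")); // cpu=1200ms
        }
      }
    }
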
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java Tue Oct 18 14:45:48 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.DataOutputBu
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 class GridmixRecord implements WritableComparable<GridmixRecord> {
 
@@ -39,6 +40,10 @@ class GridmixRecord implements WritableC
   private final DataOutputBuffer dob =
     new DataOutputBuffer(Long.SIZE / Byte.SIZE);
   private byte[] literal = dob.getData();
+  private boolean compressible = false;
+  private float compressionRatio = 
+    CompressionEmulationUtil.DEFAULT_COMPRESSION_RATIO;
+  private RandomTextDataGenerator rtg = null;
 
   GridmixRecord() {
     this(1, 0L);
@@ -57,6 +62,19 @@ class GridmixRecord implements WritableC
     setSizeInternal(size);
   }
 
+  void setCompressibility(boolean compressible, float ratio) {
+    this.compressible = compressible;
+    this.compressionRatio = ratio;
+    // Initialize the RandomTextDataGenerator once for every GridMix record
+    // Note that RandomTextDataGenerator is needed only when the GridMix record
+    // is configured to generate compressible text data.
+    if (compressible) {
+      rtg = 
+        CompressionEmulationUtil.getRandomTextDataGenerator(ratio, 
+                                   RandomTextDataGenerator.DEFAULT_SEED);
+    }
+  }
+  
   private void setSizeInternal(int size) {
     this.size = Math.max(1, size);
     try {
@@ -79,6 +97,39 @@ class GridmixRecord implements WritableC
     return (x ^= (x << 17));
   }
 
+  /**
+   * Generate random text data that can be compressed. Only if the record is
+   * marked compressible (via {@link FileOutputFormat#COMPRESS}) is the
+   * random data text; otherwise
+   * {@link GridmixRecord#writeRandom(DataOutput, int)} is invoked.
+   */
+  private void writeRandomText(DataOutput out, final int size) 
+  throws IOException {
+    long tmp = seed;
+    out.writeLong(tmp);
+    int i = size - (Long.SIZE / Byte.SIZE);
+    // TODO Should we use a long for size? What if the data exceeds 4 GB?
+    
+    String randomWord = rtg.getRandomWord();
+    byte[] bytes = randomWord.getBytes("UTF-8");
+    long randomWordSize = bytes.length;
+    while (i >= randomWordSize) {
+      out.write(bytes);
+      i -= randomWordSize;
+      
+      // get the next random word
+      randomWord = rtg.getRandomWord();
+      bytes = randomWord.getBytes("UTF-8");
+      // determine the random word size
+      randomWordSize = bytes.length;
+    }
+    
+    // pad the remaining bytes
+    if (i > 0) {
+      out.write(bytes, 0, i);
+    }
+  }
+  
   public void writeRandom(DataOutput out, final int size) throws IOException {
     long tmp = seed;
     out.writeLong(tmp);
@@ -120,8 +171,13 @@ class GridmixRecord implements WritableC
     WritableUtils.writeVInt(out, size);
     final int payload = size - WritableUtils.getVIntSize(size);
     if (payload > Long.SIZE / Byte.SIZE) {
-      writeRandom(out, payload);
+      if (compressible) {
+        writeRandomText(out, payload);
+      } else {
+        writeRandom(out, payload);
+      }
     } else if (payload > 0) {
+      // TODO What if compressible is turned on? Logging here would be a bad idea!
       out.write(literal, 0, payload);
     }
   }

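The compressibility switch above rests on a simple observation: text drawn
from a small fixed vocabulary is highly redundant, so a deflate-style codec
shrinks it well, and the vocabulary size controls how well. The sketch below
(not Gridmix code; the four-word vocabulary and 1 MB sample are arbitrary)
demonstrates the effect with java.util.zip:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Random;
    import java.util.zip.DeflaterOutputStream;

    public class CompressRatioDemo {
      public static void main(String[] args) throws IOException {
        Random r = new Random(0L);
        // Fewer distinct words => more redundancy => smaller output.
        String[] words = {"alpha", "beta", "gamma", "delta"};
        StringBuilder sb = new StringBuilder();
        while (sb.length() < (1 << 20)) {        // ~1 MB of text
          sb.append(words[r.nextInt(words.length)]).append(' ');
        }
        byte[] data = sb.toString().getBytes("UTF-8");

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DeflaterOutputStream dos = new DeflaterOutputStream(bos);
        dos.write(data);
        dos.close();                             // flushes the deflater

        System.out.printf("compressed/original = %.2f%n",
                          bos.size() / (double) data.length);
      }
    }

Growing or shrinking the words array moves the printed ratio up or down,
which is the knob a ratio-targeting generator has to calibrate.
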
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java Tue Oct 18 14:45:48 2011
@@ -25,9 +25,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,6 +45,7 @@ class InputStriper {
   long currentStart;
   FileStatus current;
   final List<FileStatus> files = new ArrayList<FileStatus>();
+  final Configuration conf = new Configuration();
 
   /**
    * @param inputDir Pool from which files are requested.
@@ -91,7 +95,15 @@ class InputStriper {
       }
       currentStart += fromFile;
       bytes -= fromFile;
-      if (current.getLen() - currentStart == 0) {
+      // Switch to a new file if
+      //  - the current file is uncompressed and completely used
+      //  - the current file is compressed
+      
+      CompressionCodecFactory compressionCodecs = 
+        new CompressionCodecFactory(conf);
+      CompressionCodec codec = compressionCodecs.getCodec(current.getPath());
+      if (current.getLen() - currentStart == 0
+          || codec != null) {
         current = files.get(++idx % files.size());
         currentStart = 0;
       }

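The lookup above is purely name-based: CompressionCodecFactory maps a file
suffix to a registered codec and returns null when nothing matches, which is
exactly the "uncompressed, safe to keep striping" case. A minimal probe
(assuming only the codecs registered by a default Configuration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    public class CodecProbe {
      public static void main(String[] args) {
        CompressionCodecFactory factory =
          new CompressionCodecFactory(new Configuration());
        for (String name : new String[] {"part-00000.gz", "part-00000"}) {
          CompressionCodec codec = factory.getCodec(new Path(name));
          System.out.println(name + " -> "
              + (codec == null ? "none" : codec.getClass().getSimpleName()));
        }
      }
    }
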
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java Tue Oct 18 14:45:48 2011
@@ -18,31 +18,42 @@
 
 package org.apache.hadoop.mapred.gridmix;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.gridmix.GenerateData.GenSplit;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.rumen.JobStory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Random;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public enum JobCreator {
 
   LOADJOB {
     @Override
     public GridmixJob createGridmixJob(
-      Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
-      UserGroupInformation ugi, int seq) throws IOException {
+      Configuration gridmixConf, long submissionMillis, JobStory jobdesc,
+      Path outRoot, UserGroupInformation ugi, int seq) throws IOException {
+
+      // Build configuration for this simulated job
+      Configuration conf = new Configuration(gridmixConf);
+      dce.configureDistCacheFiles(conf, jobdesc.getJobConf());
       return new LoadJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
-    }},
+    }
+
+    @Override
+    public boolean canEmulateDistCacheLoad() {
+      return true;
+    }
+  },
 
   SLEEPJOB {
     private String[] hosts;
@@ -72,12 +83,30 @@ public enum JobCreator {
       }
       return new SleepJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq,
           numLocations, hosts);
-    }};
+    }
+
+    @Override
+    public boolean canEmulateDistCacheLoad() {
+      return false;
+    }
+  };
 
   public static final String GRIDMIX_JOB_TYPE = "gridmix.job.type";
   public static final String SLEEPJOB_RANDOM_LOCATIONS = 
     "gridmix.sleep.fake-locations";
 
+  /**
+   * Create a Gridmix simulated job.
+   * @param conf configuration of the simulated job
+   * @param submissionMillis the time at which the simulated job is to be
+   *                         submitted
+   * @param jobdesc JobStory obtained from the trace
+   * @param outRoot gridmix output directory
+   * @param ugi UGI of the submitter of this simulated job
+   * @param seq job sequence number
+   * @return the created simulated job
+   * @throws IOException
+   */
   public abstract GridmixJob createGridmixJob(
     final Configuration conf, long submissionMillis, final JobStory jobdesc,
     Path outRoot, UserGroupInformation ugi, final int seq) throws IOException;
@@ -86,4 +115,21 @@ public enum JobCreator {
     Configuration conf, JobCreator defaultPolicy) {
     return conf.getEnum(GRIDMIX_JOB_TYPE, defaultPolicy);
   }
+
+  /**
+   * @return true if gridmix simulated jobs of this job type can emulate
+   *         distributed cache load
+   */
+  abstract boolean canEmulateDistCacheLoad();
+
+  DistributedCacheEmulator dce;
+  /**
+   * This must be called before any other method of JobCreator (except
+   * canEmulateDistCacheLoad()), particularly for job types where
+   * canEmulateDistCacheLoad() returns true.
+   * @param e Distributed Cache Emulator
+   */
+  void setDistCacheEmulator(DistributedCacheEmulator e) {
+    this.dce = e;
+  }
 }

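JobCreator uses the enum-as-strategy idiom: each constant overrides an
abstract hook, and a collaborator (dce above) is injected before the enum is
used. A self-contained illustration with hypothetical names:

    // Mirrors JobCreator's per-constant overrides; names are made up.
    public enum Policy {
      FULL {
        @Override
        boolean supportsExtras() { return true; }
      },
      MINIMAL {
        @Override
        boolean supportsExtras() { return false; }
      };

      // Collaborator injected before use, like setDistCacheEmulator(e).
      Object collaborator;
      void setCollaborator(Object c) { this.collaborator = c; }

      abstract boolean supportsExtras();

      public static void main(String[] args) {
        for (Policy p : Policy.values()) {
          System.out.println(p + " supportsExtras=" + p.supportsExtras());
        }
      }
    }
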
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Tue Oct 18 14:45:48 2011
@@ -63,6 +63,7 @@ abstract class JobFactory<T> implements 
   protected final JobStoryProducer jobProducer;
   protected final ReentrantLock lock = new ReentrantLock(true);
   protected final JobCreator jobCreator;
+  protected int numJobsInTrace = 0;
 
   /**
    * Creating a new instance does not start the thread.
@@ -112,7 +113,7 @@ abstract class JobFactory<T> implements 
     public MinTaskInfo(TaskInfo info) {
       super(info.getInputBytes(), info.getInputRecords(),
             info.getOutputBytes(), info.getOutputRecords(),
-            info.getTaskMemory());
+            info.getTaskMemory(), info.getResourceUsageMetrics());
     }
     public long getInputBytes() {
       return Math.max(0, super.getInputBytes());
@@ -168,13 +169,33 @@ abstract class JobFactory<T> implements 
 
   protected abstract Thread createReaderThread() ;
 
+  // Gets the next job from the trace and updates the trace bookkeeping.
+  private JobStory getNextJobFromTrace() throws IOException {
+    JobStory story = jobProducer.getNextJob();
+    if (story != null) {
+      ++numJobsInTrace;
+    }
+    return story;
+  }
+
   protected JobStory getNextJobFiltered() throws IOException {
-    JobStory job;
-    do {
-      job = jobProducer.getNextJob();
-    } while (job != null
-        && (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
-            job.getSubmissionTime() < 0));
+    JobStory job = getNextJobFromTrace();
+    while (job != null &&
+           (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
+            job.getSubmissionTime() < 0)) {
+      if (LOG.isDebugEnabled()) {
+        String reason = "";
+        if (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS) {
+          reason += "STATE (" + job.getOutcome().name() + ") ";
+        }
+        if (job.getSubmissionTime() < 0) {
+          reason += "SUBMISSION-TIME (" + job.getSubmissionTime() + ")";
+        }
+        LOG.debug("Ignoring job " + job.getJobID() + " from the input trace."
+                  + " Reason: " + (reason.length() == 0 ? "N/A" : reason));
+      }
+      job = getNextJobFromTrace();
+    }
     return null == job ? null : new FilterJobStory(job) {
         @Override
         public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {

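getNextJobFiltered() is a skip-and-count loop: every trace entry is read
(and tallied in numJobsInTrace), but only successful jobs with sane
submission times are handed on. A toy version of the same pattern (plain
Java; negative numbers stand in for jobs that fail the filter):

    import java.util.Arrays;
    import java.util.Iterator;

    public class FilteredReaderDemo {
      public static void main(String[] args) {
        Iterator<Integer> trace = Arrays.asList(3, -1, 7, -4, 9).iterator();
        int numRead = 0;               // counts everything read
        while (trace.hasNext()) {
          int job = trace.next();
          ++numRead;
          if (job < 0) {               // the filter predicate
            System.out.println("ignoring " + job);
            continue;
          }
          System.out.println("accepted " + job
              + " (read so far: " + numRead + ")");
        }
      }
    }
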
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java Tue Oct 18 14:45:48 2011
@@ -78,13 +78,13 @@ class JobMonitor implements Gridmix.Comp
   }
 
   /**
-   * Add a submission failed job , such tht it can be communicated
+   * Add a submission-failed job, such that it can be communicated
    * back to serial.
    * TODO: Cleaner solution for this problem
    * @param job
    */
   public void submissionFailed(Job job) {
-    LOG.info(" Job submission failed notify if anyone is waiting " + job);
+    LOG.info("Job submission failed notification for job " + job.getJobID());
     this.statistics.add(job);
   }
 

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java Tue Oct 18 14:45:48 2011
@@ -127,7 +127,7 @@ class JobSubmitter implements Gridmix.Co
         monitor.submissionFailed(job.getJob());
       } catch(Exception e) {
        //Due to some exception the job wasn't submitted.
-        LOG.info(" Job " + job.getJob() + " submission failed " , e);
+        LOG.info(" Job " + job.getJob().getJobID() + " submission failed " , e);
         monitor.submissionFailed(job.getJob());
       } finally {
         sem.release();

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java Tue Oct 18 14:45:48 2011
@@ -22,6 +22,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.gridmix.emulators.resourceusage.ResourceUsageMatcher;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -30,10 +33,13 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
 import org.apache.hadoop.tools.rumen.TaskInfo;
 
 import java.io.IOException;
@@ -83,6 +89,106 @@ class LoadJob extends GridmixJob {
     return job;
   }
 
+  @Override
+  protected boolean canEmulateCompression() {
+    return true;
+  }
+  
+  /**
+   * This is a progress based resource usage matcher.
+   */
+  @SuppressWarnings("unchecked")
+  static class ResourceUsageMatcherRunner extends Thread {
+    private final ResourceUsageMatcher matcher;
+    private final Progressive progress;
+    private final long sleepTime;
+    private static final String SLEEP_CONFIG = 
+      "gridmix.emulators.resource-usage.sleep-duration";
+    private static final long DEFAULT_SLEEP_TIME = 100; // 100ms
+    
+    ResourceUsageMatcherRunner(final TaskInputOutputContext context, 
+                               ResourceUsageMetrics metrics) {
+      Configuration conf = context.getConfiguration();
+      
+      // set the resource calculator plugin
+      Class<? extends ResourceCalculatorPlugin> clazz =
+        conf.getClass(TaskTracker.TT_RESOURCE_CALCULATOR_PLUGIN,
+                      null, ResourceCalculatorPlugin.class);
+      ResourceCalculatorPlugin plugin = 
+        ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
+      
+      // set the other parameters
+      this.sleepTime = conf.getLong(SLEEP_CONFIG, DEFAULT_SLEEP_TIME);
+      progress = new Progressive() {
+        @Override
+        public float getProgress() {
+          return context.getProgress();
+        }
+      };
+      
+      // instantiate a resource-usage-matcher
+      matcher = new ResourceUsageMatcher();
+      matcher.configure(conf, plugin, metrics, progress);
+    }
+    
+    protected void match() throws Exception {
+      // match the resource usage
+      matcher.matchResourceUsage();
+    }
+    
+    @Override
+    public void run() {
+      LOG.info("Resource usage matcher thread started.");
+      try {
+        while (progress.getProgress() < 1) {
+          // match
+          match();
+          
+          // sleep for some time
+          try {
+            Thread.sleep(sleepTime);
+          } catch (Exception e) {
+            // ignore interruptions; progress is re-checked on the next pass
+          }
+        }
+        
+        // match for progress = 1
+        match();
+        LOG.info("Resource usage emulation complete! Matcher exiting");
+      } catch (Exception e) {
+        LOG.info("Exception while running the resource-usage-emulation matcher"
+                 + " thread! Exiting.", e);
+      }
+    }
+  }
+  
+  // Makes sure that the TaskTracker doesn't kill the map/reduce tasks while
+  // they are emulating
+  private static class StatusReporter extends Thread {
+    private TaskInputOutputContext context;
+    StatusReporter(TaskInputOutputContext context) {
+      this.context = context;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Status reporter thread started.");
+      try {
+        while (context.getProgress() < 1) {
+          // report progress
+          context.progress();
+
+          // sleep for some time
+          try {
+            Thread.sleep(100); // sleep for 100ms
+          } catch (Exception e) {
+            // ignore interruptions; progress is re-checked on the next pass
+          }
+        }
+        
+        LOG.info("Status reporter thread exiting");
+      } catch (Exception e) {
+        LOG.info("Exception while running the status reporter thread!", e);
+      }
+    }
+  }
+  
   public static class LoadMapper
       extends Mapper<NullWritable,GridmixRecord,GridmixKey,GridmixRecord> {
 
@@ -95,6 +201,9 @@ class LoadJob extends GridmixJob {
     private final GridmixKey key = new GridmixKey();
     private final GridmixRecord val = new GridmixRecord();
 
+    private ResourceUsageMatcherRunner matcher = null;
+    private StatusReporter reporter = null;
+    
     @Override
     protected void setup(Context ctxt)
         throws IOException, InterruptedException {
@@ -104,6 +213,20 @@ class LoadJob extends GridmixJob {
       final long[] reduceBytes = split.getOutputBytes();
       final long[] reduceRecords = split.getOutputRecords();
 
+      // enable gridmix map output record for compression
+      final boolean emulateMapOutputCompression = 
+        CompressionEmulationUtil.isCompressionEmulationEnabled(conf)
+        && conf.getBoolean("mapred.compress.map.output", false);
+      float compressionRatio = 1.0f;
+      if (emulateMapOutputCompression) {
+        compressionRatio = 
+          CompressionEmulationUtil.getMapOutputCompressionEmulationRatio(conf);
+        LOG.info("GridMix is configured to use a compression ratio of " 
+                 + compressionRatio + " for the map output data.");
+        key.setCompressibility(true, compressionRatio);
+        val.setCompressibility(true, compressionRatio);
+      }
+      
       long totalRecords = 0L;
       final int nReduces = ctxt.getNumReduceTasks();
       if (nReduces > 0) {
@@ -114,17 +237,30 @@ class LoadJob extends GridmixJob {
           if (i == id) {
             spec.bytes_out = split.getReduceBytes(idx);
             spec.rec_out = split.getReduceRecords(idx);
+            spec.setResourceUsageSpecification(
+                   split.getReduceResourceUsageMetrics(idx));
             ++idx;
             id += maps;
           }
+          // set the map output bytes such that the final reduce input bytes 
+          // match the expected value obtained from the original job
+          long mapOutputBytes = reduceBytes[i];
+          if (emulateMapOutputCompression) {
+            mapOutputBytes /= compressionRatio;
+          }
           reduces.add(new IntermediateRecordFactory(
-              new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf),
+              new AvgRecordFactory(mapOutputBytes, reduceRecords[i], conf, 
+                                   5*1024),
               i, reduceRecords[i], spec, conf));
           totalRecords += reduceRecords[i];
         }
       } else {
-        reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0],
-              conf));
+        long mapOutputBytes = reduceBytes[0];
+        if (emulateMapOutputCompression) {
+          mapOutputBytes /= compressionRatio;
+        }
+        reduces.add(new AvgRecordFactory(mapOutputBytes, reduceRecords[0],
+                    conf, 5*1024));
         totalRecords = reduceRecords[0];
       }
       final long splitRecords = split.getInputRecords();
@@ -134,6 +270,13 @@ class LoadJob extends GridmixJob {
         : splitRecords;
       ratio = totalRecords / (1.0 * inputRecords);
       acc = 0.0;
+      
+      matcher = new ResourceUsageMatcherRunner(ctxt, 
+                      split.getMapResourceUsageMetrics());
+      
+      // start the status reporter thread
+      reporter = new StatusReporter(ctxt);
+      reporter.start();
     }
 
     @Override
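A quick check of the arithmetic in setup() above: to end up with R bytes
after compression at an emulated ratio r, the mapper must emit R / r
uncompressed bytes. A standalone sketch with made-up numbers:

    public class ScaleDemo {
      public static void main(String[] args) {
        long reduceInputBytes = 64L << 20;  // want 64 MB post-compression
        float ratio = 0.5f;                 // emulated compression ratio
        long mapOutputBytes = (long) (reduceInputBytes / ratio);
        // Emitting 128 MB that compresses 2:1 lands on the 64 MB target.
        System.out.println((mapOutputBytes >> 20) + " MB to emit");
      }
    }
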
@@ -151,6 +294,13 @@ class LoadJob extends GridmixJob {
         }
         context.write(key, val);
         acc -= 1.0;
+        
+        // match inline
+        try {
+          matcher.match();
+        } catch (Exception e) {
+          LOG.debug("Error in resource usage emulation! Message: ", e);
+        }
       }
     }
 
@@ -162,8 +312,18 @@ class LoadJob extends GridmixJob {
         while (factory.next(key, val)) {
           context.write(key, val);
           key.setSeed(r.nextLong());
+          
+          // match inline
+          try {
+            matcher.match();
+          } catch (Exception e) {
+            LOG.debug("Error in resource usage emulation! Message: ", e);
+          }
         }
       }
+      
+      // start the matcher thread since the map phase ends here
+      matcher.start();
     }
   }
 
@@ -177,6 +337,9 @@ class LoadJob extends GridmixJob {
     private double ratio;
     private RecordFactory factory;
 
+    private ResourceUsageMatcherRunner matcher = null;
+    private StatusReporter reporter = null;
+    
     @Override
     protected void setup(Context context)
         throws IOException, InterruptedException {
@@ -187,20 +350,48 @@ class LoadJob extends GridmixJob {
       long outBytes = 0L;
       long outRecords = 0L;
       long inRecords = 0L;
+      ResourceUsageMetrics metrics = new ResourceUsageMetrics();
       for (GridmixRecord ignored : context.getValues()) {
         final GridmixKey spec = context.getCurrentKey();
         inRecords += spec.getReduceInputRecords();
         outBytes += spec.getReduceOutputBytes();
         outRecords += spec.getReduceOutputRecords();
+        if (spec.getReduceResourceUsageMetrics() != null) {
+          metrics = spec.getReduceResourceUsageMetrics();
+        }
       }
       if (0 == outRecords && inRecords > 0) {
         LOG.info("Spec output bytes w/o records. Using input record count");
         outRecords = inRecords;
       }
+      
+      // enable gridmix reduce output record for compression
+      Configuration conf = context.getConfiguration();
+      if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)
+          && FileOutputFormat.getCompressOutput(context)) {
+        float compressionRatio = 
+          CompressionEmulationUtil
+            .getReduceOutputCompressionEmulationRatio(conf);
+        LOG.info("GridMix is configured to use a compression ratio of " 
+                 + compressionRatio + " for the reduce output data.");
+        val.setCompressibility(true, compressionRatio);
+        
+        // Scale up the uncompressed output size so that, once compressed at
+        // the emulated ratio, it matches the original job's output size
+        outBytes /= compressionRatio;
+      }
+      
       factory =
-        new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
+        new AvgRecordFactory(outBytes, outRecords, 
+                             context.getConfiguration(), 5*1024);
       ratio = outRecords / (1.0 * inRecords);
       acc = 0.0;
+      
+      matcher = new ResourceUsageMatcherRunner(context, metrics);
+      
+      // start the status reporter thread
+      reporter = new StatusReporter(context);
+      reporter.start();
     }
     @Override
     protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,
@@ -210,6 +401,13 @@ class LoadJob extends GridmixJob {
         while (acc >= 1.0 && factory.next(null, val)) {
           context.write(NullWritable.get(), val);
           acc -= 1.0;
+          
+          // match inline
+          try {
+            matcher.match();
+          } catch (Exception e) {
+            LOG.debug("Error in resource usage emulation! Message: ", e);
+          }
         }
       }
     }
@@ -220,6 +418,13 @@ class LoadJob extends GridmixJob {
       while (factory.next(null, val)) {
         context.write(NullWritable.get(), val);
         val.setSeed(r.nextLong());
+        
+        // match inline
+        try {
+          matcher.match();
+        } catch (Exception e) {
+          LOG.debug("Error in resource usage emulation! Message: ", e);
+        }
       }
     }
   }
@@ -311,11 +516,13 @@ class LoadJob extends GridmixJob {
       final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
       final long[] specBytes = new long[nSpec];
       final long[] specRecords = new long[nSpec];
+      final ResourceUsageMetrics[] metrics = new ResourceUsageMetrics[nSpec];
       for (int j = 0; j < nSpec; ++j) {
         final TaskInfo info =
           jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
         specBytes[j] = info.getOutputBytes();
         specRecords[j] = info.getOutputRecords();
+        metrics[j] = info.getResourceUsageMetrics();
         if (LOG.isDebugEnabled()) {
           LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
               i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
@@ -326,7 +533,8 @@ class LoadJob extends GridmixJob {
               info.getInputBytes(), 3), maps, i,
             info.getInputBytes(), info.getInputRecords(),
             info.getOutputBytes(), info.getOutputRecords(),
-            reduceByteRatio, reduceRecordRatio, specBytes, specRecords));
+            reduceByteRatio, reduceRecordRatio, specBytes, specRecords,
+            info.getResourceUsageMetrics(), metrics));
     }
     pushDescription(id(), splits);
   }

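Both helper threads above share one keep-alive shape: poll a progress
source, emit a liveness signal, sleep briefly, and exit once progress
reaches 1.0. A self-contained sketch (the Progressive interface here is a
stripped-down stand-in for Gridmix's, with a progress() call added):

    public class KeepAliveDemo {
      interface Progressive {
        float getProgress();
        void progress();
      }

      public static void main(String[] args) throws InterruptedException {
        final float[] done = {0f};
        final Progressive ctx = new Progressive() {
          public float getProgress() { return done[0]; }
          public void progress() { System.out.println("heartbeat"); }
        };
        Thread reporter = new Thread() {
          @Override
          public void run() {
            while (ctx.getProgress() < 1f) {
              ctx.progress();                    // the keep-alive call
              try {
                Thread.sleep(100);
              } catch (InterruptedException e) {
                return;                          // give up if interrupted
              }
            }
          }
        };
        reporter.start();
        for (int i = 1; i <= 5; i++) {           // simulated work
          Thread.sleep(120);
          done[0] = i / 5f;
        }
        reporter.join();
      }
    }
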
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadSplit.java Tue Oct 18 14:45:48 2011
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
 
 class LoadSplit extends CombineFileSplit {
   private int id;
@@ -39,6 +40,9 @@ class LoadSplit extends CombineFileSplit
   private long[] reduceOutputBytes = new long[0];
   private long[] reduceOutputRecords = new long[0];
 
+  private ResourceUsageMetrics mapMetrics;
+  private ResourceUsageMetrics[] reduceMetrics;
+
   LoadSplit() {
     super();
   }
@@ -46,7 +50,9 @@ class LoadSplit extends CombineFileSplit
   public LoadSplit(CombineFileSplit cfsplit, int maps, int id,
       long inputBytes, long inputRecords, long outputBytes,
       long outputRecords, double[] reduceBytes, double[] reduceRecords,
-      long[] reduceOutputBytes, long[] reduceOutputRecords)
+      long[] reduceOutputBytes, long[] reduceOutputRecords,
+      ResourceUsageMetrics metrics,
+      ResourceUsageMetrics[] rMetrics)
       throws IOException {
     super(cfsplit);
     this.id = id;
@@ -60,6 +66,8 @@ class LoadSplit extends CombineFileSplit
     nSpec = reduceOutputBytes.length;
     this.reduceOutputBytes = reduceOutputBytes;
     this.reduceOutputRecords = reduceOutputRecords;
+    this.mapMetrics = metrics;
+    this.reduceMetrics = rMetrics;
   }
 
   public int getId() {
@@ -97,6 +105,15 @@ class LoadSplit extends CombineFileSplit
   public long getReduceRecords(int i) {
     return reduceOutputRecords[i];
   }
+  
+  public ResourceUsageMetrics getMapResourceUsageMetrics() {
+    return mapMetrics;
+  }
+  
+  public ResourceUsageMetrics getReduceResourceUsageMetrics(int i) {
+    return reduceMetrics[i];
+  }
+  
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
@@ -116,6 +133,12 @@ class LoadSplit extends CombineFileSplit
       WritableUtils.writeVLong(out, reduceOutputBytes[i]);
       WritableUtils.writeVLong(out, reduceOutputRecords[i]);
     }
+    if (mapMetrics == null) {
+      mapMetrics = new ResourceUsageMetrics(); // guard: trace may lack metrics
+    }
+    mapMetrics.write(out);
+    int numReduceMetrics = (reduceMetrics == null) ? 0 : reduceMetrics.length;
+    WritableUtils.writeVInt(out, numReduceMetrics);
+    for (int i = 0; i < numReduceMetrics; ++i) {
+      reduceMetrics[i].write(out);
+    }
   }
   @Override
   public void readFields(DataInput in) throws IOException {
@@ -144,5 +167,13 @@ class LoadSplit extends CombineFileSplit
       reduceOutputBytes[i] = WritableUtils.readVLong(in);
       reduceOutputRecords[i] = WritableUtils.readVLong(in);
     }
+    mapMetrics = new ResourceUsageMetrics();
+    mapMetrics.readFields(in);
+    int numReduceMetrics = WritableUtils.readVInt(in);
+    reduceMetrics = new ResourceUsageMetrics[numReduceMetrics];
+    for (int i = 0; i < numReduceMetrics; ++i) {
+      reduceMetrics[i] = new ResourceUsageMetrics();
+      reduceMetrics[i].readFields(in);
+    }
   }
 }

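Serialization changes like the LoadSplit one above are easy to get wrong
(field order, null handling), and a buffer-based round trip is a cheap
sanity check. A generic sketch (not part of the patch; Text is used only as
a stand-in Writable):

    import java.io.IOException;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class RoundTrip {
      static <T extends Writable> T roundTrip(T in, T out)
          throws IOException {
        DataOutputBuffer dob = new DataOutputBuffer();
        in.write(dob);                        // serialize
        DataInputBuffer dib = new DataInputBuffer();
        dib.reset(dob.getData(), dob.getLength());
        out.readFields(dib);                  // deserialize into fresh copy
        return out;
      }

      public static void main(String[] args) throws IOException {
        Text copy = roundTrip(new Text("gridmix"), new Text());
        System.out.println("round-trip ok: " + copy);
      }
    }
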

