hadoop-mapreduce-commits mailing list archives

From omal...@apache.org
Subject svn commit: r1079234 - in /hadoop/mapreduce/branches/yahoo-merge/src: contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/ docs/src/documentation/content/xdocs/
Date Tue, 08 Mar 2011 05:58:47 GMT
Author: omalley
Date: Tue Mar  8 05:58:47 2011
New Revision: 1079234

URL: http://svn.apache.org/viewvc?rev=1079234&view=rev
Log:
commit 20b934c0e90b9f4dbe84ddc2305f9fc6d079fb6b
Author: Ravi Gummadi <gravi@yahoo-inc.com>
Date:   Wed Dec 22 23:38:55 2010 +0530

    : Make GridMix emulate usage of HDFS-based distributed cache
    files in simulated jobs. Patch is available at
     (gravi)
    
    +++ b/YAHOO-CHANGES.txt
    +  : Make GridMix emulate usage of HDFS-based distributed cache
    +  files in simulated jobs. Patch is available at
    +   (gravi)
    +

Added:
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java
Modified:
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
    hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/gridmix.xml

Added: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java?rev=1079234&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java Tue Mar  8 05:58:47 2011
@@ -0,0 +1,444 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Emulation of Distributed Cache Usage in gridmix.
+ * <br> Emulation of Distributed Cache load in gridmix puts load on
+ * TaskTrackers and affects the execution time of tasks, because TaskTrackers
+ * localize the distributed cache files.
+ * <br> Gridmix creates private and public distributed cache files by launching
+ * a MapReduce job {@link GenerateDistCacheData} in advance, i.e. before
+ * launching the simulated jobs.
+ * <br> The file paths of the original cluster are mapped to the simulated
+ * cluster's paths. The configuration properties
+ * {@link MRJobConfig#CACHE_FILES}, {@link MRJobConfig#CACHE_FILE_VISIBILITIES},
+ * {@link MRJobConfig#CACHE_FILES_SIZES} and
+ * {@link MRJobConfig#CACHE_FILE_TIMESTAMPS} obtained from the trace are used
+ * to decide
+ * <li> the file size of each distributed cache file to be generated,
+ * <li> whether a dist cache file has already been seen in this trace file,
+ * <li> whether a distributed cache file should be considered public or private.
+ * <br>
+ * <br> Gridmix configures these generated files as distributed cache files for
+ * the simulated jobs.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class DistributedCacheEmulator {
+  private static final Log LOG =
+      LogFactory.getLog(DistributedCacheEmulator.class);
+
+  static final long AVG_BYTES_PER_MAP = 128 * 1024 * 1024L;// 128MB
+
+  // If at least 1 dist cache file is missing in the expected dist cache dir,
+  // Gridmix cannot proceed with emulation of distributed cache load.
+  static final int MISSING_DIST_CACHE_FILES_ERROR = 1;
+
+  private Path distCachePath;
+  private Path privateDistCachePath = null;
+  private Path publicDistCachePath = null;
+
+  /**
+   * Map between simulated cluster's dist cache file paths and their file sizes.
+   * Unique dist cache files are entered into this map. Two dist cache files
+   * are considered the same if and only if their file paths, visibilities
+   * and timestamps are the same.
+   */
+  private Map<String, Long> distCacheFiles = new HashMap<String, Long>();
+
+  /**
+   * Configuration property that decides whether gridmix should emulate
+   * distributed cache usage or not. Default value is true.
+   */
+  static final String GRIDMIX_EMULATE_DISTRIBUTEDCACHE =
+      "gridmix.distributed-cache-emulation.enable";
+
+  // Whether to emulate distributed cache usage or not
+  boolean emulateDistributedCache = true;
+
+  // Whether to generate dist cache data or not
+  boolean generateDistCacheData = false;
+
+  Configuration conf; // gridmix configuration
+  
+  /**
+   * @param conf gridmix configuration
+   * @param ioPath &lt;ioPath&gt;/distributedCache/ is the gridmix Distributed
+   *               Cache directory
+   */
+  public DistributedCacheEmulator(Configuration conf, Path ioPath) {
+    this.conf = conf;
+    distCachePath = new Path(ioPath, "distributedCache");
+  }
+
+  /**
+   * This is to be called before any other method of DistributedCacheEmulator.
+   * <br> Checks if emulation of Dist Cache load is needed and feasible, and
+   * sets the flags generateDistCacheData and emulateDistributedCache to the
+   * appropriate values.
+   * <br> Gridmix does not emulate dist cache load if
+   * <ol><li> the specific gridmix job type doesn't need emulation of dist cache
+   * load, OR
+   * <li> the trace comes from a stream instead of a file, OR
+   * <li> the dist cache dir where dist cache data is to be generated by
+   * gridmix is on the local file system, OR
+   * <li> execute permission is missing for any of the ancestor directories
+   * of &lt;ioPath&gt; up to the root. This is because, for emulation of dist
+   * cache load, dist cache files created under
+   * &lt;ioPath/distributedCache/public/&gt; should be considered by hadoop
+   * as public dist cache files.</ol>
+   * <br> For (2), (3) and (4), generation of distributed cache data
+   * is also disabled.
+   * 
+   * @param traceIn trace file path. If this is '-', then the trace comes
+   *                from stdin.
+   * @param jobCreator job creator of gridmix jobs of a specific type
+   * @param generate  true if the -generate option was specified
+   * @throws IOException
+   */
+  void init(String traceIn, JobCreator jobCreator, boolean generate)
+      throws IOException {
+    emulateDistributedCache = jobCreator.canEmulateDistCacheLoad()
+        && conf.getBoolean(GRIDMIX_EMULATE_DISTRIBUTEDCACHE, true);
+    generateDistCacheData = generate;
+
+    if (generateDistCacheData || emulateDistributedCache) {
+      if ("-".equals(traceIn)) {// trace is from stdin
+        LOG.warn("Gridmix will not emulate Distributed Cache load because "
+            + "the input trace source is a stream instead of file.");
+        emulateDistributedCache = generateDistCacheData = false;
+      } else if (FileSystem.getLocal(conf).getUri().getScheme().equals(
+          distCachePath.toUri().getScheme())) {// local FS
+        LOG.warn("Gridmix will not emulate Distributed Cache load because "
+            + "<iopath> provided is on local file system.");
+        emulateDistributedCache = generateDistCacheData = false;
+      } else {
+        // Check if execute permission is there for all the ancestor
+        // directories of distCachePath up to the root.
+        FileSystem fs = FileSystem.get(conf);
+        Path cur = distCachePath.getParent();
+        while (cur != null) {
+          if (cur.toString().length() > 0) {
+            FsPermission perm = fs.getFileStatus(cur).getPermission();
+            if (!perm.getOtherAction().and(FsAction.EXECUTE).equals(
+                FsAction.EXECUTE)) {
+              LOG.warn("Gridmix will not emulate Distributed Cache load "
+                  + "because the ancestor directory (of distributed cache "
+                  + "directory) " + cur + " doesn't have execute permission "
+                  + "for others.");
+              emulateDistributedCache = generateDistCacheData = false;
+              break;
+            }
+          }
+          cur = cur.getParent();
+        }
+      }
+    }
+  }
+
+  /**
+   * @return true if gridmix should emulate Dist Cache load
+   */
+  boolean shouldEmulateDistCacheLoad() {
+    return emulateDistributedCache;
+  }
+
+  /**
+   * @return true if gridmix should generate distributed cache data
+   */
+  boolean shouldGenerateDistCacheData() {
+    return generateDistCacheData;
+  }
+
+  /**
+   * @return Dir under which gridmix creates private distributed cache files
+   */
+  Path getPrivateDistCacheDir() {
+    return privateDistCachePath;
+  }
+
+  /**
+   * @return Dir under which gridmix creates public distributed cache files
+   */
+  Path getPublicDistCacheDir() {
+    return publicDistCachePath;
+  }
+
+  /**
+   * Create distributed cache directories.
+   * Also create a file that contains the list of distributed cache files
+   * that will be used as dist cache files for all the simulated jobs.
+   * @param jsp job story producer for the trace
+   * @return exit code
+   * @throws IOException
+   */
+  int setupGenerateDistCacheData(JobStoryProducer jsp)
+      throws IOException {
+
+    createDistCacheDirectories();
+    return buildDistCacheFilesList(jsp);
+  }
+
+  /**
+   * Create distributed cache directories where dist cache files will be
+   * created by the MapReduce job GRIDMIX_GENERATE_DISTCACHE_DATA.
+   * @throws IOException
+   */
+  void createDistCacheDirectories() throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileSystem.mkdirs(fs, distCachePath, new FsPermission((short) 0777));
+
+    // For private dist cache dir, no execute permission for others
+    privateDistCachePath =
+        new Path(distCachePath, "private");
+    FileSystem.mkdirs(fs, privateDistCachePath,
+        new FsPermission((short) 0776));
+
+    publicDistCachePath =
+        new Path(distCachePath, "public");
+    FileSystem.mkdirs(fs, publicDistCachePath,
+        new FsPermission((short) 0777));
+  }
+
+  /**
+   * Create the list of unique dist cache files needed for all the
+   * simulated jobs and write the list to a special file.
+   * @param jsp job story producer for the trace
+   * @return exit code
+   * @throws IOException
+   */
+  private int buildDistCacheFilesList(JobStoryProducer jsp) throws IOException {
+    // Read all the jobs from the trace file and build the list of unique
+    // dist cache files.
+    JobStory jobStory;
+    while ((jobStory = jsp.getNextJob()) != null) {
+      if (jobStory.getOutcome() == Pre21JobHistoryConstants.Values.SUCCESS && 
+         jobStory.getSubmissionTime() >= 0) {
+        updateDistCacheFilesList(jobStory);
+      }
+    }
+    jsp.close();
+
+    return writeDistCacheFilesList();
+  }
+
+  /**
+   * For the job to be simulated, identify the needed distributed cache files
+   * by mapping the original cluster's dist cache file paths to the simulated
+   * cluster's paths and add these paths to the map {@code distCacheFiles}.
+   *<br>
+   * JobStory should contain dist cache related properties like
+   * <li> {@link MRJobConfig#CACHE_FILES}
+   * <li> {@link MRJobConfig#CACHE_FILE_VISIBILITIES}
+   * <li> {@link MRJobConfig#CACHE_FILES_SIZES}
+   * <li> {@link MRJobConfig#CACHE_FILE_TIMESTAMPS}
+   * <li> {@link MRJobConfig#CLASSPATH_FILES}
+   *
+   * <li> {@link MRJobConfig#CACHE_ARCHIVES}
+   * <li> {@link MRJobConfig#CACHE_ARCHIVES_VISIBILITIES}
+   * <li> {@link MRJobConfig#CACHE_ARCHIVES_SIZES}
+   * <li> {@link MRJobConfig#CACHE_ARCHIVES_TIMESTAMPS}
+   * <li> {@link MRJobConfig#CLASSPATH_ARCHIVES}
+   *
+   * <li> {@link MRJobConfig#CACHE_SYMLINK}
+   *
+   * @param jobdesc JobStory of original job obtained from trace
+   * @throws IOException
+   */
+  void updateDistCacheFilesList(JobStory jobdesc)
+      throws IOException {
+
+    // Map original job's dist cache file paths to simulated cluster's paths,
+    // to be used by this simulated job.
+    JobConf jobConf = jobdesc.getJobConf();
+
+    String[] files = jobConf.getStrings(MRJobConfig.CACHE_FILES);
+    if (files != null) {
+
+      String[] fileSizes = jobConf.getStrings(MRJobConfig.CACHE_FILES_SIZES);
+      String[] visibilities =
+        jobConf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES);
+      String[] timeStamps =
+        jobConf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+
+      FileSystem fs = FileSystem.get(conf);
+      for (int i = 0; i < files.length; i++) {
+        String mappedPath = mapDistCacheFilePath(files[i], timeStamps[i],
+                              Boolean.valueOf(visibilities[i]));
+
+        // No need to add a dist cache file path to the list if
+        // (1) the mapped path is already there in the list OR
+        // (2) the file with the mapped path already exists.
+        // In both of the above cases, the file paths, timestamps, file sizes
+        // and visibilities match. File sizes should match if file paths and
+        // timestamps match, because a single file path with a single
+        // timestamp should correspond to a single file size.
+        if (distCacheFiles.containsKey(mappedPath) ||
+            fs.exists(new Path(mappedPath))) {
+          continue;
+        }
+        distCacheFiles.put(mappedPath, Long.valueOf(fileSizes[i]));
+      }
+    }
+  }
+
+  /**
+   * Map the dist cache file path from the original cluster to the simulated
+   * cluster.
+   * @param file dist cache file path
+   * @param timeStamp time stamp of the dist cache file
+   * @param isPublic true if this dist cache file is a public dist cache file
+   * @return the mapped path on the simulated cluster
+   */
+  private String mapDistCacheFilePath(String file, String timeStamp,
+      boolean isPublic) {
+    Path distCacheDir = isPublic ? publicDistCachePath
+                                 : privateDistCachePath;
+    return new Path(distCacheDir,
+        MD5Hash.digest(file + timeStamp).toString()).toUri().getPath();
+  }
+
+  /**
+   * Write the list of dist cache files, in decreasing order of file size,
+   * into the sequence file. This file will be the input to the job
+   * {@link GenerateDistCacheData}.
+   * Also reports an error if the -generate option was not specified while
+   * dist cache files are missing.
+   * @return exit code
+   * @throws IOException
+   */
+  private int writeDistCacheFilesList()
+      throws IOException {
+    // Sort the dist cache files on file size, in decreasing order.
+    List<Map.Entry<String, Long>> dcFiles =
+        new ArrayList<Map.Entry<String, Long>>(distCacheFiles.entrySet());
+    Collections.sort(dcFiles, new Comparator<Map.Entry<String, Long>>() {
+      public int compare(Map.Entry<String, Long> dc1,
+                         Map.Entry<String, Long> dc2) {
+        return dc2.getValue().compareTo(dc1.getValue());
+      }
+    });
+
+    // write the sorted dist cache files to the sequence file
+    FileSystem fs = FileSystem.get(conf);
+    Path distCacheFilesList = new Path(distCachePath, "_distCacheFiles.txt");
+    conf.set(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST,
+        distCacheFilesList.toString());
+    SequenceFile.Writer src_writer = SequenceFile.createWriter(fs, conf,
+        distCacheFilesList, LongWritable.class, BytesWritable.class,
+        SequenceFile.CompressionType.NONE);
+    int fileCount = dcFiles.size();// total number of unique dist cache files
+    long byteCount = 0;// total size of all dist cache files
+    long bytesSync = 0;// bytes after previous sync; used to add sync marker
+
+    for (Iterator<Map.Entry<String, Long>> it = dcFiles.iterator();
+         it.hasNext();) {
+      Map.Entry<String, Long> entry = it.next();
+      LongWritable fileSize = new LongWritable(entry.getValue());
+      BytesWritable filePath =
+          new BytesWritable(entry.getKey().getBytes());
+
+      byteCount += fileSize.get();
+      bytesSync += fileSize.get();
+      if (bytesSync > AVG_BYTES_PER_MAP) {
+        src_writer.sync();
+        bytesSync = fileSize.get();
+      }
+      src_writer.append(fileSize, filePath);
+    }
+    src_writer.close();
+    conf.setInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, fileCount);
+    conf.setLong(GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, byteCount);
+    LOG.info("DistCacheFilesToBeGeneratedCount=" + fileCount
+        + ". TotalBytesToBeGeneratedInDistCacheFiles=" + byteCount);
+
+    if (!shouldGenerateDistCacheData() && fileCount > 0) {
+      LOG.error("Missing " + fileCount + " distributed cache files under the "
+          + " directory\n" + distCachePath + "\nthat are needed for gridmix"
+          + " to emulate distributed cache load. Either use -generate\noption"
+          + " to generate distributed cache data along with input data OR "
+          + "disable\ndistributed cache emulation by configuring '"
+          + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+          + "' to false.");
+      return MISSING_DIST_CACHE_FILES_ERROR;
+    }
+    return 0;
+  }
+
+  /**
+   * If gridmix needs to emulate dist cache load, then configure dist cache
+   * files of a simulated job by mapping the original cluster's dist cache
+   * file paths to the simulated cluster's paths and setting these mapped paths
+   * in the job configuration of the simulated job.
+   * @param conf configuration for the simulated job to be run
+   * @param jobConf job configuration of original cluster's job, obtained from
+   *                trace
+   * @throws IOException
+   */
+  void configureDistCacheFiles(Configuration conf, JobConf jobConf)
+      throws IOException {
+    if (shouldEmulateDistCacheLoad()) {
+
+      String[] files = jobConf.getStrings(MRJobConfig.CACHE_FILES);
+      if (files != null) {
+        String[] cacheFiles = new String[files.length];
+        String[] visibilities =
+          jobConf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES);
+        String[] timeStamps =
+          jobConf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+
+        for (int i = 0; i < files.length; i++) {
+          String mappedPath = mapDistCacheFilePath(files[i], timeStamps[i],
+              Boolean.valueOf(visibilities[i]));
+          cacheFiles[i] = mappedPath;
+        }
+        conf.setStrings(MRJobConfig.CACHE_FILES, cacheFiles);
+      }
+    }
+  }
+}
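
As a quick illustration of the path mapping performed by mapDistCacheFilePath() above: the original file path and its timestamp are hashed together, and the digest becomes the file name under the public or private dist cache directory. The digest expression below matches the code above; the concrete input values and directory are made-up examples.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.MD5Hash;

    public class MapDistCachePathSketch {
      public static void main(String[] args) {
        // Example inputs; a real run takes these from the trace.
        String file = "hdfs:///user/user1/file3.txt";
        String timeStamp = "34567";
        Path publicDir = new Path("/gridmix-io/distributedCache/public");
        // Same digest expression as mapDistCacheFilePath() above.
        String mapped = new Path(publicDir,
            MD5Hash.digest(file + timeStamp).toString()).toUri().getPath();
        // Prints /gridmix-io/distributedCache/public/<32-hex-char digest>
        System.out.println(mapped);
      }
    }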

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=1079234&r1=1079233&r2=1079234&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Tue Mar  8 05:58:47 2011
@@ -85,10 +85,11 @@ class GenerateData extends GridmixJob {
    * Replication of generated data.
    */
   public static final String GRIDMIX_GEN_REPLICATION = "gridmix.gen.replicas";
+  static final String JOB_NAME = "GRIDMIX_GENERATE_INPUT_DATA";
 
   public GenerateData(Configuration conf, Path outdir, long genbytes)
       throws IOException {
-    super(conf, 0L, "GRIDMIX_GENDATA");
+    super(conf, 0L, JOB_NAME);
     job.getConfiguration().setLong(GRIDMIX_GEN_BYTES, genbytes);
     FileOutputFormat.setOutputPath(job, outdir);
   }

Added: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java?rev=1079234&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java Tue Mar  8 05:58:47 2011
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * GridmixJob that generates distributed cache files.
+ * {@link GenerateDistCacheData} expects a list of the dist cache files to be
+ * generated as its input, stored as a sequence file whose name is configured
+ * via {@code gridmix.distcache.file.list}.
+ * This input file contains the list of distributed cache files and their
+ * sizes. For each record (i.e. file size and file path) in this input file,
+ * a file of the given size is created at the given path.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class GenerateDistCacheData extends GridmixJob {
+
+  /**
+   * Number of dist cache files to be created by gridmix
+   */
+  static final String GRIDMIX_DISTCACHE_FILE_COUNT =
+      "gridmix.distcache.file.count";
+  /**
+   * Total number of bytes to be written to the dist cache files by gridmix,
+   * i.e. the sum of the sizes of all unique dist cache files to be created
+   * by gridmix.
+   */
+  static final String GRIDMIX_DISTCACHE_BYTE_COUNT =
+      "gridmix.distcache.byte.count";
+  /**
+   * The special file created (and used) by gridmix that contains the list of
+   * unique dist cache files that are to be created, and their sizes.
+   */
+  static final String GRIDMIX_DISTCACHE_FILE_LIST =
+      "gridmix.distcache.file.list";
+  static final String JOB_NAME = "GRIDMIX_GENERATE_DISTCACHE_DATA";
+
+  public GenerateDistCacheData(Configuration conf) throws IOException {
+    super(conf, 0L, JOB_NAME);
+  }
+
+  @Override
+  public Job call() throws IOException, InterruptedException,
+                           ClassNotFoundException {
+    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+    ugi.doAs(new PrivilegedExceptionAction<Job>() {
+      public Job run() throws IOException, ClassNotFoundException,
+                              InterruptedException {
+        job.setMapperClass(GenDCDataMapper.class);
+        job.setNumReduceTasks(0);
+        job.setMapOutputKeyClass(NullWritable.class);
+        job.setMapOutputValueClass(BytesWritable.class);
+        job.setInputFormatClass(GenDCDataFormat.class);
+        job.setOutputFormatClass(NullOutputFormat.class);
+        job.setJarByClass(GenerateDistCacheData.class);
+        try {
+          FileInputFormat.addInputPath(job, new Path("ignored"));
+        } catch (IOException e) {
+          LOG.error("Error while adding input path ", e);
+        }
+        job.submit();
+        return job;
+      }
+    });
+    return job;
+  }
+
+  public static class GenDCDataMapper
+      extends Mapper<LongWritable, BytesWritable, NullWritable, BytesWritable> {
+
+    private BytesWritable val;
+    private final Random r = new Random();
+    private FileSystem fs;
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      val = new BytesWritable(new byte[context.getConfiguration().getInt(
+              GenerateData.GRIDMIX_VAL_BYTES, 1024 * 1024)]);
+      fs = FileSystem.get(context.getConfiguration());
+    }
+
+    // Create one dist cache file with the needed file size.
+    // key is dist cache file size and value is dist cache file path.
+    @Override
+    public void map(LongWritable key, BytesWritable value, Context context)
+        throws IOException, InterruptedException {
+
+      String fileName = new String(value.getBytes(), 0, value.getLength());
+      Path path = new Path(fileName);
+
+      /*
+       * Create the dist cache file with permissions 0755.
+       * Since the private dist cache directory doesn't have execute permission
+       * for others, it is OK to set read permission for others on the files
+       * under that directory; they will still become 'private' dist cache
+       * files on the simulated cluster.
+       */
+      FSDataOutputStream dos =
+          FileSystem.create(fs, path, new FsPermission((short)0755));
+
+      for (long bytes = key.get(); bytes > 0; bytes -= val.getLength()) {
+        r.nextBytes(val.getBytes());
+        val.setSize((int)Math.min(val.getLength(), bytes));
+        dos.write(val.getBytes(), 0, val.getLength());// Write to distCache file
+      }
+      dos.close();
+    }
+  }
+
+  /**
+   * InputFormat for GenerateDistCacheData.
+   * Input to GenerateDistCacheData is the special file (in SequenceFile
+   * format) that contains the list of dist cache files to be generated,
+   * along with their file sizes.
+   */
+  static class GenDCDataFormat
+      extends InputFormat<LongWritable, BytesWritable> {
+
+    // Split the special file that contains the list of dist cache file paths
+    // and their file sizes such that each split corresponds to approximately
+    // the same amount of dist cache data to be generated.
+    // Consider numTaskTrackers * numMapSlotsPerTracker as the number of maps
+    // for this job, if there is a lot of data to be generated.
+    @Override
+    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+      final JobConf jobConf = new JobConf(jobCtxt.getConfiguration());
+      final JobClient client = new JobClient(jobConf);
+      ClusterStatus stat = client.getClusterStatus(true);
+      int numTrackers = stat.getTaskTrackers();
+      final int fileCount = jobConf.getInt(GRIDMIX_DISTCACHE_FILE_COUNT, -1);
+
+      // Total size of dist cache files to be generated
+      final long totalSize = jobConf.getLong(GRIDMIX_DISTCACHE_BYTE_COUNT, -1);
+      // Get the path of the special file
+      String distCacheFileList = jobConf.get(GRIDMIX_DISTCACHE_FILE_LIST);
+      if (fileCount < 0 || totalSize < 0 || distCacheFileList == null) {
+        throw new RuntimeException("Invalid metadata: #files (" + fileCount
+            + "), total_size (" + totalSize + "), filelisturi ("
+            + distCacheFileList + ")");
+      }
+
+      Path sequenceFile = new Path(distCacheFileList);
+      FileSystem fs = sequenceFile.getFileSystem(jobConf);
+      FileStatus srcst = fs.getFileStatus(sequenceFile);
+      // Consider the number of TTs * mapSlotsPerTracker as number of mappers.
+      int numMapSlotsPerTracker = jobConf.getInt(TTConfig.TT_MAP_SLOTS, 2);
+      int numSplits = numTrackers * numMapSlotsPerTracker;
+
+      List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
+      LongWritable key = new LongWritable();
+      BytesWritable value = new BytesWritable();
+
+      // Average size of data to be generated by each map task
+      final long targetSize = Math.max(totalSize / numSplits,
+                                DistributedCacheEmulator.AVG_BYTES_PER_MAP);
+      long splitStartPosition = 0L;
+      long splitEndPosition = 0L;
+      long acc = 0L;
+      long bytesRemaining = srcst.getLen();
+      SequenceFile.Reader reader = null;
+      try {
+        reader = new SequenceFile.Reader(fs, sequenceFile, jobConf);
+        while (reader.next(key, value)) {
+
+          // If adding this file would put this split past the target size,
+          // cut the last split and put this file in the next split.
+          if (acc + key.get() > targetSize && acc != 0) {
+            long splitSize = splitEndPosition - splitStartPosition;
+            splits.add(new FileSplit(
+                sequenceFile, splitStartPosition, splitSize, (String[])null));
+            bytesRemaining -= splitSize;
+            splitStartPosition = splitEndPosition;
+            acc = 0L;
+          }
+          acc += key.get();
+          splitEndPosition = reader.getPosition();
+        }
+      } finally {
+        if (reader != null) {
+          reader.close();
+        }
+      }
+      if (bytesRemaining != 0) {
+        splits.add(new FileSplit(
+            sequenceFile, splitStartPosition, bytesRemaining, (String[])null));
+      }
+
+      return splits;
+    }
+
+    /**
+     * Returns a reader for this split of the dist cache file list.
+     */
+    @Override
+    public RecordReader<LongWritable, BytesWritable> createRecordReader(
+        InputSplit split, final TaskAttemptContext taskContext)
+        throws IOException, InterruptedException {
+      return new SequenceFileRecordReader<LongWritable, BytesWritable>();
+    }
+  }
+}
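
The split computation in GenDCDataFormat.getSplits() above cuts a new split whenever adding the next record would push the accumulated size past the target, and appends the remainder as a final split. A self-contained sketch of that grouping rule, using plain longs instead of SequenceFile positions (the class and method names here are made up for illustration):

    import java.util.ArrayList;
    import java.util.List;

    public class SplitGroupingSketch {
      // Mirror the cut rule in getSplits() above: cut before the record that
      // would push the running total past the target, and keep the residue.
      static List<List<Long>> group(long[] sizes, long targetSize) {
        List<List<Long>> splits = new ArrayList<List<Long>>();
        List<Long> current = new ArrayList<Long>();
        long acc = 0L;
        for (long size : sizes) {
          if (acc + size > targetSize && acc != 0) {
            splits.add(current);            // cut the split before this record
            current = new ArrayList<Long>();
            acc = 0L;
          }
          current.add(size);
          acc += size;
        }
        if (!current.isEmpty()) {
          splits.add(current);              // residual split, like bytesRemaining
        }
        return splits;
      }

      public static void main(String[] args) {
        long[] sizes = {2500, 1500, 1200, 700, 500, 400};
        // Prints [[2500], [1500, 1200], [700, 500, 400]]
        System.out.println(group(sizes, 3000));
      }
    }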

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=1079234&r1=1079233&r2=1079234&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Tue Mar  8 05:58:47 2011
@@ -92,6 +92,8 @@ public class Gridmix extends Configured 
    */
   public static final String GRIDMIX_USR_RSV = "gridmix.user.resolve.class";
 
+  private DistributedCacheEmulator distCacheEmulator;
+
   // Submit data structures
   private JobFactory factory;
   private JobSubmitter submitter;
@@ -102,36 +104,61 @@ public class Gridmix extends Configured 
   private final Shutdown sdh = new Shutdown();
 
   /**
-   * Write random bytes at the path provided.
+   * Write random bytes at the path &lt;ioPath&gt;/input/
    * @see org.apache.hadoop.mapred.gridmix.GenerateData
    */
   protected void writeInputData(long genbytes, Path ioPath)
       throws IOException, InterruptedException {
+    Path inputDir = new Path(ioPath, "input");
     final Configuration conf = getConf();
-    final GridmixJob genData = new GenerateData(conf, ioPath, genbytes);
-    submitter.add(genData);
+    final GridmixJob genData = new GenerateData(conf, inputDir, genbytes);
     LOG.info("Generating " + StringUtils.humanReadableInt(genbytes) +
         " of test data...");
+    launchGridmixJob(genData);
+    
+    FsShell shell = new FsShell(conf);
+    try {
+      LOG.info("Changing the permissions for inputPath " + inputDir.toString());
+      shell.run(new String[] {"-chmod","-R","777", inputDir.toString()});
+    } catch (Exception e) {
+      LOG.error("Couldnt change the file permissions " , e);
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Write random bytes to the distributed cache files that will be used by all
+   * simulated jobs of the current gridmix run, if files are to be generated.
+   * Do this as part of the MapReduce job GRIDMIX_GENERATE_DISTCACHE_DATA.
+   * @see org.apache.hadoop.mapred.gridmix.GenerateDistCacheData
+   */
+  protected void writeDistCacheData(Configuration conf)
+      throws IOException, InterruptedException {
+    int fileCount =
+        conf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1);
+    if (fileCount > 0) {// generate dist cache files
+      final GridmixJob genDistCacheData = new GenerateDistCacheData(conf);
+      LOG.info("Generating dist cache data of size " + conf.getLong(
+          GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1));
+      launchGridmixJob(genDistCacheData);
+    }
+  }
+
+  // Launch Input/DistCache Data Generation job and wait for completion
+  void launchGridmixJob(GridmixJob job)
+      throws IOException, InterruptedException {
+    submitter.add(job);
+
     // TODO add listeners, use for job dependencies
     TimeUnit.SECONDS.sleep(10);
     try {
-      genData.getJob().waitForCompletion(false);
+      job.getJob().waitForCompletion(false);
     } catch (ClassNotFoundException e) {
       throw new IOException("Internal error", e);
     }
-    if (!genData.getJob().isSuccessful()) {
-      throw new IOException("Data generation failed!");
+    if (!job.getJob().isSuccessful()) {
+      throw new IOException(job.getJob().getJobName() + " job failed!");
     }
-
-    FsShell shell = new FsShell(conf);
-    try {
-      LOG.info("Changing the permissions for inputPath " + ioPath.toString());
-      shell.run(new String[] {"-chmod","-R","777", ioPath.toString()});
-    } catch (Exception e) {
-      LOG.error("Couldnt change the file permissions " , e);
-      throw new IOException(e);
-    }
-    LOG.info("Done.");
   }
 
   /**
@@ -158,7 +185,9 @@ public class Gridmix extends Configured 
    * @param conf Configuration data, no keys specific to this context
    * @param traceIn Either a Path to the trace data or &quot;-&quot; for
    *                stdin
-   * @param ioPath Path from which input data is read
+   * @param ioPath &lt;ioPath&gt;/input/ is the dir from which input data is
+   *               read and &lt;ioPath&gt;/distributedCache/ is the gridmix
+   *               distributed cache directory.
    * @param scratchDir Path into which job output is written
    * @param startFlag Semaphore for starting job trace pipeline
    */
@@ -166,6 +195,7 @@ public class Gridmix extends Configured 
       Path scratchDir, CountDownLatch startFlag, UserResolver userResolver)
       throws IOException {
     try {
+      Path inputDir = new Path(ioPath, "input");
       GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
         conf, GridmixJobSubmissionPolicy.STRESS);
       LOG.info(" Submission policy is " + policy.name());
@@ -179,11 +209,14 @@ public class Gridmix extends Configured 
       int numThreads = conf.getInt(GRIDMIX_SUB_THR, noOfSubmitterThreads);
       int queueDep = conf.getInt(GRIDMIX_QUE_DEP, 5);
       submitter = createJobSubmitter(monitor, numThreads, queueDep,
-                                     new FilePool(conf, ioPath), userResolver, 
+                                     new FilePool(conf, inputDir), userResolver, 
                                      statistics);
-      
+      distCacheEmulator = new DistributedCacheEmulator(conf, ioPath);
+
       factory = createJobFactory(submitter, traceIn, scratchDir, conf, 
                                  startFlag, userResolver);
+      factory.jobCreator.setDistCacheEmulator(distCacheEmulator);
+
       if (policy == GridmixJobSubmissionPolicy.SERIAL) {
         statistics.addJobStatsListeners(factory);
       } else {
@@ -244,6 +277,9 @@ public class Gridmix extends Configured 
       printUsage(System.err);
       return 1;
     }
+    
+    // Should gridmix generate distributed cache data?
+    boolean generate = false;
     long genbytes = -1L;
     String traceIn = null;
     Path ioPath = null;
@@ -257,6 +293,7 @@ public class Gridmix extends Configured 
       for (int i = 0; i < argv.length - 2; ++i) {
         if ("-generate".equals(argv[i])) {
           genbytes = StringUtils.TraditionalBinaryPrefix.string2long(argv[++i]);
+          generate = true;
         } else if ("-users".equals(argv[i])) {
           userRsrc = new URI(argv[++i]);
         } else {
@@ -287,12 +324,33 @@ public class Gridmix extends Configured 
       printUsage(System.err);
       return 1;
     }
-    return start(conf, traceIn, ioPath, genbytes, userResolver);
+    return start(conf, traceIn, ioPath, genbytes, userResolver, generate);
   }
 
+  /**
+   * 
+   * @param conf gridmix configuration
+   * @param traceIn trace file path (if it is '-', then the trace comes
+   *                from stdin)
+   * @param ioPath Working directory for gridmix. GenerateData job
+   *               will generate data in the directory &lt;ioPath&gt;/input/ and
+   *               distributed cache data is generated in the directory
+   *               &lt;ioPath&gt;/distributedCache/, if -generate option is
+   *               specified.
+   * @param genbytes size of input data to be generated under the directory
+   *                 &lt;ioPath&gt;/input/
+   * @param userResolver gridmix user resolver
+   * @param generate true if -generate option was specified
+   * @return exit code
+   * @throws IOException
+   * @throws InterruptedException
+   */
   int start(Configuration conf, String traceIn, Path ioPath, long genbytes,
-      UserResolver userResolver) throws IOException, InterruptedException {
+      UserResolver userResolver, boolean generate)
+      throws IOException, InterruptedException {
     InputStream trace = null;
+    ioPath = ioPath.makeQualified(ioPath.getFileSystem(conf));
+
     try {
       Path scratchDir = new Path(ioPath, conf.get(GRIDMIX_OUT_DIR, "gridmix"));
 
@@ -310,10 +368,12 @@ public class Gridmix extends Configured 
         // scan input dir contents
         submitter.refreshFilePool();
 
-        // create scratch directory(output path of gridmix)
-        final FileSystem scratchFs = scratchDir.getFileSystem(conf);
-        FileSystem.mkdirs(scratchFs, scratchDir,
-            new FsPermission((short) 0777));
+        // set up the needed things for emulation of various loads
+        int exitCode = setupEmulation(conf, traceIn, scratchDir, ioPath,
+                                      generate);
+        if (exitCode != 0) {
+          return exitCode;
+        }
 
         factory.start();
         statistics.start();
@@ -350,6 +410,64 @@ public class Gridmix extends Configured 
   }
 
   /**
+   * Create gridmix output directory. Setup things for emulation of
+   * various loads, if needed.
+   * @param conf gridmix configuration
+   * @param traceIn trace file path (if it is '-', then the trace comes
+   *                from stdin)
+   * @param scratchDir gridmix output directory
+   * @param ioPath Working directory for gridmix.
+   * @param generate true if -generate option was specified
+   * @return exit code
+   * @throws IOException
+   * @throws InterruptedException 
+   */
+  private int setupEmulation(Configuration conf, String traceIn,
+      Path scratchDir, Path ioPath, boolean generate)
+      throws IOException, InterruptedException {
+    // create scratch directory (output directory of gridmix)
+    final FileSystem scratchFs = scratchDir.getFileSystem(conf);
+    FileSystem.mkdirs(scratchFs, scratchDir, new FsPermission((short) 0777));
+
+    // Setup things needed for emulation of dist cache load
+    return setupDistCacheEmulation(conf, traceIn, ioPath, generate);
+    // Setup emulation of other loads like CPU load, Memory load
+  }
+
+  /**
+   * Setup gridmix for emulation of dist cache load. This includes
+   * generation of dist cache files, if needed.
+   * @param conf gridmix configuration
+   * @param traceIn trace file path (if it is '-', then the trace comes
+   *                from stdin)
+   * @param ioPath &lt;ioPath&gt;/input/ is the dir where input data (a) exists
+   *               or (b) is generated. &lt;ioPath&gt;/distributedCache/ is the
+   *               folder where dist cache data (a) exists or (b) is to be
+   *               generated by gridmix.
+   * @param generate true if -generate option was specified
+   * @return exit code
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private int setupDistCacheEmulation(Configuration conf, String traceIn,
+      Path ioPath, boolean generate) throws IOException, InterruptedException {
+    distCacheEmulator.init(traceIn, factory.jobCreator, generate);
+    int exitCode = 0;
+    if (distCacheEmulator.shouldGenerateDistCacheData() ||
+        distCacheEmulator.shouldEmulateDistCacheLoad()) {
+
+      JobStoryProducer jsp = createJobStoryProducer(traceIn, conf);
+      exitCode = distCacheEmulator.setupGenerateDistCacheData(jsp);
+      if (exitCode == 0) {
+        // If there are files to be generated, run a MapReduce job to generate
+        // these dist cache files of all the simulated jobs of this trace.
+        writeDistCacheData(conf);
+      }
+    }
+    return exitCode;
+  }
+
+  /**
    * Handles orderly shutdown by requesting that each component in the
    * pipeline abort its progress, waiting for each to exit and killing
    * any jobs still running on the cluster.
@@ -446,6 +564,11 @@ public class Gridmix extends Configured 
     ToolRunner.printGenericCommandUsage(out);
     out.println("Usage: gridmix [-generate <MiB>] [-users URI] [-Dname=value ...] <iopath> <trace>");
     out.println("  e.g. gridmix -generate 100m foo -");
+    out.println("Options:");
+    out.println("   -generate <MiB> : Generate input data of size MiB under "
+        + "<iopath>/input/ and generate\n\t\t     distributed cache data under "
+        + "<iopath>/distributedCache/.");
+    out.println("   -users <usersResourceURI> : URI that contains the users list.");
     out.println("Configuration parameters:");
     out.println("   General parameters:");
     out.printf("       %-48s : Output directory\n", GRIDMIX_OUT_DIR);
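
Distributed cache emulation introduced by this commit is on by default and can be switched off through the gridmix.distributed-cache-emulation.enable property defined in DistributedCacheEmulator (the property name and its default are from this patch; the standalone driver below is only a sketch):

    import org.apache.hadoop.conf.Configuration;

    public class DisableDistCacheEmulationSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Equivalent to passing
        //   -Dgridmix.distributed-cache-emulation.enable=false
        // on the gridmix command line; the default is true.
        conf.setBoolean("gridmix.distributed-cache-emulation.enable", false);
        System.out.println(
            conf.getBoolean("gridmix.distributed-cache-emulation.enable", true));
      }
    }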

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java?rev=1079234&r1=1079233&r2=1079234&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java Tue Mar  8 05:58:47 2011
@@ -18,31 +18,42 @@
 
 package org.apache.hadoop.mapred.gridmix;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.gridmix.GenerateData.GenSplit;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.rumen.JobStory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Random;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public enum JobCreator {
 
   LOADJOB {
     @Override
     public GridmixJob createGridmixJob(
-      Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot,
-      UserGroupInformation ugi, int seq) throws IOException {
+      Configuration gridmixConf, long submissionMillis, JobStory jobdesc,
+      Path outRoot, UserGroupInformation ugi, int seq) throws IOException {
+
+      // Build configuration for this simulated job
+      Configuration conf = new Configuration(gridmixConf);
+      dce.configureDistCacheFiles(conf, jobdesc.getJobConf());
       return new LoadJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq);
-    }},
+    }
+
+    @Override
+    public boolean canEmulateDistCacheLoad() {
+      return true;
+    }
+  },
 
   SLEEPJOB {
     private String[] hosts;
@@ -72,12 +83,30 @@ public enum JobCreator {
       }
       return new SleepJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq,
           numLocations, hosts);
-    }};
+    }
+
+    @Override
+    public boolean canEmulateDistCacheLoad() {
+      return false;
+    }
+  };
 
   public static final String GRIDMIX_JOB_TYPE = "gridmix.job.type";
   public static final String SLEEPJOB_RANDOM_LOCATIONS = 
     "gridmix.sleep.fake-locations";
 
+  /**
+   * Create a Gridmix simulated job.
+   * @param conf configuration of the simulated job
+   * @param submissionMillis the time at which this simulated job is to be
+   *                         submitted
+   * @param jobdesc JobStory obtained from the trace
+   * @param outRoot gridmix output directory
+   * @param ugi UGI of the submitter of this simulated job
+   * @param seq job sequence number
+   * @return the created simulated job
+   * @throws IOException
+   */
   public abstract GridmixJob createGridmixJob(
     final Configuration conf, long submissionMillis, final JobStory jobdesc,
     Path outRoot, UserGroupInformation ugi, final int seq) throws IOException;
@@ -86,4 +115,21 @@ public enum JobCreator {
     Configuration conf, JobCreator defaultPolicy) {
     return conf.getEnum(GRIDMIX_JOB_TYPE, defaultPolicy);
   }
+
+  /**
+   * @return true if gridmix simulated jobs of this job type can emulate
+   *         distributed cache load
+   */
+  abstract boolean canEmulateDistCacheLoad();
+
+  DistributedCacheEmulator dce;
+  /**
+   * This method is to be called before calling any other method in JobCreator
+   * except canEmulateDistCacheLoad(), especially if canEmulateDistCacheLoad()
+   * returns true for that job type.
+   * @param e Distributed Cache Emulator
+   */
+  void setDistCacheEmulator(DistributedCacheEmulator e) {
+    this.dce = e;
+  }
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java?rev=1079234&r1=1079233&r2=1079234&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobProducer.java Tue Mar  8 05:58:47 2011
@@ -289,7 +289,7 @@ public class DebugJobProducer implements
 
     @Override
     public org.apache.hadoop.mapred.JobConf getJobConf() {
-      throw new UnsupportedOperationException();
+      return new JobConf(conf);
     }
 
     @Override

Added: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java?rev=1079234&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestDistCacheEmulation.java Tue Mar  8 05:58:47 2011
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import static org.junit.Assert.*;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Validate emulation of distributed cache load in gridmix simulated jobs.
+ */
+public class TestDistCacheEmulation {
+
+  private static List<Long> privateDCFilesSizesExpected = new ArrayList<Long>();
+  private static List<Long> publicDCFilesSizesExpected = new ArrayList<Long>();
+
+  private DistributedCacheEmulator dce = null;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    GridmixTestUtils.initCluster();
+  }
+
+  @AfterClass
+  public static void shutDown() throws IOException {
+    GridmixTestUtils.shutdownCluster();
+  }
+
+  /**
+   * Validate the dist cache files generated by GenerateDistCacheData job.
+   * @param jobConf configuration of GenerateDistCacheData job.
+   * @throws IOException 
+   * @throws FileNotFoundException 
+   */
+  private void validateDistCacheData(JobConf jobConf)
+      throws FileNotFoundException, IOException {
+    Path privateDistCache = dce.getPrivateDistCacheDir();
+    Path publicDistCache = dce.getPublicDistCacheDir();
+    Path distCachePath = privateDistCache.getParent();
+    String filesListFile =
+        jobConf.get(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST);
+
+    // validate files directly under dist cache dir
+    RemoteIterator<LocatedFileStatus> iter =
+        GridmixTestUtils.dfs.listFiles(distCachePath, false);
+    assertTrue("Dist Cache files list file missing.", iter.hasNext());
+    LocatedFileStatus stat = iter.next();
+    assertTrue("Dist Cache files list file path did not match the expected.",
+        stat.getPath().toUri().getPath().equals(
+        new Path(filesListFile).toUri().getPath()));
+    assertFalse("Some file other than special file is seen directly under "
+        + "dist cache directory.", iter.hasNext());
+
+    // validate private dist cache files
+    validateDistCacheFiles(privateDCFilesSizesExpected,
+        privateDistCache, false);
+
+    // validate public dist cache files
+    validateDistCacheFiles(publicDCFilesSizesExpected,
+        publicDistCache, true);
+  }
+
+  /**
+   * Validate private/public dist cache files.
+   * @param filesSizesExpected sizes of expected dist cache files
+   * @param distCacheDir the private/public dist cache dir to be validated
+   * @param arePublic true if the dist cache files to be validated are public
+   * @throws IOException 
+   * @throws FileNotFoundException 
+   */
+  private void validateDistCacheFiles(List<Long> filesSizesExpected,
+      Path distCacheDir, boolean arePublic)
+      throws FileNotFoundException, IOException {
+    String publicOrPrivate = arePublic ? "public" : "private";
+    RemoteIterator<LocatedFileStatus> iter =
+        GridmixTestUtils.dfs.listFiles(distCacheDir, false);
+    int numFiles = filesSizesExpected.size();
+    for (int i = 0; i < numFiles; i++) {
+      assertTrue("Missing " + publicOrPrivate + " dist cache files",
+                 iter.hasNext());
+      LocatedFileStatus stat = iter.next();
+      assertTrue("File size of " + publicOrPrivate + " dist cache file "
+          + stat.getPath().toUri().getPath() + " is wrong.",
+          filesSizesExpected.remove(stat.getLen()));
+
+      FsPermission perm = stat.getPermission();
+      assertEquals("Wrong permissions for " + publicOrPrivate
+          + " dist cache file " + stat.getPath().toUri().getPath(),
+          new FsPermission((short)0644), perm);
+    }
+    assertFalse("Number of files under " + publicOrPrivate
+        + " dist cache dir is wrong.", iter.hasNext());
+  }
+
+  /**
+   * Runs setupGenerateDistCacheData() on a new DistributedCacheEmulator and
+   * returns the jobConf. Fills the arrays sortedFileSizes and
+   * sortedVisibilities that can be used for validation, and validates the
+   * exit code returned by setupGenerateDistCacheData().
+   * @param generate true if -generate option is specified
+   * @param sortedFileSizes sorted dist cache file sizes
+   * @param sortedVisibilities visibilities of dist cache files sorted based on
+   *                           file sizes
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private JobConf runSetupGenerateDistCacheData(boolean generate,
+      long[] sortedFileSizes, boolean[] sortedVisibilities)
+      throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    // Set some dummy dist cache files in the gridmix configuration so that
+    // they go into the configuration of the JobStory objects.
+    String[] distCacheFiles = {"hdfs:///tmp/file1.txt",
+                               "/tmp/" + user + "/.staging/job_1/file2.txt",
+                               "hdfs:///user/user1/file3.txt",
+                               "/home/user2/file4.txt",
+                               "subdir1/file5.txt",
+                               "subdir2/file6.gz"};
+    String[] fileSizes = {"400", "2500", "700", "1200", "1500", "500"};
+    System.arraycopy(new long[] {2500, 1500, 1200, 700, 500, 400}, 0,
+                     sortedFileSizes, 0, 6);
+    String[] visibilities = {"true", "false", "false", "true", "true", "false"};
+    System.arraycopy(new boolean[] {//sorted based on file sizes
+                     false, true, true, false, false, true}, 0,
+                     sortedVisibilities, 0, 6);
+    String[] timeStamps = {"1234", "2345", "34567", "5434", "125", "134"};
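+    // These properties mimic what the framework records for a job's dist
+    // cache entries: file paths, sizes, public/private visibilities and
+    // timestamps.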
+    conf.setStrings(MRJobConfig.CACHE_FILES, distCacheFiles);
+    conf.setStrings(MRJobConfig.CACHE_FILES_SIZES, fileSizes);
+    conf.setStrings(MRJobConfig.CACHE_FILE_VISIBILITIES, visibilities);
+    conf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, timeStamps);
+    // build lists of expected dist cache files
+    privateDCFilesSizesExpected.clear();
+    privateDCFilesSizesExpected.add(2500L);
+    privateDCFilesSizesExpected.add(700L);
+    privateDCFilesSizesExpected.add(500L);
+    publicDCFilesSizesExpected.clear();
+    publicDCFilesSizesExpected.add(1500L);
+    publicDCFilesSizesExpected.add(1200L);
+    publicDCFilesSizesExpected.add(400L);
+
+    // Job stories of all 3 jobs will have the same dist cache files in their
+    // configurations.
+    final int numJobs = 3;
+    DebugJobProducer jobProducer = new DebugJobProducer(numJobs, conf);
+
+    JobConf jobConf =
+        GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
+    Path ioPath = new Path("testSetupGenerateDistCacheData")
+                    .makeQualified(GridmixTestUtils.dfs);
+    FileSystem fs = FileSystem.get(jobConf);
+    if (fs.exists(ioPath)) {
+      fs.delete(ioPath, true);
+    }
+    FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777));
+
+    dce = createDistributedCacheEmulator(jobConf, ioPath, generate);
+    int exitCode = dce.setupGenerateDistCacheData(jobProducer);
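+    // Without the -generate option, setup is expected to fail because the
+    // dist cache files do not yet exist on HDFS.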
+    int expectedExitCode = generate ? 0 : dce.MISSING_DIST_CACHE_FILES_ERROR;
+    assertEquals("setupGenerateDistCacheData failed.",
+                 expectedExitCode, exitCode);
+
+    // reset back
+    jobConf.setStrings(MRJobConfig.CACHE_FILES, "");
+    jobConf.setStrings(MRJobConfig.CACHE_FILES_SIZES, "");
+    jobConf.setStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS, "");
+    jobConf.setStrings(MRJobConfig.CACHE_FILE_VISIBILITIES, "");
+    return jobConf;
+  }
+
+  /**
+   * Validate that the GenerateDistCacheData job creates dist cache files properly.
+   * @throws Exception
+   */
+  @Test
+  public void testGenerateDistCacheData() throws Exception {
+    JobConf jobConf =
+        runSetupGenerateDistCacheData(true, new long[6], new boolean[6]);
+    GridmixJob gridmixJob = new GenerateDistCacheData(jobConf);
+    Job job = gridmixJob.call();
+    assertEquals("Number of reduce tasks in GenerateDistCacheData is not 0.",
+        0, job.getNumReduceTasks());
+    assertTrue("GenerateDistCacheData job failed.",
+        job.waitForCompletion(false));
+    validateDistCacheData(jobConf);
+  }
+
+  /**
+   * Validate setupGenerateDistCacheData by checking
+   * <ul>
+   * <li> permissions of the distributed cache directories, and</li>
+   * <li> content of the generated sequence file, including dist cache file
+   *      paths and their file sizes.</li>
+   * </ul>
+   */
+  private void validateSetupGenDC(JobConf jobConf, long[] sortedFileSizes,
+      boolean[] sortedVisibilities)
+      throws IOException, InterruptedException {
+    // build things needed for validation
+    long sumOfFileSizes = 0;
+    for (int i = 0; i < sortedFileSizes.length; i++) {
+      sumOfFileSizes += sortedFileSizes[i];
+    }
+
+    FileSystem fs = FileSystem.get(jobConf);
+    assertEquals("Number of distributed cache files to be generated is wrong.",
+        sortedFileSizes.length,
+        jobConf.getInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, -1));
+    assertEquals("Total size of dist cache files to be generated is wrong.",
+        sumOfFileSizes, jobConf.getLong(
+        GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, -1));
+    Path filesListFile = new Path(jobConf.get(
+        GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_LIST));
+    FileStatus stat = fs.getFileStatus(filesListFile);
+    assertEquals("Wrong permissions of dist Cache files list file "
+        + filesListFile, new FsPermission((short)0644), stat.getPermission());
+
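+    // Read back the files-list sequence file through the same input format
+    // that the GenerateDistCacheData job uses, wrapped in a dummy map
+    // context so that the record reader can be initialized.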
+    InputSplit split =
+        new FileSplit(filesListFile, 0, stat.getLen(), (String[])null);
+    TaskAttemptContext taskContext =
+        MapReduceTestUtil.createDummyMapTaskAttemptContext(jobConf);
+    RecordReader<LongWritable, BytesWritable> reader =
+      new GenerateDistCacheData.GenDCDataFormat().createRecordReader(
+      split, taskContext);
+    MapContext<LongWritable, BytesWritable, NullWritable, BytesWritable>
+        mapContext = new MapContextImpl<LongWritable, BytesWritable,
+        NullWritable, BytesWritable>(jobConf, taskContext.getTaskAttemptID(),
+        reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+    reader.initialize(split, mapContext);
+
+    // start validating setupGenerateDistCacheData
+    doValidateSetupGenDC(reader, fs, sortedFileSizes, sortedVisibilities);
+  }
+
+  /**
+   * Helper for validateSetupGenDC() that checks
+   * <ul>
+   * <li> permissions of the distributed cache directories, and</li>
+   * <li> content of the generated sequence file, including dist cache file
+   *      paths and their file sizes.</li>
+   * </ul>
+   */
+  private void doValidateSetupGenDC(RecordReader<LongWritable, BytesWritable>
+      reader, FileSystem fs, long[] sortedFileSizes,
+      boolean[] sortedVisibilities) throws IOException, InterruptedException {
+
+    // Validate permissions of dist cache directories
+    Path privateDCDirPath = dce.getPrivateDistCacheDir();
+    assertEquals("Wrong permissions for private dist cache dir.",
+        fs.getFileStatus(privateDCDirPath).getPermission()
+        .getOtherAction().and(FsAction.EXECUTE), FsAction.NONE);
+    Path publicDCDirPath = dce.getPublicDistCacheDir();
+    assertEquals("Wrong permissions for public dist cache dir.",
+        fs.getFileStatus(publicDCDirPath).getPermission()
+        .getOtherAction().and(FsAction.EXECUTE), FsAction.EXECUTE);
+
+    // Validate the content of the sequence file generated by
+    // dce.setupGenerateDistCacheData().
+    LongWritable key = new LongWritable();
+    BytesWritable val = new BytesWritable();
+    for (int i = 0; i < sortedFileSizes.length; i++) {
+      assertTrue("Number of files written to the sequence file by "
+          + "setupGenerateDistCacheData is less than the expected.",
+          reader.nextKeyValue());
+      key = reader.getCurrentKey();
+      val = reader.getCurrentValue();
+      long fileSize = key.get();
+      String file = new String(val.getBytes(), 0, val.getLength());
+
+      // Dist Cache files should be sorted based on file size.
+      assertEquals("Dist cache file size is wrong.",
+          sortedFileSizes[i], fileSize);
+
+      // Validate dist cache file paths based on visibilities
+
+      // parent dir of dist cache file
+      Path parent = new Path(file).getParent().makeQualified(fs);
+      if (sortedVisibilities[i]) {// should exist in public dist cache dir
+        assertTrue("Public dist cache file path is wrong.",
+            publicDCDirPath.equals(parent));
+      } else {// should exist in private dist cache dir
+        assertTrue("Private dist cache file path is wrong.",
+            privateDCDirPath.equals(parent));
+      }
+    }
+  }
+
+  /**
+   *  Test if DistributedCacheEmulator's setup of GenerateDistCacheData is
+   *  working as expected.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testSetupGenerateDistCacheData()
+      throws IOException, InterruptedException {
+    long[] sortedFileSizes = new long[6];
+    boolean[] sortedVisibilities = new boolean[6];
+    JobConf jobConf =
+      runSetupGenerateDistCacheData(true, sortedFileSizes, sortedVisibilities);
+    validateSetupGenDC(jobConf, sortedFileSizes, sortedVisibilities);
+    
+    // Verify that the correct exit code is returned when the -generate option
+    // is missing and the distributed cache files do not exist at the expected
+    // paths.
+    runSetupGenerateDistCacheData(false, sortedFileSizes, sortedVisibilities);
+  }
+
+  /**
+   * Create a DistributedCacheEmulator object and initialize it by calling
+   * init() on it with a dummy trace.
+   */
+  private DistributedCacheEmulator createDistributedCacheEmulator(
+      Configuration conf, Path ioPath, boolean generate) throws IOException {
+    DistributedCacheEmulator dce =
+        new DistributedCacheEmulator(conf, ioPath);
+    JobCreator jobCreator = JobCreator.getPolicy(conf, JobCreator.LOADJOB);
+    jobCreator.setDistCacheEmulator(dce);
+    dce.init("dummytrace", jobCreator, generate);
+    return dce;
+  }
+
+  /**
+   *  Test the configuration property for disabling/enabling emulation of
+   *  distributed cache load.
+   */
+  @Test
+  public void testDistCacheEmulationConfigurability() throws IOException {
+    Configuration conf = new Configuration();
+    JobConf jobConf = GridmixTestUtils.mrCluster.createJobConf(
+        new JobConf(conf));
+    Path ioPath = new Path("testDistCacheEmulationConfigurability")
+        .makeQualified(GridmixTestUtils.dfs);
+    FileSystem fs = FileSystem.get(jobConf);
+    FileSystem.mkdirs(fs, ioPath, new FsPermission((short)0777));
+
+    // default config
+    dce = createDistributedCacheEmulator(jobConf, ioPath, false);
+    assertTrue("Default configuration of "
+        + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+        + " is wrong.", dce.shouldEmulateDistCacheLoad());
+
+    // config property set to false
+    jobConf.setBoolean(
+        DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE, false);
+    dce = createDistributedCacheEmulator(jobConf, ioPath, false);
+    assertFalse("Disabling of emulation of distributed cache load by setting "
+        + DistributedCacheEmulator.GRIDMIX_EMULATE_DISTRIBUTEDCACHE
+        + " to false is not working.", dce.shouldEmulateDistCacheLoad());
+  }
+}

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=1079234&r1=1079233&r2=1079234&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Tue Mar  8 05:58:47 2011
@@ -111,7 +111,7 @@ public class TestGridmixSubmission {
         GridmixTestUtils.mrCluster.createJobConf());
       for (Job job : succeeded) {
         final String jobname = job.getJobName();
-        if ("GRIDMIX_GENDATA".equals(jobname)) {
+        if (GenerateData.JOB_NAME.equals(jobname)) {
           if (!job.getConfiguration().getBoolean(
             GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE, true)) {
             assertEquals(" Improper queue for " + job.getJobName(),

Modified: hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/gridmix.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/gridmix.xml?rev=1079234&r1=1079233&r2=1079234&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/gridmix.xml (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/gridmix.xml Tue Mar  8 05:58:47 2011
@@ -78,17 +78,28 @@ org.apache.hadoop.mapred.gridmix.Gridmix
 	<code>-Dgridmix.output.directory=foo</code> as given above should
 	be used <em>before</em> other GridMix parameters.
       </note>
-      <p>The <code>-generate</code> option is used to generate input data
-      for the synthetic jobs. It accepts size suffixes, e.g.
-      <code>100g</code> will generate 100 * 2<sup>30</sup> bytes.</p>
-      <p>The <code>-users</code> option is used to point to a users-list
-      file (see <a href="#usersqueues">Emulating Users and Queues</a>).</p>
-      <p>The <code>&lt;iopath&gt;</code> parameter is the destination
-      directory for generated output and/or the directory from which input
-      data will be read. Note that this can either be on the local file-system
+      <p>The <code>&lt;iopath&gt;</code> parameter is the working directory for
+      GridMix. Note that this can either be on the local file-system
       or on HDFS, but it is highly recommended that it be the same as that for
       the original job mix so that GridMix puts the same load on the local
       file-system and HDFS respectively.</p>
+      <p>The <code>-generate</code> option is used to generate input data and
+      Distributed Cache files for the synthetic jobs. It accepts standard units
+      of size suffixes, e.g. <code>100g</code> will generate
+      100 * 2<sup>30</sup> bytes as input data.
+      <code>&lt;iopath&gt;/input</code> is the destination directory for
+      generated input data and/or the directory from which input data will be
+      read. HDFS-based Distributed Cache files are generated under the directory
+      <code>&lt;iopath&gt;/distributedCache</code>.
+      Private Distributed Cache files are created under the directory
+      <code>&lt;iopath&gt;/distributedCache/private</code> and public
+      Distributed Cache files are created under the directory
+      <code>&lt;iopath&gt;/distributedCache/public</code>. If some of the
+      needed Distributed Cache files already exist in these directories,
+      then only the remaining missing Distributed Cache files are generated
+      when the <code>-generate</code> option is specified.</p>
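+      <p>For example, an invocation of the following form (a sketch;
+      substitute real values for the placeholders) generates both the input
+      data under <code>&lt;iopath&gt;/input</code> and the Distributed Cache
+      files under <code>&lt;iopath&gt;/distributedCache</code>:</p>
+      <source>
+hadoop jar &lt;gridmix-jar&gt; org.apache.hadoop.mapred.gridmix.Gridmix \
+  -generate 100g &lt;iopath&gt; &lt;trace&gt;
+      </source>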
+      <p>The <code>-users</code> option is used to point to a users-list
+      file (see <a href="#usersqueues">Emulating Users and Queues</a>).</p>
       <p>The <code>&lt;trace&gt;</code> parameter is a path to a job trace
       generated by Rumen. This trace can be compressed (it must be readable
       using one of the compression codecs supported by the cluster) or
@@ -505,6 +516,29 @@ hadoop jar &lt;gridmix-jar&gt; org.apach
       tuser5 -&gt; user2
       </source>
     </section>
+
+  <section id="distributedcacheload">
+  <title>Emulation of Distributed Cache Load</title>
+    <p>Gridmix emulates Distributed Cache load by default for jobs of type
+    LOADJOB. This is done by pre-creating the needed Distributed Cache files
+    for all the simulated jobs as part of a separate MapReduce job.</p>
+    <p>Emulation of Distributed Cache load in gridmix simulated jobs can be
+    disabled by setting the property
+    <code>gridmix.distributed-cache-emulation.enable</code> to
+    <code>false</code>.
+    Generation of Distributed Cache data by gridmix, however, is driven by the
+    <code>-generate</code> option and is independent of this configuration
+    property.</p>
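+    <p>For example, emulation of Distributed Cache load could be disabled on
+    the command line (a sketch, using the <code>-D</code> property convention
+    described above; placeholders stand for real paths):</p>
+    <source>
+hadoop jar &lt;gridmix-jar&gt; org.apache.hadoop.mapred.gridmix.Gridmix \
+  -Dgridmix.distributed-cache-emulation.enable=false \
+  &lt;iopath&gt; &lt;trace&gt;
+    </source>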
+    <p>Both generation of Distributed Cache files and emulation of
+    Distributed Cache load are disabled if:</p>
+    <ul>
+    <li>the input trace comes from the standard input stream instead of a
+    file, or</li>
+    <li>the specified <code>&lt;iopath&gt;</code> is on the local
+    file-system, or</li>
+    <li>any ancestor directory of <code>&lt;iopath&gt;</code>, up to the
+    root, lacks execute permission for others.</li>
+    </ul>
+  </section>
+
     <section id="assumptions">
       <title>Simplifying Assumptions</title>
       <p>GridMix will be developed in stages, incorporating feedback and


