hadoop-mapreduce-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r1079236 - in /hadoop/mapreduce/branches/yahoo-merge/src: contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/ docs/src/documentation/content/xdocs/
Date: Tue, 08 Mar 2011 05:59:00 GMT
Author: omalley
Date: Tue Mar  8 05:58:59 2011
New Revision: 1079236

URL: http://svn.apache.org/viewvc?rev=1079236&view=rev
Log:
commit 74df6a868ffdef9e5d7abef9da86b8651f26039b
Author: Amar Ramesh Kamat <amarrk@yahoo-inc.com>
Date:   Tue Jan 4 15:50:53 2011 +0530

    : Adding compression emulation support in GridMix.
    
    +++ b/YAHOO-CHANGES.txt
    +  : Adding compression emulation support in GridMix. Patch is
    +  available at  (amarrk)
    +
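A minimal sketch of how the new knobs can be toggled from client code; only
the two configuration keys come from this patch, the driver class itself is
illustrative and not part of the commit:

    import org.apache.hadoop.conf.Configuration;

    public class CompressionEmulationToggle {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Key added by this patch; GridMix treats it as true by default.
        conf.setBoolean("gridmix.compression-emulation.enable", true);
        // Input-decompression emulation; false by default, and normally
        // derived from the original job's input paths (see
        // CompressionEmulationUtil.configureCompressionEmulation below).
        conf.setBoolean(
            "gridmix.compression-emulation.input-decompression.enable", true);
      }
    }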

Added:
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomTextDataGenerator.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomTextDataGenerator.java
Modified:
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
    hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/gridmix.xml

Added: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java?rev=1079236&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java Tue Mar  8 05:58:59 2011
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.gridmix.GenerateData.GenDataFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This is a utility class for all the compression-related modules.
+ */
+class CompressionEmulationUtil {
+  static final Log LOG = LogFactory.getLog(CompressionEmulationUtil.class);
+  
+  /**
+   * Enable compression usage in GridMix runs.
+   */
+  private static final String COMPRESSION_EMULATION_ENABLE = 
+    "gridmix.compression-emulation.enable";
+  
+  /**
+   * Enable input data decompression.
+   */
+  private static final String INPUT_DECOMPRESSION_EMULATION_ENABLE = 
+    "gridmix.compression-emulation.input-decompression.enable";
+  
+  /**
+   * This is a {@link Mapper} implementation for generating random text data.
+   * It uses {@link RandomTextDataGenerator} for generating text data and the
+   * output files are compressed.
+   */
+  public static class RandomTextDataMapper
+  extends Mapper<NullWritable, LongWritable, Text, Text> {
+    private RandomTextDataGenerator rtg;
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      int size = 
+        conf.getInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_LISTSIZE,
+                    100);
+      int wordSize = 
+        conf.getInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE,
+                    10);
+      rtg = new RandomTextDataGenerator(size, null, wordSize);
+    }
+    
+    /**
+     * Emits a sequence of random words of the desired total size. Note that
+     * the desired output size is passed as the value of this map's input
+     * record.
+     */
+    @Override
+    public void map(NullWritable key, LongWritable value, Context context)
+    throws IOException, InterruptedException {
+      //TODO Control the extra data written ..
+      //TODO Should the key\tvalue\n be considered for measuring size?
+      //     Can counters like BYTES_WRITTEN be used? What will be the value of
+      //     such counters in LocalJobRunner?
+      for (long bytes = value.get(); bytes > 0;) {
+        String randomKey = rtg.getRandomWord();
+        String randomValue = rtg.getRandomWord();
+        context.write(new Text(randomKey), new Text(randomValue));
+        bytes -= (randomValue.getBytes().length + randomKey.getBytes().length);
+      }
+    }
+  }
+  
+  /**
+   * Configure the {@link Job} for enabling compression emulation.
+   */
+  static void configure(final Job job) throws IOException, InterruptedException,
+                                              ClassNotFoundException {
+    LOG.info("Gridmix is configured to use compressed data.");
+    // set the random text mapper
+    job.setMapperClass(RandomTextDataMapper.class);
+    job.setNumReduceTasks(0);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setInputFormatClass(GenDataFormat.class);
+    job.setJarByClass(GenerateData.class);
+
+    // set the output compression true
+    FileOutputFormat.setCompressOutput(job, true);
+    try {
+      FileInputFormat.addInputPath(job, new Path("ignored"));
+    } catch (IOException e) {
+      LOG.error("Error while adding input path ", e);
+    }
+  }
+  
+  /**
+   * Enables/Disables compression emulation.
+   * @param conf Target configuration where the parameter 
+   * {@value #COMPRESSION_EMULATION_ENABLE} will be set. 
+   * @param val The value to be set.
+   */
+  static void setCompressionEmulationEnabled(Configuration conf, boolean val) {
+    conf.setBoolean(COMPRESSION_EMULATION_ENABLE, val);
+  }
+  
+  /**
+   * Checks if compression emulation is enabled or not. Default is {@code true}.
+   */
+  static boolean isCompressionEmulationEnabled(Configuration conf) {
+    return conf.getBoolean(COMPRESSION_EMULATION_ENABLE, true);
+  }
+  
+  /**
+   * Enables/Disables input decompression emulation.
+   * @param conf Target configuration where the parameter 
+   * {@value #INPUT_DECOMPRESSION_EMULATION_ENABLE} will be set. 
+   * @param val The value to be set.
+   */
+  static void setInputCompressionEmulationEnabled(Configuration conf, 
+                                                  boolean val) {
+    conf.setBoolean(INPUT_DECOMPRESSION_EMULATION_ENABLE, val);
+  }
+  
+  /**
+   * Checks if input decompression emulation is enabled or not. 
+   * Default is {@code false}.
+   */
+  static boolean isInputCompressionEmulationEnabled(Configuration conf) {
+    return conf.getBoolean(INPUT_DECOMPRESSION_EMULATION_ENABLE, false);
+  }
+  
+  /**
+   * Returns an {@link InputStream} for a file that might be compressed.
+   */
+  static InputStream getPossiblyDecompressedInputStream(Path file, 
+                                                        Configuration conf,
+                                                        long offset)
+  throws IOException {
+    FileSystem fs = file.getFileSystem(conf);
+    if (isCompressionEmulationEnabled(conf)
+        && isInputCompressionEmulationEnabled(conf)) {
+      CompressionCodecFactory compressionCodecs = 
+        new CompressionCodecFactory(conf);
+      CompressionCodec codec = compressionCodecs.getCodec(file);
+      if (codec != null) {
+        // Obtain a pooled decompressor only after a codec has matched the
+        // file's extension; CodecPool.getDecompressor(null) would throw NPE.
+        Decompressor decompressor = CodecPool.getDecompressor(codec);
+        CompressionInputStream in = 
+          codec.createInputStream(fs.open(file), decompressor);
+        //TODO Seek doesn't work with a compressed input stream. 
+        //     Use SplittableCompressionCodec?
+        return in;
+      }
+    }
+    FSDataInputStream in = fs.open(file);
+    in.seek(offset);
+    return in;
+  }
+  
+  /**
+   * Returns an {@link OutputStream} for a file that might need 
+   * compression.
+   */
+  static OutputStream getPossiblyCompressedOutputStream(Path file, 
+                                                        Configuration conf)
+  throws IOException {
+    FileSystem fs = file.getFileSystem(conf);
+    JobConf jConf = new JobConf(conf);
+    if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jConf)) {
+      // get the codec class
+      Class<? extends CompressionCodec> codecClass =
+        org.apache.hadoop.mapred.FileOutputFormat
+                                .getOutputCompressorClass(jConf, 
+                                                          GzipCodec.class);
+      // get the codec implementation
+      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
+
+      // add the appropriate extension
+      file = file.suffix(codec.getDefaultExtension());
+
+      if (isCompressionEmulationEnabled(conf)) {
+        FSDataOutputStream fileOut = fs.create(file, false);
+        return new DataOutputStream(codec.createOutputStream(fileOut));
+      }
+    }
+    return fs.create(file, false);
+  }
+  
+  /**
+   * Extracts compression/decompression related configuration parameters from 
+   * the source configuration to the target configuration.
+   */
+  static void configureCompressionEmulation(Configuration source, 
+                                            Configuration target) {
+    // enable output compression
+    target.setBoolean(FileOutputFormat.COMPRESS, 
+        source.getBoolean(FileOutputFormat.COMPRESS, false));
+
+    // set the job output compression codec
+    String jobOutputCompressionCodec = 
+      source.get(FileOutputFormat.COMPRESS_CODEC);
+    if (jobOutputCompressionCodec != null) {
+      target.set(FileOutputFormat.COMPRESS_CODEC, jobOutputCompressionCodec);
+    }
+
+    // set the job output compression type
+    String jobOutputCompressionType = 
+      source.get(FileOutputFormat.COMPRESS_TYPE);
+    if (jobOutputCompressionType != null) {
+      target.set(FileOutputFormat.COMPRESS_TYPE, jobOutputCompressionType);
+    }
+
+    // enable map output compression
+    target.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS,
+        source.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false));
+
+    // set the map output compression codecs
+    String mapOutputCompressionCodec = 
+      source.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC);
+    if (mapOutputCompressionCodec != null) {
+      target.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, 
+                 mapOutputCompressionCodec);
+    }
+
+    // enable input decompression
+    //TODO replace with mapInputBytes and hdfsBytesRead
+    Path[] inputs = 
+      org.apache.hadoop.mapred.FileInputFormat
+         .getInputPaths(new JobConf(source));
+    boolean needsCompressedInput = false;
+    CompressionCodecFactory compressionCodecs = 
+      new CompressionCodecFactory(source);
+    for (Path input : inputs) {
+      CompressionCodec codec = compressionCodecs.getCodec(input);
+      if (codec != null) {
+        needsCompressedInput = true;
+      }
+    }
+    setInputCompressionEmulationEnabled(target, needsCompressedInput);
+  }
+}
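
A condensed round trip through the two stream helpers above; it mirrors what
testPossiblyCompressedDecompressedStreams later in this commit exercises. The
path is illustrative, and the fragment assumes it runs inside
org.apache.hadoop.mapred.gridmix, since the helpers are package-private:

    // Imports as in TestCompressionEmulationUtils below.
    JobConf conf = new JobConf();
    conf.setBoolean("gridmix.compression-emulation.enable", true);
    conf.setBoolean(
        "gridmix.compression-emulation.input-decompression.enable", true);
    org.apache.hadoop.mapred.FileOutputFormat.setCompressOutput(conf, true);

    // With output compression on, the helper appends the codec's default
    // extension (".gz" for the default GzipCodec).
    Path file = new Path("/tmp/compress-demo/part");       // illustrative
    OutputStream out =
        CompressionEmulationUtil.getPossiblyCompressedOutputStream(file, conf);
    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
    writer.write("Hi Hello!");
    writer.close();

    // Read back through the matching helper.
    InputStream in = CompressionEmulationUtil
        .getPossiblyDecompressedInputStream(file.suffix(".gz"), conf, 0);
    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
    System.out.println(reader.readLine());                 // "Hi Hello!"
    reader.close();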

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java?rev=1079236&r1=1079235&r2=1079236&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java Tue Mar  8 05:58:59 2011
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.io.InputStream;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
@@ -35,7 +33,7 @@ class FileQueue extends InputStream {
 
   private int idx = -1;
   private long curlen = -1L;
-  private FSDataInputStream input;
+  private InputStream input;
   private final byte[] z = new byte[1];
   private final Path[] paths;
   private final long[] lengths;
@@ -65,9 +63,9 @@ class FileQueue extends InputStream {
     idx = (idx + 1) % paths.length;
     curlen = lengths[idx];
     final Path file = paths[idx];
-    final FileSystem fs = file.getFileSystem(conf);
-    input = fs.open(file);
-    input.seek(startoffset[idx]);
+    input = 
+      CompressionEmulationUtil.getPossiblyDecompressedInputStream(file, 
+                                 conf, startoffset[idx]);
   }
 
   @Override
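
With this change FileQueue reads through the decompressing helper, so a
compressed input file yields uncompressed bytes when input-decompression
emulation is enabled. A sketch mirroring testFileQueueDecompression later in
this commit (the path is illustrative, conf as configured in the previous
sketch):

    Path gz = new Path("/tmp/compress-demo/part.gz");      // illustrative
    FileSystem fs = gz.getFileSystem(conf);
    long len = fs.getFileStatus(gz).getLen();
    CombineFileSplit split =
        new CombineFileSplit(new Path[] { gz }, new long[] { len });
    FileQueue queue = new FileQueue(split, conf);
    byte[] buf = new byte[16];
    queue.read(buf);   // uncompressed bytes, not the raw gzip stream
    queue.close();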

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=1079236&r1=1079235&r2=1079236&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Tue Mar  8 05:58:59 2011
@@ -101,6 +101,18 @@ class GenerateData extends GridmixJob {
     ugi.doAs( new PrivilegedExceptionAction <Job>() {
        public Job run() throws IOException, ClassNotFoundException,
                                InterruptedException {
+         // check if compression emulation is enabled
+         if (CompressionEmulationUtil
+             .isCompressionEmulationEnabled(job.getConfiguration())) {
+           CompressionEmulationUtil.configure(job);
+         } else {
+           configureRandomBytesDataGenerator();
+         }
+         job.submit();
+         return job;
+       }
+       
+       private void configureRandomBytesDataGenerator() {
         job.setMapperClass(GenDataMapper.class);
         job.setNumReduceTasks(0);
         job.setMapOutputKeyClass(NullWritable.class);
@@ -113,12 +125,15 @@ class GenerateData extends GridmixJob {
         } catch (IOException e) {
           LOG.error("Error while adding input path ", e);
         }
-        job.submit();
-        return job;
       }
     });
     return job;
   }
+  
+  @Override
+  protected boolean canEmulateCompression() {
+    return false;
+  }
 
   public static class GenDataMapper
       extends Mapper<NullWritable,LongWritable,NullWritable,BytesWritable> {

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java?rev=1079236&r1=1079235&r2=1079236&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java Tue Mar  8 05:58:59 2011
@@ -115,6 +115,11 @@ class GenerateDistCacheData extends Grid
     return job;
   }
 
+  @Override
+  protected boolean canEmulateCompression() {
+    return false;
+  }
+
   public static class GenDCDataMapper
       extends Mapper<LongWritable, BytesWritable, NullWritable, BytesWritable> {
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=1079236&r1=1079235&r2=1079236&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Tue Mar  8 05:58:59 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Formatter;
 import java.util.List;
@@ -27,8 +28,6 @@ import java.util.concurrent.TimeUnit;
 import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
@@ -113,6 +112,16 @@ abstract class GridmixJob implements Cal
             setJobQueue(ret, conf.get(GRIDMIX_DEFAULT_QUEUE));
           }
 
+          // check if the job can emulate compression
+          if (canEmulateCompression()) {
+            // set the compression related configs if compression emulation is
+            // enabled
+            if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)) {
+              CompressionEmulationUtil.configureCompressionEmulation(
+                  jobdesc.getJobConf(), ret.getConfiguration());
+            }
+          }
+          
           return ret;
         }
       });
@@ -125,6 +134,11 @@ abstract class GridmixJob implements Cal
     outdir = new Path(outRoot, "" + seq);
   }
 
+  /**
+   * Indicates whether this {@link GridmixJob} supports compression emulation.
+   */
+  protected abstract boolean canEmulateCompression();
+  
   protected GridmixJob(final Configuration conf, long submissionMillis, 
                        final String name) throws IOException {
     submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
@@ -289,8 +303,12 @@ abstract class GridmixJob implements Cal
         TaskAttemptContext job) throws IOException {
 
       Path file = getDefaultWorkFile(job, "");
-      FileSystem fs = file.getFileSystem(job.getConfiguration());
-      final FSDataOutputStream fileOut = fs.create(file, false);
+      final DataOutputStream fileOut;
+
+      fileOut = 
+        new DataOutputStream(CompressionEmulationUtil
+            .getPossiblyCompressedOutputStream(file, job.getConfiguration()));
+
       return new RecordWriter<K,GridmixRecord>() {
         @Override
         public void write(K ignored, GridmixRecord value)
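
The hook introduced above, condensed: each GridmixJob subclass declares
whether it supports compression emulation, and only then are the original
job's compression settings copied onto the simulated job. A simplified
rendering (maybeEmulateCompression is a hypothetical name that collapses the
inline logic in the hunk above):

    abstract class GridmixJob {
      // New in this patch: subclasses opt in or out.
      protected abstract boolean canEmulateCompression();

      void maybeEmulateCompression(Configuration origConf, Job simulated) {
        if (canEmulateCompression()
            && CompressionEmulationUtil.isCompressionEmulationEnabled(
                   simulated.getConfiguration())) {
          CompressionEmulationUtil.configureCompressionEmulation(
              origConf, simulated.getConfiguration());
        }
      }
    }

    // In this commit, LoadJob returns true; SleepJob, GenerateData and
    // GenerateDistCacheData return false.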

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java?rev=1079236&r1=1079235&r2=1079236&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java Tue Mar  8 05:58:59 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.DataOutputBu
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 class GridmixRecord implements WritableComparable<GridmixRecord> {
 
@@ -39,6 +40,7 @@ class GridmixRecord implements WritableC
   private final DataOutputBuffer dob =
     new DataOutputBuffer(Long.SIZE / Byte.SIZE);
   private byte[] literal = dob.getData();
+  private boolean compressible = false;
 
   GridmixRecord() {
     this(1, 0L);
@@ -57,6 +59,10 @@ class GridmixRecord implements WritableC
     setSizeInternal(size);
   }
 
+  void setCompressibility(boolean compressible) {
+    this.compressible = compressible;
+  }
+  
   private void setSizeInternal(int size) {
     this.size = Math.max(1, size);
     try {
@@ -79,6 +85,37 @@ class GridmixRecord implements WritableC
     return (x ^= (x << 17));
   }
 
+  /**
+   * Generates random text data that compresses well. This is invoked only
+   * when the record is marked compressible 
+   * (via {@link FileOutputFormat#COMPRESS}); otherwise 
+   * {@link GridmixRecord#writeRandom(DataOutput, int)} is invoked.
+   */
+  private void writeRandomText(DataOutput out, final int size) 
+  throws IOException {
+    long tmp = seed;
+    out.writeLong(tmp);
+    int i = size - (Long.SIZE / Byte.SIZE);
+    RandomTextDataGenerator rtg = 
+      new RandomTextDataGenerator(100, seed, 10);
+    String randomWord = rtg.getRandomWord();
+    long randomWordSize = randomWord.getBytes().length;
+    while (i >= randomWordSize) {
+      WritableUtils.writeString(out, randomWord);
+      i -= randomWordSize;
+      
+      // get the next random word
+      randomWord = rtg.getRandomWord();
+      // determine the random word size
+      randomWordSize = randomWord.getBytes().length;
+    }
+    
+    // pad the remaining bytes
+    if (i > 0) {
+      out.write(randomWord.getBytes(), 0, i);
+    }
+  }
+  
   public void writeRandom(DataOutput out, final int size) throws IOException {
     long tmp = seed;
     out.writeLong(tmp);
@@ -120,7 +157,11 @@ class GridmixRecord implements WritableC
     WritableUtils.writeVInt(out, size);
     final int payload = size - WritableUtils.getVIntSize(size);
     if (payload > Long.SIZE / Byte.SIZE) {
-      writeRandom(out, payload);
+      if (compressible) {
+        writeRandomText(out, payload);
+      } else {
+        writeRandom(out, payload);
+      }
     } else if (payload > 0) {
       out.write(literal, 0, payload);
     }
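
The new compressibility flag in action, essentially what
testCompressibleGridmixRecord below does: a record marked compressible
serializes as repeated dictionary words, which a codec can then shrink. The
output path is illustrative, and conf is assumed to enable output compression
as in the earlier sketch:

    GridmixRecord record = new GridmixRecord(10 * 1024 * 1024, 0L);
    record.setCompressibility(true);   // payload becomes random text

    // Writing through a compressed stream now yields a file much smaller
    // than the record's nominal 10 MB size.
    DataOutputStream out = new DataOutputStream(
        CompressionEmulationUtil.getPossiblyCompressedOutputStream(
            new Path("/tmp/compress-demo/record"), conf));
    record.write(out);
    out.close();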

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java?rev=1079236&r1=1079235&r2=1079236&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java Tue Mar  8 05:58:59 2011
@@ -25,9 +25,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 
 import org.apache.commons.logging.Log;
@@ -43,6 +46,7 @@ class InputStriper {
   long currentStart;
   FileStatus current;
   final List<FileStatus> files = new ArrayList<FileStatus>();
+  final Configuration conf = new Configuration();
 
   /**
    * @param inputDir Pool from which files are requested.
@@ -92,7 +96,15 @@ class InputStriper {
       }
       currentStart += fromFile;
       bytes -= fromFile;
-      if (current.getLen() - currentStart == 0) {
+      // Switch to a new file if
+      //  - the current file is uncompressed and completely used
+      //  - the current file is compressed
+      
+      CompressionCodecFactory compressionCodecs = 
+        new CompressionCodecFactory(conf);
+      CompressionCodec codec = compressionCodecs.getCodec(current.getPath());
+      if (current.getLen() - currentStart == 0
+          || codec != null) {
         current = files.get(++idx % files.size());
         currentStart = 0;
       }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java?rev=1079236&r1=1079235&r2=1079236&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java Tue Mar  8 05:58:59 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -83,6 +84,11 @@ class LoadJob extends GridmixJob {
     return job;
   }
 
+  @Override
+  protected boolean canEmulateCompression() {
+    return true;
+  }
+  
   public static class LoadMapper
   extends Mapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> {
 
@@ -136,6 +142,12 @@ class LoadJob extends GridmixJob {
         : splitRecords;
       ratio = totalRecords / (1.0 * inputRecords);
       acc = 0.0;
+
+      // enable gridmix map output record for compression
+      if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)
+          && conf.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false)) {
+        val.setCompressibility(true);
+      }
     }
 
     @Override
@@ -203,6 +215,13 @@ class LoadJob extends GridmixJob {
         new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
       ratio = outRecords / (1.0 * inRecords);
       acc = 0.0;
+      
+      // enable gridmix reduce output record for compression
+      Configuration conf = context.getConfiguration();
+      if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)
+          && FileOutputFormat.getCompressOutput(context)) {
+        val.setCompressibility(true);
+      }
     }
     @Override
     protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,

Added: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomTextDataGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomTextDataGenerator.java?rev=1079236&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomTextDataGenerator.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomTextDataGenerator.java Tue Mar  8 05:58:59 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.lang.RandomStringUtils;
+
+/**
+ * A random text generator. The words are random sequences of alphabetic characters.
+ */
+class RandomTextDataGenerator {
+  /**
+   * Size of the list of random words.
+   */
+  static final String GRIDMIX_DATAGEN_RANDOMTEXT_LISTSIZE = 
+    "gridmix.datagenerator.randomtext.listsize";
+  
+  /**
+   * Size of each random word.
+   */
+  static final String GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE = 
+    "gridmix.datagenerator.randomtext.wordsize";
+  
+  /**
+   * A list of random words.
+   */
+  private String[] words;
+  private Random random;
+  
+  /**
+   * Constructor for {@link RandomTextDataGenerator}.
+   * @param size the total number of words to consider.
+   * @param seed Random number generator seed for repeatability
+   * @param wordSize Size of each word
+   */
+  RandomTextDataGenerator(int size, Long seed, int wordSize) {
+    if (seed == null) {
+      random = new Random();
+    } else {
+      random = new Random(seed);
+    }
+    words = new String[size];
+    //TODO replace the default with actual stats
+    //TODO do we need variable-sized words?
+    for (int i = 0; i < size; ++i) {
+      words[i] = 
+        RandomStringUtils.random(wordSize, 0, 0, true, false, null, random);
+    }
+  }
+  
+  /**
+   * Returns a randomly selected word from a list of random words.
+   */
+  String getRandomWord() {
+    int index = random.nextInt(words.length);
+    return words[index];
+  }
+  
+  /**
+   * This is mainly for testing.
+   */
+  List<String> getRandomWords() {
+    return Arrays.asList(words);
+  }
+}
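
A quick sketch of the generator's contract, matching the tests added later in
this commit: a fixed (list-size, seed, word-size) triple reproduces the same
word list, so generated data is repeatable across runs:

    // 10 distinct 5-character words, seeded for repeatability.
    RandomTextDataGenerator rtdg = new RandomTextDataGenerator(10, 0L, 5);
    String word = rtdg.getRandomWord();         // one of the 10 words
    List<String> words = rtdg.getRandomWords();

    // Same parameters and seed => the same backing word list.
    RandomTextDataGenerator again = new RandomTextDataGenerator(10, 0L, 5);
    assert words.equals(again.getRandomWords());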

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java?rev=1079236&r1=1079235&r2=1079236&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java Tue Mar  8 05:58:59 2011
@@ -94,6 +94,11 @@ public class SleepJob extends GridmixJob
   }
 
   @Override
+  protected boolean canEmulateCompression() {
+    return false;
+  }
+  
+  @Override
   public Job call()
     throws IOException, InterruptedException, ClassNotFoundException {
     ugi.doAs(

Added: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java?rev=1079236&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java Tue Mar  8 05:58:59 2011
@@ -0,0 +1,392 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapred.gridmix.CompressionEmulationUtil.RandomTextDataMapper;
+import org.apache.hadoop.mapred.gridmix.GenerateData.GenSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * Test {@link CompressionEmulationUtil}
+ */
+public class TestCompressionEmulationUtils {
+  //TODO Remove this once LocalJobRunner can run Gridmix.
+  static class CustomInputFormat extends GenerateData.GenDataFormat {
+    @Override
+    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+      // get the total data to be generated
+      long toGen =
+        jobCtxt.getConfiguration().getLong(GenerateData.GRIDMIX_GEN_BYTES, -1);
+      if (toGen < 0) {
+        throw new IOException("Invalid/missing generation bytes: " + toGen);
+      }
+      // get the total number of mappers configured
+      int totalMappersConfigured =
+        jobCtxt.getConfiguration().getInt(MRJobConfig.NUM_MAPS, -1);
+      if (totalMappersConfigured < 0) {
+        throw new IOException("Invalid/missing num mappers: " 
+                              + totalMappersConfigured);
+      }
+      
+      final long bytesPerTracker = toGen / totalMappersConfigured;
+      final ArrayList<InputSplit> splits = 
+        new ArrayList<InputSplit>(totalMappersConfigured);
+      for (int i = 0; i < totalMappersConfigured; ++i) {
+        splits.add(new GenSplit(bytesPerTracker, 
+                   new String[] { "tracker_local" }));
+      }
+      return splits;
+    }
+  }
+  
+  /**
+   * Test {@link RandomTextDataMapper} via {@link CompressionEmulationUtil}.
+   */
+  @Test
+  public void testRandomCompressedTextDataGenerator() throws Exception {
+    int wordSize = 10;
+    int listSize = 20;
+    long dataSize = 10*1024*1024;
+    
+    Configuration conf = new Configuration();
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+    
+    // configure the RandomTextDataGenerator to generate desired sized data
+    conf.setInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_LISTSIZE, 
+                listSize);
+    conf.setInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE, 
+                wordSize);
+    conf.setLong(GenerateData.GRIDMIX_GEN_BYTES, dataSize);
+    
+    FileSystem lfs = FileSystem.getLocal(conf);
+    
+    // define the test's root temp directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    Path tempDir = new Path(rootTempDir, "TestRandomCompressedTextDataGenr");
+    lfs.delete(tempDir, true);
+    
+    JobClient client = new JobClient(conf);
+    
+    // run with a single mapper (the test uses the local job runner)
+    conf.setInt(MRJobConfig.NUM_MAPS, 1);
+    
+    Job job = new Job(conf);
+    
+    CompressionEmulationUtil.configure(job);
+    job.setInputFormatClass(CustomInputFormat.class);
+    
+    // set the output path
+    FileOutputFormat.setOutputPath(job, tempDir);
+    
+    // submit and wait for completion
+    job.submit();
+    int ret = job.waitForCompletion(true) ? 0 : 1;
+
+    assertEquals("Job Failed", 0, ret);
+    
+    // validate the output data
+    FileStatus[] files = 
+      lfs.listStatus(tempDir, new Utils.OutputFileUtils.OutputFilesFilter());
+    long size = 0;
+    long maxLineSize = 0;
+    
+    for (FileStatus status : files) {
+      InputStream in = 
+        CompressionEmulationUtil
+          .getPossiblyDecompressedInputStream(status.getPath(), conf, 0);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+      String line = reader.readLine();
+      if (line != null) {
+        long lineSize = line.getBytes().length;
+        if (lineSize > maxLineSize) {
+          maxLineSize = lineSize;
+        }
+        while (line != null) {
+          for (String word : line.split("\\s")) {
+            size += word.getBytes().length;
+          }
+          line = reader.readLine();
+        }
+      }
+      reader.close();
+    }
+
+    assertTrue(size >= dataSize);
+    assertTrue(size <= dataSize + maxLineSize);
+  }
+  
+  /**
+   * Test compressible {@link GridmixRecord}.
+   */
+  @Test
+  public void testCompressibleGridmixRecord() throws IOException {
+    JobConf conf = new JobConf();
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+    
+    FileSystem lfs = FileSystem.getLocal(conf);
+    int dataSize = 1024 * 1024 * 10; // 10 MB
+    
+    // define the test's root temp directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    Path tempDir = new Path(rootTempDir, 
+                            "TestPossiblyCompressibleGridmixRecord");
+    lfs.delete(tempDir, true);
+    
+    // define a compressible GridmixRecord
+    GridmixRecord record = new GridmixRecord(dataSize, 0);
+    record.setCompressibility(true); // enable compression
+    
+    conf.setClass(FileOutputFormat.COMPRESS_CODEC, GzipCodec.class, 
+                  CompressionCodec.class);
+    org.apache.hadoop.mapred.FileOutputFormat.setCompressOutput(conf, true);
+    
+    // write the record to a file
+    Path recordFile = new Path(tempDir, "record");
+    OutputStream outStream = CompressionEmulationUtil
+                               .getPossiblyCompressedOutputStream(recordFile, 
+                                                                  conf);    
+    DataOutputStream out = new DataOutputStream(outStream);
+    record.write(out);
+    out.close();
+    outStream.close();
+    
+    // open the compressed stream for reading
+    Path actualRecordFile = recordFile.suffix(".gz");
+    InputStream in = 
+      CompressionEmulationUtil
+        .getPossiblyDecompressedInputStream(actualRecordFile, conf, 0);
+    
+    // get the compressed file size
+    long compressedFileSize = lfs.listStatus(actualRecordFile)[0].getLen();
+    
+    GridmixRecord recordRead = new GridmixRecord();
+    recordRead.readFields(new DataInputStream(in));
+    
+    assertEquals("Record size mismatch in a compressible GridmixRecord",
+                 dataSize, recordRead.getSize());
+    assertTrue("Failed to generate a compressible GridmixRecord",
+               recordRead.getSize() > compressedFileSize);
+  }
+  
+  /**
+   * Test 
+   * {@link CompressionEmulationUtil#isCompressionEmulationEnabled(
+   *          org.apache.hadoop.conf.Configuration)}.
+   */
+  @Test
+  public void testIsCompressionEmulationEnabled() {
+    Configuration conf = new Configuration();
+    // Check default values
+    assertTrue(CompressionEmulationUtil.isCompressionEmulationEnabled(conf));
+    
+    // Check disabled
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false);
+    assertFalse(CompressionEmulationUtil.isCompressionEmulationEnabled(conf));
+    
+    // Check enabled
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    assertTrue(CompressionEmulationUtil.isCompressionEmulationEnabled(conf));
+  }
+  
+  /**
+   * Test 
+   * {@link CompressionEmulationUtil#getPossiblyDecompressedInputStream(Path, 
+   *                                   Configuration, long)}
+   *  and
+   *  {@link CompressionEmulationUtil#getPossiblyCompressedOutputStream(Path, 
+   *                                    Configuration)}.
+   */
+  @Test
+  public void testPossiblyCompressedDecompressedStreams() throws IOException {
+    JobConf conf = new JobConf();
+    FileSystem lfs = FileSystem.getLocal(conf);
+    String inputLine = "Hi Hello!";
+
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+    conf.setBoolean(FileOutputFormat.COMPRESS, true);
+    conf.setClass(FileOutputFormat.COMPRESS_CODEC, GzipCodec.class, 
+                  CompressionCodec.class);
+
+    // define the test's root temp directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    Path tempDir =
+      new Path(rootTempDir, "TestPossiblyCompressedDecompressedStreams");
+    lfs.delete(tempDir, true);
+
+    // create a compressed file
+    Path compressedFile = new Path(tempDir, "test");
+    OutputStream out = 
+      CompressionEmulationUtil.getPossiblyCompressedOutputStream(compressedFile, 
+                                                                 conf);
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+    writer.write(inputLine);
+    writer.close();
+    
+    // now read back the data from the compressed stream
+    compressedFile = compressedFile.suffix(".gz");
+    InputStream in = 
+      CompressionEmulationUtil
+        .getPossiblyDecompressedInputStream(compressedFile, conf, 0);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+    String readLine = reader.readLine();
+    assertEquals("Compression/Decompression error", inputLine, readLine);
+    reader.close();
+  }
+  
+  /**
+   * Test if 
+   * {@link CompressionEmulationUtil#configureCompressionEmulation(
+   *        org.apache.hadoop.mapred.JobConf, org.apache.hadoop.mapred.JobConf)}
+   *  can extract compression related configuration parameters.
+   */
+  @Test
+  public void testExtractCompressionConfigs() {
+    JobConf source = new JobConf();
+    JobConf target = new JobConf();
+    
+    // set the default values
+    source.setBoolean(FileOutputFormat.COMPRESS, false);
+    source.set(FileOutputFormat.COMPRESS_CODEC, "MyDefaultCodec");
+    source.set(FileOutputFormat.COMPRESS_TYPE, "MyDefaultType");
+    source.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false); 
+    source.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, "MyDefaultCodec2");
+    
+    CompressionEmulationUtil.configureCompressionEmulation(source, target);
+    
+    // check default values
+    assertFalse(target.getBoolean(FileOutputFormat.COMPRESS, true));
+    assertEquals("MyDefaultCodec", target.get(FileOutputFormat.COMPRESS_CODEC));
+    assertEquals("MyDefaultType", target.get(FileOutputFormat.COMPRESS_TYPE));
+    assertFalse(target.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true));
+    assertEquals("MyDefaultCodec2", 
+                 target.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC));
+    assertFalse(CompressionEmulationUtil
+                .isInputCompressionEmulationEnabled(target));
+    
+    // set new values
+    source.setBoolean(FileOutputFormat.COMPRESS, true);
+    source.set(FileOutputFormat.COMPRESS_CODEC, "MyCodec");
+    source.set(FileOutputFormat.COMPRESS_TYPE, "MyType");
+    source.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); 
+    source.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, "MyCodec2");
+    org.apache.hadoop.mapred.FileInputFormat.setInputPaths(source, "file.gz");
+    
+    target = new JobConf(); // reset
+    CompressionEmulationUtil.configureCompressionEmulation(source, target);
+    
+    // check new values
+    assertTrue(target.getBoolean(FileOutputFormat.COMPRESS, false));
+    assertEquals("MyCodec", target.get(FileOutputFormat.COMPRESS_CODEC));
+    assertEquals("MyType", target.get(FileOutputFormat.COMPRESS_TYPE));
+    assertTrue(target.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false));
+    assertEquals("MyCodec2", 
+                 target.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC));
+    assertTrue(CompressionEmulationUtil
+               .isInputCompressionEmulationEnabled(target));
+  }
+  
+  /**
+   * Test if {@link FileQueue} can identify a compressed file and provide
+   * readers that extract uncompressed data only when input-compression 
+   * emulation is enabled.
+   */
+  @Test
+  public void testFileQueueDecompression() throws IOException {
+    JobConf conf = new JobConf();
+    FileSystem lfs = FileSystem.getLocal(conf);
+    String inputLine = "Hi Hello!";
+    
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+    org.apache.hadoop.mapred.FileOutputFormat.setCompressOutput(conf, true);
+    org.apache.hadoop.mapred.FileOutputFormat.setOutputCompressorClass(conf, 
+                                                GzipCodec.class);
+
+    // define the test's root temp directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    Path tempDir = new Path(rootTempDir, "TestFileQueueDecompression");
+    lfs.delete(tempDir, true);
+
+    // create a compressed file
+    Path compressedFile = new Path(tempDir, "test");
+    OutputStream out = 
+      CompressionEmulationUtil.getPossiblyCompressedOutputStream(compressedFile, 
+                                                                 conf);
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+    writer.write(inputLine);
+    writer.close();
+    
+    compressedFile = compressedFile.suffix(".gz");
+    // now read back the data from the compressed stream using FileQueue
+    long fileSize = lfs.listStatus(compressedFile)[0].getLen();
+    CombineFileSplit split = 
+      new CombineFileSplit(new Path[] {compressedFile}, new long[] {fileSize});
+    FileQueue queue = new FileQueue(split, conf);
+    byte[] bytes = new byte[inputLine.getBytes().length];
+    queue.read(bytes);
+    queue.close();
+    String readLine = new String(bytes);
+    assertEquals("Compression/Decompression error", inputLine, readLine);
+  }
+}

Added: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomTextDataGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomTextDataGenerator.java?rev=1079236&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomTextDataGenerator.java (added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomTextDataGenerator.java Tue Mar  8 05:58:59 2011
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.mapred.gridmix.RandomTextDataGenerator;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * Test {@link RandomTextDataGenerator}.
+ */
+public class TestRandomTextDataGenerator {
+  /**
+   * Test if {@link RandomTextDataGenerator} can generate random words of 
+   * desired size.
+   */
+  @Test
+  public void testRandomTextDataGenerator() {
+    RandomTextDataGenerator rtdg = new RandomTextDataGenerator(10, 0L, 5);
+    List<String> words = rtdg.getRandomWords();
+
+    // check the size
+    assertEquals("List size mismatch", 10, words.size());
+
+    // check the words
+    Set<String> wordsSet = new HashSet<String>(words);
+    assertEquals("List size mismatch due to duplicates", 10, wordsSet.size());
+
+    // check the word lengths
+    for (String word : wordsSet) {
+      assertEquals("Word size mismatch", 5, word.length());
+    }
+  }
+  
+  /**
+   * Test if {@link RandomTextDataGenerator} can generate the same words 
+   * given the same list-size, word-size and seed.
+   */
+  @Test
+  public void testRandomTextDataGeneratorRepeatability() {
+    RandomTextDataGenerator rtdg1 = new RandomTextDataGenerator(10, 0L, 5);
+    List<String> words1 = rtdg1.getRandomWords();
+
+    RandomTextDataGenerator rtdg2 = new RandomTextDataGenerator(10, 0L, 5);
+    List<String> words2 = rtdg2.getRandomWords();
+    
+    assertTrue("List mismatch", words1.equals(words2));
+  }
+  
+  /**
+   * Test if {@link RandomTextDataGenerator} can generate different words given 
+   * different seeds.
+   */
+  @Test
+  public void testRandomTextDataGeneratorUniqueness() {
+    RandomTextDataGenerator rtdg1 = new RandomTextDataGenerator(10, 1L, 5);
+    Set<String> words1 = new HashSet<String>(rtdg1.getRandomWords());
+
+    RandomTextDataGenerator rtdg2 = new RandomTextDataGenerator(10, 0L, 5);
+    Set<String> words2 = new HashSet<String>(rtdg2.getRandomWords());
+    
+    assertFalse("Word lists match across different seeds", 
+                words1.equals(words2));
+  }
+}

Modified: hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/gridmix.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/gridmix.xml?rev=1079236&r1=1079235&r2=1079236&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/gridmix.xml (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/gridmix.xml Tue Mar  8 05:58:59 2011
@@ -518,7 +518,7 @@ hadoop jar &lt;gridmix-jar&gt; org.apach
     </section>
 
   <section id="distributedcacheload">
-  <title>Emulation of Distributed Cache Load</title>
+  <title>Emulating Distributed Cache Load</title>
     <p>Gridmix emulates Distributed Cache load by default for LOADJOB type of
     jobs. This is done by precreating the needed Distributed Cache files for all
     the simulated jobs as part of a separate MapReduce job.</p>
@@ -539,6 +539,75 @@ hadoop jar &lt;gridmix-jar&gt; org.apach
     </ul>
   </section>
 
+  <section id="compression-emulation">
+      <title>Emulating Compression/Decompression</title>
+      <p>MapReduce supports data compression and decompression. 
+         Input to a MapReduce job can be compressed. Similarly, output of Map
+         and Reduce tasks can also be compressed. Compression/Decompression 
+         emulation in GridMix is important because emulating 
+         compression/decompression affects the CPU and memory usage of the 
+         task. A task emulating compression/decompression will also affect 
+         other tasks and daemons running on the same node.
+       </p>
+       <p>Compression emulation is enabled if 
+         <code>gridmix.compression-emulation.enable</code> is set to
+         <code>true</code>. By default compression emulation is enabled for 
+         jobs of type <em>LOADJOB</em>. With compression emulation enabled, 
+         GridMix will now generate compressed text data with a constant 
+         compression ratio. Hence a simulated GridMix job will now emulate 
+         compression/decompression using compressible text data (having a 
+         constant compression ratio), irrespective of the compression ratio 
+         observed in the actual job.
+      </p>
+      <p>A typical MapReduce job deals with data compression/decompression in 
+         the following phases:</p>
+      <ul>
+        <li><code>Job input data decompression: </code> GridMix generates 
+            compressible input data when compression emulation is enabled. 
+            Based on the original job's configuration, a simulated GridMix job 
+            will use a decompressor to read the compressed input data. 
+            Currently, GridMix uses
+            <code>mapreduce.input.fileinputformat.inputdir</code> to determine 
+            if the original job used compressed input data or 
+            not. If the original job's input files are compressed, the 
+            simulated job will read its compressed input files using a 
+            decompressor. 
+        </li>
+        <li><code>Intermediate data compression and decompression: </code>
+            If the original job has map output compression enabled then GridMix 
+            too will enable map output compression for the simulated job. 
+            Accordingly, the reducers will use a decompressor to read the map 
+            output data.
+        </li>
+        <li><code>Job output data compression: </code>
+            If the original job's output is compressed then GridMix 
+            too will enable job output compression for the simulated job. 
+        </li>
+      </ul>
+       
+      <p>The following configuration parameters affect compression emulation:
+      </p>
+      <table>
+        <tr>
+          <th>Parameter</th>
+          <th>Description</th>
+        </tr>
+        <tr>
+          <td>gridmix.compression-emulation.enable</td>
+          <td>Enables compression emulation in simulated GridMix jobs. 
+              Default is true.</td>
+        </tr>
+      </table>
+      
+      <p>With compression emulation turned on, GridMix will generate compressed
+         input data. Hence the total size of the input 
+         data will be less than the expected size. Set 
+         <code>gridmix.min.file.size</code> to a smaller value (roughly 10% of
+         <code>gridmix.gen.bytes.per.file</code>) so that GridMix can 
+         correctly emulate compression.
+      </p>
+    </section>
+
     <section id="assumptions">
       <title>Simplifying Assumptions</title>
       <p>GridMix will be developed in stages, incorporating feedback and
@@ -556,9 +625,8 @@ hadoop jar &lt;gridmix-jar&gt; org.apach
 	sizes, namespace hierarchies, or any property of input, intermediate
 	or output data other than the bytes/records consumed and emitted from
 	a given task. This implies that some of the most heavily-used parts of
-	the system - the compression libraries, text processing, streaming,
-	etc. - cannot be meaningfully tested with the current
-	implementation.</li>
+	the system - text processing, streaming, etc. - cannot be meaningfully tested 
+	with the current implementation.</li>
 	<li><em>I/O Rates</em> - The rate at which records are
 	consumed/emitted is assumed to be limited only by the speed of the
 	reader/writer and constant throughout the task.</li>
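
To make the documented knobs concrete, a sketch of wiring them up before a
GridMix run; the key names come from the documentation diff above, while the
sizes are illustrative:

    Configuration conf = new Configuration();
    // On by default; set to false to opt out of compression emulation.
    conf.setBoolean("gridmix.compression-emulation.enable", true);
    // Compressed text shrinks the generated input, so keep the minimum
    // file size around 10% of the per-file generation size (per the docs).
    conf.setLong("gridmix.gen.bytes.per.file", 256 * 1024 * 1024L);
    conf.setLong("gridmix.min.file.size", 25 * 1024 * 1024L);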


