From: ste...@apache.org
Subject: svn commit: r903227 [4/16] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ conf/ ivy/ src/benchmarks/gridmix/ src/benchmarks/gridmix/javasort/ src/benchmarks/gridmix/maxent/ src/benchmarks/gridmix/monsterQuery/ src/benchmarks/grid...
Date: Tue, 26 Jan 2010 14:03:09 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java Tue Jan 26 14:02:53 2010
@@ -223,7 +223,6 @@
         return getSize();
       }
 
-      // TODO sort, pick rand pairs of kth large/small in dir
       IndexMapper mapping;
       if ((curdir.size() < 200) || ((double) targetSize / getSize() > 0.5)) {
         mapping = new DenseIndexMapper(curdir.size());
@@ -234,13 +233,13 @@
       ArrayList<Integer> selected = new ArrayList<Integer>();
       long ret = 0L;
       int poolSize = curdir.size();
-      while (ret < targetSize) {
+      do {
         int pos = rand.nextInt(poolSize);
         int index = mapping.get(pos);
         selected.add(index);
         ret += curdir.get(index).getLen();
         mapping.swap(pos, --poolSize);
-      }
+      } while (ret < targetSize);
 
       for (Integer i : selected) {
         files.add(curdir.get(i));

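The switch from while to do-while guarantees that at least one file is
selected even when targetSize is zero or negative, and the swap-to-end step
is sampling without replacement over the index space. A self-contained
sketch of the same loop, assuming a plain long[] of file lengths in place
of the FileStatus pool and inlining the IndexMapper as an int[] (both
simplifications, not the FilePool internals):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    public class SelectionSketch {
      // Draw random files without replacement until their cumulative
      // length reaches targetSize; always draws at least one.
      public static List<Integer> select(long[] lengths, long targetSize,
          Random rand) {
        final int[] index = new int[lengths.length]; // like DenseIndexMapper
        for (int i = 0; i < index.length; ++i) {
          index[i] = i;
        }
        final List<Integer> selected = new ArrayList<Integer>();
        long ret = 0L;
        int poolSize = lengths.length;
        do {
          final int pos = rand.nextInt(poolSize);
          selected.add(index[pos]);
          ret += lengths[index[pos]];
          final int tmp = index[pos];      // swap chosen slot to the dead tail
          index[pos] = index[--poolSize];
          index[poolSize] = tmp;
        } while (ret < targetSize && poolSize > 0); // extra guard: pool may run dry
        return selected;
      }
    }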
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Tue Jan 26 14:02:53 2010
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.List;
@@ -28,7 +29,6 @@
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -52,10 +52,30 @@
 // TODO can replace with form of GridmixJob
 class GenerateData extends GridmixJob {
 
+  /**
+   * Total bytes to write.
+   */
+  public static final String GRIDMIX_GEN_BYTES = "gridmix.gen.bytes";
+
+  /**
+   * Maximum size per file written.
+   */
+  public static final String GRIDMIX_GEN_CHUNK = "gridmix.gen.bytes.per.file";
+
+  /**
+   * Size of writes to output file.
+   */
+  public static final String GRIDMIX_VAL_BYTES = "gendata.val.bytes";
+
+  /**
+   * Status reporting interval, in megabytes.
+   */
+  public static final String GRIDMIX_GEN_INTERVAL = "gendata.interval.mb";
+
   public GenerateData(Configuration conf, Path outdir, long genbytes)
       throws IOException {
     super(conf, 0L, "GRIDMIX_GENDATA");
-    job.getConfiguration().setLong("gridmix.gendata.bytes", genbytes);
+    job.getConfiguration().setLong(GRIDMIX_GEN_BYTES, genbytes);
     FileOutputFormat.setOutputPath(job, outdir);
   }
 
@@ -84,7 +104,7 @@
     protected void setup(Context context)
         throws IOException, InterruptedException {
       val = new BytesWritable(new byte[
-          context.getConfiguration().getInt("gendata.val.bytes", 1024 * 1024)]);
+          context.getConfiguration().getInt(GRIDMIX_VAL_BYTES, 1024 * 1024)]);
     }
 
     @Override
@@ -106,7 +126,7 @@
       final JobClient client = new JobClient(jobCtxt.getConfiguration());
       ClusterStatus stat = client.getClusterStatus(true);
       final long toGen =
-        jobCtxt.getConfiguration().getLong("gridmix.gendata.bytes", -1);
+        jobCtxt.getConfiguration().getLong(GRIDMIX_GEN_BYTES, -1);
       if (toGen < 0) {
         throw new IOException("Invalid/missing generation bytes: " + toGen);
       }
@@ -144,7 +164,7 @@
             throws IOException, InterruptedException {
           toWrite = split.getLength();
           RINTERVAL = ctxt.getConfiguration().getInt(
-              "gendata.report.interval.mb", 10) << 20;
+              GRIDMIX_GEN_INTERVAL, 10) << 20;
         }
         @Override
         public boolean nextKeyValue() throws IOException {
@@ -219,20 +239,52 @@
     public RecordWriter<NullWritable,BytesWritable> getRecordWriter(
         TaskAttemptContext job) throws IOException {
 
-      Path file = getDefaultWorkFile(job, "");
-      FileSystem fs = file.getFileSystem(job.getConfiguration());
-      final FSDataOutputStream fileOut = fs.create(file, false);
-      return new RecordWriter<NullWritable,BytesWritable>() {
-        @Override
-        public void write(NullWritable key, BytesWritable value)
-            throws IOException {
-          fileOut.write(value.getBytes(), 0, value.getLength());
-        }
-        @Override
-        public void close(TaskAttemptContext ctxt) throws IOException {
+      return new ChunkWriter(getDefaultWorkFile(job, ""),
+          job.getConfiguration());
+    }
+
+    static class ChunkWriter extends RecordWriter<NullWritable,BytesWritable> {
+      private final Path outDir;
+      private final FileSystem fs;
+      private final long maxFileBytes;
+
+      private long accFileBytes = 0L;
+      private long fileIdx = -1L;
+      private OutputStream fileOut = null;
+
+      public ChunkWriter(Path outDir, Configuration conf) throws IOException {
+        this.outDir = outDir;
+        fs = outDir.getFileSystem(conf);
+        maxFileBytes = conf.getLong(GRIDMIX_GEN_CHUNK, 1L << 30);
+        nextDestination();
+      }
+      private void nextDestination() throws IOException {
+        if (fileOut != null) {
           fileOut.close();
         }
-      };
+        fileOut = fs.create(new Path(outDir, "segment-" + (++fileIdx)), false);
+        accFileBytes = 0L;
+      }
+      @Override
+      public void write(NullWritable key, BytesWritable value)
+          throws IOException {
+        int written = 0;
+        final int total = value.getLength();
+        while (written < total) {
+          final int write = (int)
+            Math.min(total - written, maxFileBytes - accFileBytes);
+          fileOut.write(value.getBytes(), written, write);
+          written += write;
+          accFileBytes += write;
+          if (accFileBytes >= maxFileBytes) {
+            nextDestination();
+          }
+        }
+      }
+      @Override
+      public void close(TaskAttemptContext ctxt) throws IOException {
+        fileOut.close();
+      }
     }
   }
 

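The new ChunkWriter caps each output file at gridmix.gen.bytes.per.file
(1 GiB by default) and rolls over to a fresh "segment-N" file on overflow,
splitting a single record across segment boundaries when necessary. A
sketch of the same rollover arithmetic against plain java.io streams, with
local files standing in for the HDFS Path/FileSystem pair:

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    public class ChunkSketch {
      private final File outDir;            // assumed to exist
      private final long maxFileBytes;
      private long accFileBytes = 0L;
      private long fileIdx = -1L;
      private OutputStream fileOut = null;

      public ChunkSketch(File outDir, long maxFileBytes) throws IOException {
        this.outDir = outDir;
        this.maxFileBytes = maxFileBytes;
        nextDestination();
      }

      // Close the current segment, if any, and open the next one.
      private void nextDestination() throws IOException {
        if (fileOut != null) {
          fileOut.close();
        }
        fileOut = new FileOutputStream(new File(outDir, "segment-" + (++fileIdx)));
        accFileBytes = 0L;
      }

      // Write as much of the record as fits in the current segment,
      // roll over, and continue until the record is consumed.
      public void write(byte[] value, int total) throws IOException {
        int written = 0;
        while (written < total) {
          final int write = (int) Math.min(total - written,
              maxFileBytes - accFileBytes);
          fileOut.write(value, written, write);
          written += write;
          accFileBytes += write;
          if (accFileBytes >= maxFileBytes) {
            nextDestination();
          }
        }
      }

      public void close() throws IOException {
        fileOut.close();
      }
    }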
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Tue Jan 26 14:02:53 2010
@@ -26,7 +26,6 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.Job;
@@ -73,12 +72,11 @@
     "gridmix.client.pending.queue.depth";
 
   /**
-   * Size of key data in synthetic jobs. At present, key length is not
-   * available in job traces. Since all solutions are equally bad, globally
-   * specifying the amount of each record that is key data is the simplest
-   * to implement and the method chosen.
+   * Multiplier to accelerate or decelerate job submission. As a crude means of
+   * sizing a job trace to a cluster, the time separating two jobs is
+   * multiplied by this factor.
    */
-  public static final String GRIDMIX_KEY_LEN = "gridmix.min.key.length";
+  public static final String GRIDMIX_SUB_MUL = "gridmix.submit.multiplier";
 
   // Submit data structures
   private JobFactory factory;
@@ -135,7 +133,7 @@
     submitter = createJobSubmitter(monitor,
         conf.getInt(GRIDMIX_SUB_THR,
           Runtime.getRuntime().availableProcessors() + 1),
-        conf.getInt(GRIDMIX_QUE_DEP, 100),
+        conf.getInt(GRIDMIX_QUE_DEP, 5),
         new FilePool(conf, ioPath));
     factory = createJobFactory(submitter, traceIn, scratchDir, conf, startFlag);
     monitor.start();
@@ -182,12 +180,10 @@
       printUsage(System.err);
       return 1;
     }
-    FileSystem fs = null;
     InputStream trace = null;
     try {
       final Configuration conf = getConf();
       Path scratchDir = new Path(ioPath, conf.get(GRIDMIX_OUT_DIR, "gridmix"));
-      fs = scratchDir.getFileSystem(conf);
       // add shutdown hook for SIGINT, etc.
       Runtime.getRuntime().addShutdownHook(sdh);
       CountDownLatch startFlag = new CountDownLatch(1);
@@ -210,7 +206,7 @@
 
       if (factory != null) {
         // wait for input exhaustion
-        factory.join();
+        factory.join(Long.MAX_VALUE);
         final Throwable badTraceException = factory.error();
         if (null != badTraceException) {
           LOG.error("Error in trace", badTraceException);
@@ -218,10 +214,10 @@
         }
         // wait for pending tasks to be submitted
         submitter.shutdown();
-        submitter.join();
+        submitter.join(Long.MAX_VALUE);
         // wait for running tasks to complete
         monitor.shutdown();
-        monitor.join();
+        monitor.join(Long.MAX_VALUE);
       }
     } finally {
       IOUtils.cleanup(LOG, trace);
@@ -236,13 +232,17 @@
    */
   class Shutdown extends Thread {
 
-    private void killComponent(Component<?> component) {
+    static final long FAC_SLEEP = 1000;
+    static final long SUB_SLEEP = 4000;
+    static final long MON_SLEEP = 15000;
+
+    private void killComponent(Component<?> component, long maxwait) {
       if (component == null) {
         return;
       }
-      component.abort();   // read no more tasks
+      component.abort();
       try {
-        component.join();
+        component.join(maxwait);
       } catch (InterruptedException e) {
         LOG.warn("Interrupted waiting for " + component);
       }
@@ -253,9 +253,9 @@
     public void run() {
       LOG.info("Exiting...");
       try {
-        killComponent(factory);   // read no more tasks
-        killComponent(submitter); // submit no more tasks
-        killComponent(monitor);   // process remaining jobs in this thread
+        killComponent(factory, FAC_SLEEP);   // read no more tasks
+        killComponent(submitter, SUB_SLEEP); // submit no more tasks
+        killComponent(monitor, MON_SLEEP);   // process remaining jobs here
       } finally {
         if (monitor == null) {
           return;
@@ -306,7 +306,8 @@
     out.printf("       %-40s : Output directory\n", GRIDMIX_OUT_DIR);
     out.printf("       %-40s : Submitting threads\n", GRIDMIX_SUB_THR);
     out.printf("       %-40s : Queued job desc\n", GRIDMIX_QUE_DEP);
-    out.printf("       %-40s : Key size\n", GRIDMIX_KEY_LEN);
+    out.printf("       %-40s : Key fraction of rec\n",
+        AvgRecordFactory.GRIDMIX_KEY_FRC);
   }
 
   /**
@@ -331,7 +332,7 @@
      * Wait until the service completes. It is assumed that either a
      * {@link #shutdown} or {@link #abort} has been requested.
      */
-    void join() throws InterruptedException;
+    void join(long millis) throws InterruptedException;
 
     /**
      * Shut down gracefully, finishing all pending work. Reject new requests.

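Component.join now takes a timeout, and the Shutdown hook gives the
factory, submitter, and monitor increasing budgets (1 s, 4 s, 15 s via
FAC_SLEEP/SUB_SLEEP/MON_SLEEP) so a wedged component can no longer hang JVM
exit. A stripped-down sketch of the pattern; the Component here mirrors
only the two methods used, and System.err in place of commons-logging is an
assumption:

    public class BoundedShutdownSketch {
      interface Component {
        void abort();                       // stop accepting new work
        void join(long millis) throws InterruptedException;
      }

      static void killComponent(Component component, long maxwait) {
        if (component == null) {
          return;
        }
        component.abort();
        try {
          component.join(maxwait);          // bounded wait, never indefinite
        } catch (InterruptedException e) {
          System.err.println("Interrupted waiting for " + component);
        }
      }

      public static void main(String[] args) {
        final Component noop = new Component() {
          public void abort() { }
          public void join(long millis) { }
        };
        killComponent(noop, 1000L);         // FAC_SLEEP-sized budget
      }
    }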
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Tue Jan 26 14:02:53 2010
@@ -17,22 +17,10 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
-import java.nio.LongBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.EnumSet;
 import java.util.Formatter;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -40,15 +28,10 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
@@ -65,7 +48,6 @@
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.tools.rumen.JobStory;
@@ -80,6 +62,7 @@
 class GridmixJob implements Callable<Job>, Delayed {
 
   public static final String JOBNAME = "GRIDMIX";
+  public static final String ORIGNAME = "gridmix.job.name.original";
   public static final Log LOG = LogFactory.getLog(GridmixJob.class);
 
   private static final ThreadLocal<Formatter> nameFormat =
@@ -180,14 +163,17 @@
     job.setReducerClass(GridmixReducer.class);
     job.setNumReduceTasks(jobdesc.getNumberReduces());
     job.setMapOutputKeyClass(GridmixKey.class);
-    job.setMapOutputValueClass(BytesWritable.class);
-    job.setSortComparatorClass(BytesWritable.Comparator.class);
+    job.setMapOutputValueClass(GridmixRecord.class);
+    job.setSortComparatorClass(GridmixKey.Comparator.class);
     job.setGroupingComparatorClass(SpecGroupingComparator.class);
     job.setInputFormatClass(GridmixInputFormat.class);
     job.setOutputFormatClass(RawBytesOutputFormat.class);
     job.setPartitionerClass(DraftPartitioner.class);
     job.setJarByClass(GridmixJob.class);
     job.getConfiguration().setInt("gridmix.job.seq", seq);
+    job.getConfiguration().set(ORIGNAME, null == jobdesc.getJobID()
+        ? "<unknown>" : jobdesc.getJobID().toString());
+    job.getConfiguration().setBoolean(Job.USED_GENERIC_PARSER, true);
     FileInputFormat.addInputPath(job, new Path("ignored"));
     FileOutputFormat.setOutputPath(job, outdir);
     job.submit();
@@ -200,11 +186,10 @@
     }
   }
 
-  /**
-   * Group REDUCE_SPEC records together
-   */
   public static class SpecGroupingComparator
-      implements RawComparator<GridmixKey>, Serializable {
+      implements RawComparator<GridmixKey> {
+    private final DataInputBuffer di = new DataInputBuffer();
+    private final byte[] reset = di.getData();
     @Override
     public int compare(GridmixKey g1, GridmixKey g2) {
       final byte t1 = g1.getType();
@@ -215,284 +200,128 @@
       }
       assert t1 == GridmixKey.DATA;
       assert t2 == GridmixKey.DATA;
-      return WritableComparator.compareBytes(
-          g1.getBytes(), 0, g1.getLength(),
-          g2.getBytes(), 0, g2.getLength());
+      return g1.compareTo(g2);
     }
     @Override
-    public int compare(byte[] b1, int s1, int l1,
-                       byte[] b2, int s2, int l2) {
-      final byte t1 = b1[s1 + 4];
-      final byte t2 = b2[s2 + 4];
-      if (t1 == GridmixKey.REDUCE_SPEC ||
-          t2 == GridmixKey.REDUCE_SPEC) {
-        return t1 - t2;
-      }
-      assert t1 == GridmixKey.DATA;
-      assert t2 == GridmixKey.DATA;
-      return WritableComparator.compareBytes(
-          b1, s1 + 4, l1 - 4,
-          b2, s2 + 4, l2 - 4);
-    }
-  }
-
-  /**
-   * Keytype for synthetic jobs, some embedding instructions for the reduce.
-   */
-  public static class GridmixKey extends BytesWritable {
-    // long fields specifying reduce contract
-    private enum RSpec { REC_IN, REC_OUT, BYTES_OUT };
-    private static final int SPEC_START = 5; // type + partition len
-    private static final int NUMFIELDS = RSpec.values().length;
-    private static final int SPEC_SIZE = NUMFIELDS * 8;
-
-    // Key types
-    static final byte REDUCE_SPEC = 0;
-    static final byte DATA = 1;
-
-    private IntBuffer partition;
-    private LongBuffer spec;
-
-    public GridmixKey() {
-      super(new byte[SPEC_START]);
-    }
-
-    public GridmixKey(byte type, byte[] b) {
-      super(b);
-      setType(type);
-    }
-
-    public byte getType() {
-      return getBytes()[0];
-    }
-    public void setPartition(int partition) {
-      this.partition.put(0, partition);
-    }
-    public int getPartition() {
-      return partition.get(0);
-    }
-    public long getReduceInputRecords() {
-      checkState(REDUCE_SPEC);
-      return spec.get(RSpec.REC_IN.ordinal());
-    }
-    public long getReduceOutputBytes() {
-      checkState(REDUCE_SPEC);
-      return spec.get(RSpec.BYTES_OUT.ordinal());
-    }
-    public long getReduceOutputRecords() {
-      checkState(REDUCE_SPEC);
-      return spec.get(RSpec.REC_OUT.ordinal());
-    }
-    public void setType(byte b) {
-      switch (b) {
-        case REDUCE_SPEC:
-          if (getCapacity() < SPEC_START + SPEC_SIZE) {
-            setSize(SPEC_START + SPEC_SIZE);
-          }
-          spec =
-            ByteBuffer.wrap(getBytes(), SPEC_START, SPEC_SIZE).asLongBuffer();
-          break;
-        case DATA:
-          if (getCapacity() < SPEC_START) {
-            setSize(SPEC_START);
-          }
-          spec = null;
-          break;
-        default:
-          throw new IllegalArgumentException("Illegal type " + b);
-      }
-      getBytes()[0] = b;
-      partition =
-        ByteBuffer.wrap(getBytes(), 1, SPEC_START - 1).asIntBuffer();
-    }
-    public void setReduceInputRecords(long records) {
-      checkState(REDUCE_SPEC);
-      spec.put(RSpec.REC_IN.ordinal(), records);
-    }
-    public void setReduceOutputBytes(long bytes) {
-      checkState(REDUCE_SPEC);
-      spec.put(RSpec.BYTES_OUT.ordinal(), bytes);
-    }
-    public void setReduceOutputRecords(long records) {
-      checkState(REDUCE_SPEC);
-      spec.put(RSpec.REC_OUT.ordinal(), records);
-    }
-    private void checkState(byte b) {
-      if (getLength() < SPEC_START || getType() != b) {
-        throw new IllegalStateException("Expected " + b + ", was " + getType());
-      }
-    }
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      super.readFields(in);
-      if (getLength() < SPEC_START) {
-        throw new IOException("Invalid GridmixKey, len " + getLength());
-      }
-      partition =
-        ByteBuffer.wrap(getBytes(), 1, SPEC_START - 1).asIntBuffer();
-      spec = getType() == REDUCE_SPEC
-        ? ByteBuffer.wrap(getBytes(), SPEC_START, SPEC_SIZE).asLongBuffer()
-        : null;
-    }
-    @Override
-    public void write(DataOutput out) throws IOException {
-      super.write(out);
-      if (getType() == REDUCE_SPEC) {
-        LOG.debug("SPEC(" + getPartition() + ") " + getReduceInputRecords() +
-            " -> " + getReduceOutputRecords() + "/" + getReduceOutputBytes());
-      }
-    }
-    @Override
-    public boolean equals(Object other) {
-      if (other instanceof GridmixKey) {
-        return super.equals(other);
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      try {
+        final int ret;
+        di.reset(b1, s1, l1);
+        final int x1 = WritableUtils.readVInt(di);
+        di.reset(b2, s2, l2);
+        final int x2 = WritableUtils.readVInt(di);
+        final int t1 = b1[s1 + x1];
+        final int t2 = b2[s2 + x2];
+        if (t1 == GridmixKey.REDUCE_SPEC ||
+            t2 == GridmixKey.REDUCE_SPEC) {
+          ret = t1 - t2;
+        } else {
+          assert t1 == GridmixKey.DATA;
+          assert t2 == GridmixKey.DATA;
+          ret =
+            WritableComparator.compareBytes(b1, s1, x1, b2, s2, x2);
+        }
+        di.reset(reset, 0, 0);
+        return ret;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return super.hashCode();
     }
   }
 
   public static class GridmixMapper
-      extends Mapper<IntWritable,BytesWritable,GridmixKey,BytesWritable> {
-
-    private final Random r = new Random();
-    private GridmixKey key;
-    private final BytesWritable val = new BytesWritable();
+      extends Mapper<NullWritable,GridmixRecord,GridmixKey,GridmixRecord> {
 
-    private int keyLen;
     private double acc;
     private double ratio;
-    private int[] reduceRecordSize;
-    private long[] reduceRecordCount;
-    private long[] reduceRecordRemaining;
+    private final ArrayList<RecordFactory> reduces =
+      new ArrayList<RecordFactory>();
+    private final Random r = new Random();
+
+    private final GridmixKey key = new GridmixKey();
+    private final GridmixRecord val = new GridmixRecord();
 
     @Override
-    protected void setup(Context context)
+    protected void setup(Context ctxt)
         throws IOException, InterruptedException {
-      // TODO clearly job-specific, but no data at present
-      keyLen = context.getConfiguration().getInt(Gridmix.GRIDMIX_KEY_LEN, 20);
-      key = new GridmixKey(GridmixKey.DATA, new byte[keyLen]);
-      final GridmixSplit split = (GridmixSplit) context.getInputSplit();
-      LOG.info("ID: " + split.getId());
-      reduceRecordCount = split.getOutputRecords();
-      reduceRecordRemaining =
-        Arrays.copyOf(reduceRecordCount, reduceRecordCount.length);
-      reduceRecordSize = new int[reduceRecordCount.length];
-      int valsize = -1;
+      final Configuration conf = ctxt.getConfiguration();
+      final GridmixSplit split = (GridmixSplit) ctxt.getInputSplit();
+      final int maps = split.getMapCount();
       final long[] reduceBytes = split.getOutputBytes();
+      final long[] reduceRecords = split.getOutputRecords();
+
       long totalRecords = 0L;
-      for (int i = 0; i < reduceBytes.length; ++i) {
-        reduceRecordSize[i] = Math.max(0,
-          Math.round(reduceBytes[i] / (1.0f * reduceRecordCount[i])) - keyLen);
-        valsize = Math.max(reduceRecordSize[i], valsize);
-        totalRecords += reduceRecordCount[i];
-      }
-      valsize = Math.max(0, valsize - 4); // BW len encoding
-      val.setCapacity(valsize);
-      val.setSize(valsize);
-      ratio = totalRecords / (1.0 * split.getInputRecords());
+      final int nReduces = ctxt.getNumReduceTasks();
+      if (nReduces > 0) {
+        int idx = 0;
+        int id = split.getId();
+        for (int i = 0; i < nReduces; ++i) {
+          final GridmixKey.Spec spec = new GridmixKey.Spec();
+          if (i == id) {
+            spec.bytes_out = split.getReduceBytes(idx);
+            spec.rec_out = split.getReduceRecords(idx);
+            ++idx;
+            id += maps;
+          }
+          reduces.add(new IntermediateRecordFactory(
+              new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf),
+              i, reduceRecords[i], spec, conf));
+          totalRecords += reduceRecords[i];
+        }
+      } else {
+        reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0],
+              conf));
+        totalRecords = reduceRecords[0];
+      }
+      final long splitRecords = split.getInputRecords();
+      final long inputRecords = splitRecords <= 0 && split.getLength() >= 0
+        ? Math.max(1,
+          split.getLength() / conf.getInt("gridmix.missing.rec.size", 64*1024))
+        : splitRecords;
+      ratio = totalRecords / (1.0 * inputRecords);
       acc = 0.0;
     }
 
-    protected void fillBytes(BytesWritable val, int len) {
-      r.nextBytes(val.getBytes());
-      val.setSize(len);
-    }
-
-    /** Find next non-empty partition after start. */
-    private int getNextPart(final int start) {
-      int p = start;
-      do {
-        p = (p + 1) % reduceRecordSize.length;
-      } while (0 == reduceRecordRemaining[p] && p != start);
-      return 0 == reduceRecordRemaining[p] ? -1 : p;
-    }
-
     @Override
-    public void map(IntWritable ignored, BytesWritable bytes,
+    public void map(NullWritable ignored, GridmixRecord rec,
         Context context) throws IOException, InterruptedException {
-      int p = getNextPart(r.nextInt(reduceRecordSize.length));
-      if (-1 == p) {
-        return;
-      }
       acc += ratio;
-      while (acc >= 1.0) {
-        fillBytes(key, key.getLength());
-        key.setType(GridmixKey.DATA);
-        key.setPartition(p);
-        --reduceRecordRemaining[p];
-        fillBytes(val, reduceRecordSize[p]);
+      while (acc >= 1.0 && !reduces.isEmpty()) {
+        key.setSeed(r.nextLong());
+        val.setSeed(r.nextLong());
+        final int idx = r.nextInt(reduces.size());
+        final RecordFactory f = reduces.get(idx);
+        if (!f.next(key, val)) {
+          reduces.remove(idx);
+          continue;
+        }
         context.write(key, val);
         acc -= 1.0;
-        if (0 == reduceRecordRemaining[p] && -1 == (p = getNextPart(p))) {
-          return;
-        }
       }
     }
 
     @Override
     public void cleanup(Context context)
         throws IOException, InterruptedException {
-      // output any remaining records
-      // TODO include reduce spec in remaining records if avail
-      //      (i.e. move this to map)
-      for (int i = 0; i < reduceRecordSize.length; ++i) {
-        for (long j = reduceRecordRemaining[i]; j > 0; --j) {
-          fillBytes(key, key.getLength());
-          key.setType(GridmixKey.DATA);
-          key.setPartition(i);
-          fillBytes(val, reduceRecordSize[i]);
+      for (RecordFactory factory : reduces) {
+        key.setSeed(r.nextLong());
+        while (factory.next(key, val)) {
           context.write(key, val);
+          key.setSeed(r.nextLong());
         }
       }
-      val.setSize(0);
-      key.setType(GridmixKey.REDUCE_SPEC);
-      final int reduces = context.getNumReduceTasks();
-      final GridmixSplit split = (GridmixSplit) context.getInputSplit();
-      final int maps = split.getMapCount();
-      int idx = 0;
-      int id = split.getId();
-      for (int i = 0; i < reduces; ++i) {
-        key.setPartition(i);
-        key.setReduceInputRecords(reduceRecordCount[i]);
-        // Write spec for all red st r_id % id == 0
-        if (i == id) {
-          key.setReduceOutputBytes(split.getReduceBytes(idx));
-          key.setReduceOutputRecords(split.getReduceRecords(idx));
-          LOG.debug(String.format("SPEC'D %d / %d to %d",
-                split.getReduceRecords(idx), split.getReduceBytes(idx), i));
-          ++idx;
-          id += maps;
-        } else {
-          key.setReduceOutputBytes(0);
-          key.setReduceOutputRecords(0);
-        }
-        context.write(key, val);
-      }
     }
   }
 
   public static class GridmixReducer
-      extends Reducer<GridmixKey,BytesWritable,NullWritable,BytesWritable> {
+      extends Reducer<GridmixKey,GridmixRecord,NullWritable,GridmixRecord> {
 
     private final Random r = new Random();
-    private final BytesWritable val = new BytesWritable();
+    private final GridmixRecord val = new GridmixRecord();
 
     private double acc;
     private double ratio;
-    private long written;
-    private long inRecords = 0L;
-    private long outBytes = 0L;
-    private long outRecords = 0L;
-
-    protected void fillBytes(BytesWritable val, int len) {
-      r.nextBytes(val.getBytes());
-      val.setSize(len);
-    }
+    private RecordFactory factory;
 
     @Override
     protected void setup(Context context)
@@ -501,62 +330,52 @@
            context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
         throw new IOException("Missing reduce spec");
       }
-      for (BytesWritable ignored : context.getValues()) {
+      long outBytes = 0L;
+      long outRecords = 0L;
+      long inRecords = 0L;
+      for (GridmixRecord ignored : context.getValues()) {
         final GridmixKey spec = context.getCurrentKey();
         inRecords += spec.getReduceInputRecords();
-        LOG.debug("GOT COUNT " + spec.getReduceInputRecords());
         outBytes += spec.getReduceOutputBytes();
         outRecords += spec.getReduceOutputRecords();
       }
-      LOG.debug("GOT SPEC " + outRecords + "/" + outBytes);
-      val.setCapacity(Math.round(outBytes / (1.0f * outRecords)));
+      if (0 == outRecords && inRecords > 0) {
+        LOG.info("Spec output bytes w/o records. Using input record count");
+        outRecords = inRecords;
+      }
+      factory =
+        new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
       ratio = outRecords / (1.0 * inRecords);
       acc = 0.0;
-      LOG.debug(String.format("RECV %d -> %10d/%10d %d %f", inRecords,
-            outRecords, outBytes, val.getCapacity(), ratio));
     }
     @Override
-    protected void reduce(GridmixKey key, Iterable<BytesWritable> values,
+    protected void reduce(GridmixKey key, Iterable<GridmixRecord> values,
         Context context) throws IOException, InterruptedException {
-      for (BytesWritable ignored : values) {
+      for (GridmixRecord ignored : values) {
         acc += ratio;
-        while (acc >= 1.0 && written < outBytes) {
-          final int len = (int) Math.min(outBytes - written, val.getCapacity());
-          fillBytes(val, len);
+        while (acc >= 1.0 && factory.next(null, val)) {
           context.write(NullWritable.get(), val);
           acc -= 1.0;
-          written += len;
-          LOG.debug(String.format("%f %d/%d", acc, written, outBytes));
         }
       }
     }
     @Override
     protected void cleanup(Context context)
         throws IOException, InterruptedException {
-      while (written < outBytes) {
-        final int len = (int) Math.min(outBytes - written, val.getCapacity());
-        fillBytes(val, len);
+      val.setSeed(r.nextLong());
+      while (factory.next(null, val)) {
         context.write(NullWritable.get(), val);
-        written += len;
+        val.setSeed(r.nextLong());
       }
     }
   }
 
   static class GridmixRecordReader
-      extends RecordReader<IntWritable,BytesWritable> {
+      extends RecordReader<NullWritable,GridmixRecord> {
 
-    private long bytesRead = 0;
-    private long bytesTotal;
-    private Configuration conf;
-    private final IntWritable key = new IntWritable();
-    private final BytesWritable inBytes = new BytesWritable();
-
-    private FSDataInputStream input;
-    private int idx = -1;
-    private int capacity;
-    private Path[] paths;
-    private long[] startoffset;
-    private long[] lengths;
+    private RecordFactory factory;
+    private final Random r = new Random();
+    private final GridmixRecord val = new GridmixRecord();
 
     public GridmixRecordReader() { }
 
@@ -564,178 +383,36 @@
     public void initialize(InputSplit genericSplit, TaskAttemptContext ctxt)
             throws IOException, InterruptedException {
       final GridmixSplit split = (GridmixSplit)genericSplit;
-      this.conf = ctxt.getConfiguration();
-      paths = split.getPaths();
-      startoffset = split.getStartOffsets();
-      lengths = split.getLengths();
-      bytesTotal = split.getLength();
-      capacity = (int) Math.round(bytesTotal / (1.0 * split.getInputRecords()));
-      inBytes.setCapacity(capacity);
-      nextSource();
-    }
-    private void nextSource() throws IOException {
-      idx = (idx + 1) % paths.length;
-      final Path file = paths[idx];
-      final FileSystem fs = file.getFileSystem(conf);
-      input = fs.open(file, capacity);
-      input.seek(startoffset[idx]);
+      final Configuration conf = ctxt.getConfiguration();
+      factory = new ReadRecordFactory(split.getLength(),
+          split.getInputRecords(), new FileQueue(split, conf), conf);
     }
+
     @Override
     public boolean nextKeyValue() throws IOException {
-      if (bytesRead >= bytesTotal) {
-        return false;
-      }
-      final int len = (int)
-        Math.min(bytesTotal - bytesRead, inBytes.getCapacity());
-      int kvread = 0;
-      while (kvread < len) {
-        assert lengths[idx] >= 0;
-        if (lengths[idx] <= 0) {
-          nextSource();
-          continue;
-        }
-        final int srcRead = (int) Math.min(len - kvread, lengths[idx]);
-        IOUtils.readFully(input, inBytes.getBytes(), kvread, srcRead);
-        //LOG.trace("Read " + srcRead + " bytes from " + paths[idx]);
-        lengths[idx] -= srcRead;
-        kvread += srcRead;
-      }
-      bytesRead += kvread;
-      return true;
+      val.setSeed(r.nextLong());
+      return factory.next(null, val);
     }
     @Override
     public float getProgress() throws IOException {
-      return bytesRead / ((float)bytesTotal);
+      return factory.getProgress();
     }
     @Override
-    public IntWritable getCurrentKey() { return key; }
+    public NullWritable getCurrentKey() {
+      return NullWritable.get();
+    }
     @Override
-    public BytesWritable getCurrentValue() { return inBytes; }
+    public GridmixRecord getCurrentValue() {
+      return val;
+    }
     @Override
     public void close() throws IOException {
-      IOUtils.cleanup(null, input);
-    }
-  }
-
-  static class GridmixSplit extends CombineFileSplit {
-    private int id;
-    private int nSpec;
-    private int maps;
-    private int reduces;
-    private long inputRecords;
-    private long outputBytes;
-    private long outputRecords;
-    private long maxMemory;
-    private double[] reduceBytes = new double[0];
-    private double[] reduceRecords = new double[0];
-
-    // Spec for reduces id mod this
-    private long[] reduceOutputBytes = new long[0];
-    private long[] reduceOutputRecords = new long[0];
-
-    GridmixSplit() {
-      super();
-    }
-
-    public GridmixSplit(CombineFileSplit cfsplit, int maps, int id,
-        long inputBytes, long inputRecords, long outputBytes,
-        long outputRecords, double[] reduceBytes, double[] reduceRecords,
-        long[] reduceOutputBytes, long[] reduceOutputRecords)
-        throws IOException {
-      super(cfsplit);
-      this.id = id;
-      this.maps = maps;
-      reduces = reduceBytes.length;
-      this.inputRecords = inputRecords;
-      this.outputBytes = outputBytes;
-      this.outputRecords = outputRecords;
-      this.reduceBytes = Arrays.copyOf(reduceBytes, reduces);
-      this.reduceRecords = Arrays.copyOf(reduceRecords, reduces);
-      nSpec = reduceOutputBytes.length;
-      this.reduceOutputBytes = reduceOutputBytes;
-      this.reduceOutputRecords = reduceOutputRecords;
-    }
-    public int getId() {
-      return id;
-    }
-    public int getMapCount() {
-      return maps;
-    }
-    public long getInputRecords() {
-      return inputRecords;
-    }
-    public long[] getOutputBytes() {
-      final long[] ret = new long[reduces];
-      for (int i = 0; i < reduces; ++i) {
-        ret[i] = Math.round(outputBytes * reduceBytes[i]);
-      }
-      return ret;
-    }
-    public long[] getOutputRecords() {
-      final long[] ret = new long[reduces];
-      for (int i = 0; i < reduces; ++i) {
-        ret[i] = Math.round(outputRecords * reduceRecords[i]);
-      }
-      return ret;
-    }
-    public long getReduceBytes(int i) {
-      return reduceOutputBytes[i];
-    }
-    public long getReduceRecords(int i) {
-      return reduceOutputRecords[i];
-    }
-    @Override
-    public void write(DataOutput out) throws IOException {
-      super.write(out);
-      WritableUtils.writeVInt(out, id);
-      WritableUtils.writeVInt(out, maps);
-      WritableUtils.writeVLong(out, inputRecords);
-      WritableUtils.writeVLong(out, outputBytes);
-      WritableUtils.writeVLong(out, outputRecords);
-      WritableUtils.writeVLong(out, maxMemory);
-      WritableUtils.writeVInt(out, reduces);
-      for (int i = 0; i < reduces; ++i) {
-        out.writeDouble(reduceBytes[i]);
-        out.writeDouble(reduceRecords[i]);
-      }
-      WritableUtils.writeVInt(out, nSpec);
-      for (int i = 0; i < nSpec; ++i) {
-        out.writeLong(reduceOutputBytes[i]);
-        out.writeLong(reduceOutputRecords[i]);
-      }
-    }
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      super.readFields(in);
-      id = WritableUtils.readVInt(in);
-      maps = WritableUtils.readVInt(in);
-      inputRecords = WritableUtils.readVLong(in);
-      outputBytes = WritableUtils.readVLong(in);
-      outputRecords = WritableUtils.readVLong(in);
-      maxMemory = WritableUtils.readVLong(in);
-      reduces = WritableUtils.readVInt(in);
-      if (reduceBytes.length < reduces) {
-        reduceBytes = new double[reduces];
-        reduceRecords = new double[reduces];
-      }
-      for (int i = 0; i < reduces; ++i) {
-        reduceBytes[i] = in.readDouble();
-        reduceRecords[i] = in.readDouble();
-      }
-      nSpec = WritableUtils.readVInt(in);
-      if (reduceOutputBytes.length < nSpec) {
-        reduceOutputBytes = new long[nSpec];
-        reduceOutputRecords = new long[nSpec];
-      }
-      for (int i = 0; i < nSpec; ++i) {
-        reduceOutputBytes[i] = in.readLong();
-        reduceOutputRecords[i] = in.readLong();
-      }
+      factory.close();
     }
   }
 
   static class GridmixInputFormat
-      extends InputFormat<IntWritable,BytesWritable> {
+      extends InputFormat<NullWritable,GridmixRecord> {
 
     @Override
     public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
@@ -743,29 +420,28 @@
             "gridmix.job.seq", -1));
     }
     @Override
-    public RecordReader<IntWritable,BytesWritable> createRecordReader(
+    public RecordReader<NullWritable,GridmixRecord> createRecordReader(
         InputSplit split, final TaskAttemptContext taskContext)
         throws IOException {
       return new GridmixRecordReader();
     }
   }
 
-  static class RawBytesOutputFormat
-      extends FileOutputFormat<NullWritable,BytesWritable> {
+  static class RawBytesOutputFormat<K>
+      extends FileOutputFormat<K,GridmixRecord> {
 
     @Override
-    public RecordWriter<NullWritable,BytesWritable> getRecordWriter(
+    public RecordWriter<K,GridmixRecord> getRecordWriter(
         TaskAttemptContext job) throws IOException {
 
       Path file = getDefaultWorkFile(job, "");
       FileSystem fs = file.getFileSystem(job.getConfiguration());
       final FSDataOutputStream fileOut = fs.create(file, false);
-      return new RecordWriter<NullWritable,BytesWritable>() {
+      return new RecordWriter<K,GridmixRecord>() {
         @Override
-        public void write(NullWritable key, BytesWritable value)
+        public void write(K ignored, GridmixRecord value)
             throws IOException {
-          //LOG.trace("WROTE " + value.getLength() + " bytes");
-          fileOut.write(value.getBytes(), 0, value.getLength());
+          value.writeRandom(fileOut, value.getSize());
         }
         @Override
         public void close(TaskAttemptContext ctxt) throws IOException {
@@ -829,8 +505,10 @@
           jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
         specBytes[j] = info.getOutputBytes();
         specRecords[j] = info.getOutputRecords();
-        LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
-            i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
+              i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
+        }
       }
       final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
       splits.add(new GridmixSplit(striper.splitFor(inputDir,
@@ -842,77 +520,4 @@
     pushDescription(id(), splits);
   }
 
-  static class InputStriper {
-    int idx;
-    long currentStart;
-    FileStatus current;
-    final List<FileStatus> files = new ArrayList<FileStatus>();
-
-    InputStriper(FilePool inputDir, long mapBytes)
-        throws IOException {
-      final long inputBytes = inputDir.getInputFiles(mapBytes, files);
-      if (mapBytes > inputBytes) {
-        LOG.warn("Using " + inputBytes + "/" + mapBytes + " bytes");
-      }
-      current = files.get(0);
-    }
-
-    CombineFileSplit splitFor(FilePool inputDir, long bytes, int nLocs)
-        throws IOException {
-      final ArrayList<Path> paths = new ArrayList<Path>();
-      final ArrayList<Long> start = new ArrayList<Long>();
-      final ArrayList<Long> length = new ArrayList<Long>();
-      final HashMap<String,Double> sb = new HashMap<String,Double>();
-      while (bytes > 0) {
-        paths.add(current.getPath());
-        start.add(currentStart);
-        final long fromFile = Math.min(bytes, current.getLen() - currentStart);
-        length.add(fromFile);
-        for (BlockLocation loc :
-            inputDir.locationsFor(current, currentStart, fromFile)) {
-          final double tedium = loc.getLength() / (1.0 * bytes);
-          for (String l : loc.getHosts()) {
-            Double j = sb.get(l);
-            if (null == j) {
-              sb.put(l, tedium);
-            } else {
-              sb.put(l, j.doubleValue() + tedium);
-            }
-          }
-        }
-        currentStart += fromFile;
-        bytes -= fromFile;
-        if (current.getLen() - currentStart == 0) {
-          current = files.get(++idx % files.size());
-          currentStart = 0;
-        }
-      }
-      final ArrayList<Entry<String,Double>> sort =
-        new ArrayList<Entry<String,Double>>(sb.entrySet());
-      Collections.sort(sort, hostRank);
-      final String[] hosts = new String[Math.min(nLocs, sort.size())];
-      for (int i = 0; i < nLocs && i < sort.size(); ++i) {
-        hosts[i] = sort.get(i).getKey();
-      }
-      return new CombineFileSplit(paths.toArray(new Path[0]),
-          toLongArray(start), toLongArray(length), hosts);
-    }
-
-    private long[] toLongArray(final ArrayList<Long> sigh) {
-      final long[] ret = new long[sigh.size()];
-      for (int i = 0; i < ret.length; ++i) {
-        ret[i] = sigh.get(i);
-      }
-      return ret;
-    }
-
-    final Comparator<Entry<String,Double>> hostRank =
-      new Comparator<Entry<String,Double>>() {
-        public int compare(Entry<String,Double> a, Entry<String,Double> b) {
-            final double va = a.getValue();
-            final double vb = b.getValue();
-            return va > vb ? -1 : va < vb ? 1 : 0;
-          }
-      };
-  }
 }

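The rewritten SpecGroupingComparator compares serialized keys without
deserializing them. A sketch of that raw comparison, assuming the layout
the diff implies: each serialized GridmixKey starts with a vint whose value
is the offset of the type byte from the start of the record, REDUCE_SPEC
keys group purely by type, and DATA keys compare on the raw leading bytes:

    import java.io.IOException;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.io.WritableUtils;

    public class RawGroupSketch {
      static final byte REDUCE_SPEC = 0;    // mirrors GridmixKey constants
      static final byte DATA = 1;

      private final DataInputBuffer di = new DataInputBuffer();

      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        try {
          di.reset(b1, s1, l1);
          final int x1 = WritableUtils.readVInt(di); // offset of type byte
          di.reset(b2, s2, l2);
          final int x2 = WritableUtils.readVInt(di);
          final int t1 = b1[s1 + x1];
          final int t2 = b2[s2 + x2];
          if (t1 == REDUCE_SPEC || t2 == REDUCE_SPEC) {
            return t1 - t2;                 // specs sort ahead of data
          }
          return WritableComparator.compareBytes(b1, s1, x1, b2, s2, x2);
        } catch (IOException e) {
          throw new RuntimeException(e);    // malformed vint prefix
        }
      }
    }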
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Tue Jan 26 14:02:53 2010
@@ -26,9 +26,16 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
 import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.tools.rumen.ZombieJobProducer;
 
 import org.apache.commons.logging.Log;
@@ -47,6 +54,7 @@
   public static final Log LOG = LogFactory.getLog(JobFactory.class);
 
   private final Path scratch;
+  private final float rateFactor;
   private final Configuration conf;
   private final ReaderThread rThread;
   private final AtomicInteger sequence;
@@ -83,6 +91,7 @@
       Path scratch, Configuration conf, CountDownLatch startFlag) {
     sequence = new AtomicInteger(0);
     this.scratch = scratch;
+    this.rateFactor = conf.getFloat(Gridmix.GRIDMIX_SUB_MUL, 1.0f);
     this.jobProducer = jobProducer;
     this.conf = new Configuration(conf);
     this.submitter = submitter;
@@ -90,6 +99,61 @@
     this.rThread = new ReaderThread();
   }
 
+  static class MinTaskInfo extends TaskInfo {
+    public MinTaskInfo(TaskInfo info) {
+      super(info.getInputBytes(), info.getInputRecords(),
+            info.getOutputBytes(), info.getOutputRecords(),
+            info.getTaskMemory());
+    }
+    public long getInputBytes() {
+      return Math.max(0, super.getInputBytes());
+    }
+    public int getInputRecords() {
+      return Math.max(0, super.getInputRecords());
+    }
+    public long getOutputBytes() {
+      return Math.max(0, super.getOutputBytes());
+    }
+    public int getOutputRecords() {
+      return Math.max(0, super.getOutputRecords());
+    }
+    public long getTaskMemory() {
+      return Math.max(0, super.getTaskMemory());
+    }
+  }
+
+  static class FilterJobStory implements JobStory {
+
+    protected final JobStory job;
+
+    public FilterJobStory(JobStory job) {
+      this.job = job;
+    }
+    public JobConf getJobConf() { return job.getJobConf(); }
+    public String getName() { return job.getName(); }
+    public JobID getJobID() { return job.getJobID(); }
+    public String getUser() { return job.getUser(); }
+    public long getSubmissionTime() { return job.getSubmissionTime(); }
+    public InputSplit[] getInputSplits() { return job.getInputSplits(); }
+    public int getNumberMaps() { return job.getNumberMaps(); }
+    public int getNumberReduces() { return job.getNumberReduces(); }
+    public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+      return job.getTaskInfo(taskType, taskNumber);
+    }
+    public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
+        int taskAttemptNumber) {
+      return job.getTaskAttemptInfo(taskType, taskNumber, taskAttemptNumber);
+    }
+    public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(
+        int taskNumber, int taskAttemptNumber, int locality) {
+      return job.getMapTaskAttemptInfoAdjusted(
+          taskNumber, taskAttemptNumber, locality);
+    }
+    public Values getOutcome() {
+      return job.getOutcome();
+    }
+  }
+
   /**
    * Worker thread responsible for reading descriptions, assigning sequence
    * numbers, and normalizing time.
@@ -107,7 +171,12 @@
       } while (job != null
           && (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
               job.getSubmissionTime() < 0));
-      return job;
+      return null == job ? null : new FilterJobStory(job) {
+          @Override
+          public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+            return new MinTaskInfo(this.job.getTaskInfo(taskType, taskNumber));
+          }
+        };
     }
 
     @Override
@@ -133,11 +202,12 @@
             }
             final long current = job.getSubmissionTime();
             if (current < last) {
-              throw new IOException(
-                  "JobStories are not ordered by submission time.");
+              LOG.warn("Job " + job.getJobID() + " out of order");
+              continue;
             }
             last = current;
-            submitter.add(new GridmixJob(conf, initTime + (current - first),
+            submitter.add(new GridmixJob(conf, initTime +
+                  Math.round(rateFactor * (current - first)),
                 job, scratch, sequence.getAndIncrement()));
           } catch (IOException e) {
             JobFactory.this.error = e;
@@ -179,8 +249,8 @@
   /**
    * Wait for the reader thread to exhaust the job trace.
    */
-  public void join() throws InterruptedException {
-    rThread.join();
+  public void join(long millis) throws InterruptedException {
+    rThread.join(millis);
   }
 
   /**

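The rateFactor read from gridmix.submit.multiplier scales the gap between
each job's trace submission time and the first job's before the trace is
replayed, so a factor below 1.0 compresses the schedule and a factor above
1.0 stretches it. Hypothetical numbers, assuming a factor of 0.5:

    public class MultiplierSketch {
      public static void main(String[] args) {
        final float rateFactor = 0.5f;      // assumed gridmix.submit.multiplier
        final long first = 10000L;          // first job's trace time, ms
        final long current = 70000L;        // this job's trace time, ms
        final long initTime = System.currentTimeMillis();
        final long submitAt = initTime
            + Math.round(rateFactor * (current - first));
        // A 60 s trace gap replays 30 s after start: prints 30000.
        System.out.println(submitAt - initTime);
      }
    }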
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java Tue Jan 26 14:02:53 2010
@@ -47,7 +47,8 @@
   private final MonitorThread mThread;
   private final BlockingQueue<Job> runningJobs;
   private final long pollDelayMillis;
-  private volatile boolean graceful = false;
+  private boolean graceful = false;
+  private boolean shutdown = false;
 
   /**
    * Create a JobMonitor with a default polling interval of 5s.
@@ -59,7 +60,7 @@
   /**
    * Create a JobMonitor that sleeps for the specified duration after
    * polling a still-running job.
-   * @param pollDelaySec Delay after polling a running job
+   * @param pollDelay Delay after polling a running job
    * @param unit Time unit for pollDelay (rounded to milliseconds)
    */
   public JobMonitor(int pollDelay, TimeUnit unit) {
@@ -80,14 +81,14 @@
    * Temporary hook for recording job success.
    */
   protected void onSuccess(Job job) {
-    LOG.info(job.getJobName() + " succeeded");
+    LOG.info(job.getJobName() + " (" + job.getID() + ")" + " success");
   }
 
   /**
    * Temporary hook for recording job failure.
    */
   protected void onFailure(Job job) {
-    LOG.info(job.getJobName() + " failed");
+    LOG.info(job.getJobName() + " (" + job.getID() + ")" + " failure");
   }
 
   /**
@@ -128,20 +129,30 @@
 
     @Override
     public void run() {
-      boolean interrupted = false;
+      boolean graceful;
+      boolean shutdown;
       while (true) {
         try {
           synchronized (mJobs) {
+            graceful = JobMonitor.this.graceful;
+            shutdown = JobMonitor.this.shutdown;
             runningJobs.drainTo(mJobs);
           }
 
-          final boolean graceful = JobMonitor.this.graceful;
           // shutdown conditions; either shutdown requested and all jobs
           // have completed or abort requested and there are recently
-          // submitted jobs not yet accounted for
-          if (interrupted && ((!graceful && runningJobs.isEmpty()) ||
-                               (graceful && mJobs.isEmpty()))) {
-            break;
+          // submitted jobs not in the monitored set
+          if (shutdown) {
+            if (!graceful) {
+              while (!runningJobs.isEmpty()) {
+                synchronized (mJobs) {
+                  runningJobs.drainTo(mJobs);
+                }
+              }
+              break;
+            } else if (mJobs.isEmpty()) {
+              break;
+            }
           }
           while (!mJobs.isEmpty()) {
             Job job;
@@ -161,14 +172,16 @@
                 // reset it here
                 Thread.currentThread().interrupt();
               } else {
-                LOG.warn("Lost job " + job.getJobName(), e);
+                LOG.warn("Lost job " + (null == job.getJobName()
+                     ? "<unknown>" : job.getJobName()), e);
                 continue;
               }
             }
             synchronized (mJobs) {
               if (!mJobs.offer(job)) {
-                LOG.error("Lost job " + job.getJobName()); // should never
-                                                           // happen
+                LOG.error("Lost job " + (null == job.getJobName()
+                     ? "<unknown>" : job.getJobName())); // should never
+                                                         // happen
               }
             }
             break;
@@ -176,7 +189,7 @@
           try {
             TimeUnit.MILLISECONDS.sleep(pollDelayMillis);
           } catch (InterruptedException e) {
-            interrupted = true;
+            shutdown = true;
             continue;
           }
         } catch (Throwable e) {
@@ -198,8 +211,8 @@
    * called. Note that, since submission may be sporadic, this will hang
    * if no form of shutdown has been requested.
    */
-  public void join() throws InterruptedException {
-    mThread.join();
+  public void join(long millis) throws InterruptedException {
+    mThread.join(millis);
   }
 
   /**
@@ -207,7 +220,10 @@
    * Upstream submitter is assumed dead.
    */
   public void abort() {
-    graceful = false;
+    synchronized (mJobs) {
+      graceful = false;
+      shutdown = true;
+    }
     mThread.interrupt();
   }
 
@@ -216,7 +232,10 @@
    * Upstream submitter is assumed dead.
    */
   public void shutdown() {
-    graceful = true;
+    synchronized (mJobs) {
+      graceful = true;
+      shutdown = true;
+    }
     mThread.interrupt();
   }
 }

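With graceful no longer volatile, both flags are read and written under the
mJobs lock, so the monitor thread's flag snapshot and its drain of
runningJobs are atomic with respect to shutdown() and abort(). A condensed
sketch of that handshake, with Strings standing in for Jobs and a single
drain where the real monitor loops until the queue is empty:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class MonitorSketch {
      private final BlockingQueue<String> runningJobs =
        new LinkedBlockingQueue<String>();
      private final List<String> mJobs = new ArrayList<String>();
      private boolean graceful = false;
      private boolean shutdown = false;

      public void shutdown() {              // graceful: finish tracked jobs
        synchronized (mJobs) {
          graceful = true;
          shutdown = true;
        }
      }

      public void abort() {                 // abrupt: drain and exit
        synchronized (mJobs) {
          graceful = false;
          shutdown = true;
        }
      }

      // One pass of the monitor loop's exit test.
      boolean shouldExit() {
        final boolean g, s;
        synchronized (mJobs) {
          g = graceful;
          s = shutdown;
          runningJobs.drainTo(mJobs);       // consistent with the flags
        }
        // abrupt shutdown exits once recent submissions are drained;
        // graceful shutdown exits only when the monitored set empties
        return s && (!g || mJobs.isEmpty());
      }
    }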
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java Tue Jan 26 14:02:53 2010
@@ -18,6 +18,7 @@
 package org.apache.hadoop.mapred.gridmix;
 
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
@@ -96,6 +97,10 @@
               " (" + job.getJob().getID() + ")");
         } catch (IOException e) {
           LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
+          if (e.getCause() instanceof ClosedByInterruptException) {
+            throw new InterruptedException("Failed to submit " +
+                job.getJob().getJobName());
+          }
         } catch (ClassNotFoundException e) {
           LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
         }
@@ -144,11 +149,11 @@
    * Continue running until all queued jobs have been submitted to the
    * cluster.
    */
-  public void join() throws InterruptedException {
+  public void join(long millis) throws InterruptedException {
     if (!shutdown) {
       throw new IllegalStateException("Cannot wait for active submit thread");
     }
-    sched.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+    sched.awaitTermination(millis, TimeUnit.MILLISECONDS);
   }
 
   /**
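
The cause check added above is needed because java.nio channel writes
translate a thread interrupt into a ClosedByInterruptException wrapped
inside the thrown IOException, so the interrupt would otherwise be
swallowed by the generic warn-and-continue path. A compilable sketch of
the unwrap-and-rethrow idiom; doSubmit is a hypothetical stand-in for the
real submission call and simply fakes the wrapped failure.

    import java.io.IOException;
    import java.nio.channels.ClosedByInterruptException;

    class SubmitSketch {
      // Fakes an NIO write that failed because the thread was interrupted.
      static void doSubmit(String name) throws IOException {
        throw new IOException(new ClosedByInterruptException());
      }

      // Re-raise the buried interrupt; log and continue on other I/O errors.
      static void submit(String name) throws InterruptedException {
        try {
          doSubmit(name);
        } catch (IOException e) {
          if (e.getCause() instanceof ClosedByInterruptException) {
            throw new InterruptedException("Failed to submit " + name);
          }
          System.err.println("Failed to submit " + name + ": " + e);
        }
      }

      public static void main(String[] args) {
        try {
          submit("job1");
        } catch (InterruptedException e) {
          System.out.println("interrupted: " + e.getMessage());
        }
      }
    }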

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Tue Jan 26 14:02:53 2010
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -27,7 +28,6 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -79,21 +79,58 @@
     public void close() { }
   }
 
+  static double[] getDistr(Random r, double mindist, int size) {
+    assert 0.0 <= mindist && mindist <= 1.0;
+    final double min = mindist / size;
+    final double rem = 1.0 - min * size;
+    final double[] tmp = new double[size];
+    for (int i = 0; i < tmp.length - 1; ++i) {
+      tmp[i] = r.nextDouble() * rem;
+    }
+    tmp[tmp.length - 1] = rem;
+    Arrays.sort(tmp);
+
+    final double[] ret = new double[size];
+    ret[0] = tmp[0] + min;
+    for (int i = 1; i < size; ++i) {
+      ret[i] = tmp[i] - tmp[i-1] + min;
+    }
+    return ret;
+  }
+
   /**
    * Generate random task data for a synthetic job.
    */
   static class MockJob implements JobStory {
 
-    public static final String MIN_BYTES_IN =   "gridmix.test.min.bytes.in";
-    public static final String VAR_BYTES_IN =   "gridmix.test.var.bytes.in";
-    public static final String MIN_BYTES_OUT =  "gridmix.test.min.bytes.out";
-    public static final String VAR_BYTES_OUT =  "gridmix.test.var.bytes.out";
-
-    public static final String MIN_REC_SIZE =   "gridmix.test.min.rec.bytes";
-    public static final String VAR_REC_SIZE =   "gridmix.test.var.rec.bytes";
-
-    public static final String MAX_MAPS =       "gridmix.test.max.maps";
-    public static final String MAX_REDS =       "gridmix.test.max.reduces";
+    static final int MIN_REC = 1 << 14;
+    static final int MIN_BYTES = 1 << 20;
+    static final int VAR_REC = 1 << 14;
+    static final int VAR_BYTES = 4 << 20;
+    static final int MAX_MAP = 5;
+    static final int MAX_RED = 3;
+
+    static void initDist(Random r, double min, int[] recs, long[] bytes,
+        long tot_recs, long tot_bytes) {
+      final double[] recs_dist = getDistr(r, min, recs.length);
+      final double[] bytes_dist = getDistr(r, min, recs.length);
+      long totalbytes = 0L;
+      int totalrecs = 0;
+      for (int i = 0; i < recs.length; ++i) {
+        recs[i] = (int) Math.round(tot_recs * recs_dist[i]);
+        bytes[i] = Math.round(tot_bytes * bytes_dist[i]);
+        totalrecs += recs[i];
+        totalbytes += bytes[i];
+      }
+      // Add/remove excess
+      recs[0] += totalrecs - tot_recs;
+      bytes[0] += totalbytes - tot_bytes;
+      if (LOG.isInfoEnabled()) {
+        LOG.info("DIST: " + Arrays.toString(recs) + " " +
+            tot_recs + "/" + totalrecs + " " +
+            Arrays.toString(bytes) + " " + tot_bytes + "/" + totalbytes);
+      }
+    }
 
     private static final AtomicInteger seq = new AtomicInteger(0);
     // set timestamps in the past
@@ -101,97 +138,65 @@
       new AtomicLong(System.currentTimeMillis() -
         TimeUnit.MILLISECONDS.convert(60, TimeUnit.DAYS));
 
+    private final int id;
     private final String name;
     private final int[] m_recsIn, m_recsOut, r_recsIn, r_recsOut;
     private final long[] m_bytesIn, m_bytesOut, r_bytesIn, r_bytesOut;
     private final long submitTime;
 
-    public MockJob() {
-      this(new Configuration(false));
-    }
-
     public MockJob(Configuration conf) {
-      this(conf.getInt(MIN_BYTES_IN, 1 << 20),
-           conf.getInt(VAR_BYTES_IN, 5 << 20),
-           conf.getInt(MIN_BYTES_OUT, 1 << 20),
-           conf.getInt(VAR_BYTES_OUT, 5 << 20),
-           conf.getInt(MIN_REC_SIZE , 100),
-           conf.getInt(VAR_REC_SIZE , 1 << 15),
-           conf.getInt(MAX_MAPS, 5),
-           conf.getInt(MAX_REDS, 3));
-    }
-
-    public MockJob(int min_bytes_in, int var_bytes_in,
-                   int min_bytes_out, int var_bytes_out,
-                   int min_rec_size, int var_rec_size,
-                   int max_maps, int max_reds) {
       final Random r = new Random();
-      name = String.format("MOCKJOB%05d", seq.getAndIncrement());
+      final long seed = r.nextLong();
+      r.setSeed(seed);
+      id = seq.getAndIncrement();
+      name = String.format("MOCKJOB%05d", id);
+      LOG.info(name + " (" + seed + ")");
       submitTime = timestamp.addAndGet(TimeUnit.MILLISECONDS.convert(
             r.nextInt(10), TimeUnit.SECONDS));
-      int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
-      int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
-
-      final int iAvgMapRec = r.nextInt(var_rec_size) + min_rec_size;
-      final int oAvgMapRec = r.nextInt(var_rec_size) + min_rec_size;
-
-      // MAP
-      m_bytesIn = new long[r.nextInt(max_maps) + 1];
-      m_bytesOut = new long[m_bytesIn.length];
-      m_recsIn = new int[m_bytesIn.length];
-      m_recsOut = new int[m_bytesIn.length];
-      for (int i = 0; i < m_bytesIn.length; ++i) {
-        m_bytesIn[i] = r.nextInt(var_bytes_in) + min_bytes_in;
-        iMapBTotal += m_bytesIn[i];
-        m_recsIn[i] = (int)(m_bytesIn[i] / iAvgMapRec);
-        iMapRTotal += m_recsIn[i];
-
-        m_bytesOut[i] = r.nextInt(var_bytes_out) + min_bytes_out;
-        oMapBTotal += m_bytesOut[i];
-        m_recsOut[i] = (int)(m_bytesOut[i] / oAvgMapRec);
-        oMapRTotal += m_recsOut[i];
-      }
-
-      // REDUCE
-      r_bytesIn = new long[r.nextInt(max_reds) + 1];
-      r_bytesOut = new long[r_bytesIn.length];
-      r_recsIn = new int[r_bytesIn.length];
-      r_recsOut = new int[r_bytesIn.length];
-      iRedBTotal = oMapBTotal;
-      iRedRTotal = oMapRTotal;
-      for (int j = 0; iRedBTotal > 0; ++j) {
-        int i = j % r_bytesIn.length;
-        final int inc = r.nextInt(var_bytes_out) + min_bytes_out;
-        r_bytesIn[i] += inc;
-        iRedBTotal -= inc;
-        if (iRedBTotal < 0) {
-          r_bytesIn[i] += iRedBTotal;
-          iRedBTotal = 0;
-        }
-        iRedRTotal += r_recsIn[i];
-        r_recsIn[i] = (int)(r_bytesIn[i] / oAvgMapRec);
-        iRedRTotal -= r_recsIn[i];
-        if (iRedRTotal < 0) {
-          r_recsIn[i] += iRedRTotal;
-          iRedRTotal = 0;
-        }
 
-        r_bytesOut[i] = r.nextInt(var_bytes_in) + min_bytes_in;
-        oRedBTotal += r_bytesOut[i];
-        r_recsOut[i] = (int)(r_bytesOut[i] / iAvgMapRec);
-        oRedRTotal += r_recsOut[i];
-      }
-      r_recsIn[0] += iRedRTotal;
+      m_recsIn = new int[r.nextInt(MAX_MAP) + 1];
+      m_bytesIn = new long[m_recsIn.length];
+      m_recsOut = new int[m_recsIn.length];
+      m_bytesOut = new long[m_recsIn.length];
+
+      r_recsIn = new int[r.nextInt(MAX_RED) + 1];
+      r_bytesIn = new long[r_recsIn.length];
+      r_recsOut = new int[r_recsIn.length];
+      r_bytesOut = new long[r_recsIn.length];
+
+      // map input
+      final long map_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long map_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.5, m_recsIn, m_bytesIn, map_recs, map_bytes);
+
+      // shuffle
+      final long shuffle_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long shuffle_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.4, m_recsOut, m_bytesOut, shuffle_recs, shuffle_bytes);
+      initDist(r, 0.8, r_recsIn, r_bytesIn, shuffle_recs, shuffle_bytes);
+
+      // reduce output
+      final long red_recs = r.nextInt(VAR_REC) + MIN_REC;
+      final long red_bytes = r.nextInt(VAR_BYTES) + MIN_BYTES;
+      initDist(r, 0.4, r_recsOut, r_bytesOut, red_recs, red_bytes);
 
       if (LOG.isDebugEnabled()) {
-        iRedRTotal = 0;
-        iRedBTotal = 0;
-        for (int i = 0; i < r_bytesIn.length; ++i) {
+        int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
+        int iMapRTotal = 0, oMapRTotal = 0, iRedRTotal = 0, oRedRTotal = 0;
+        for (int i = 0; i < m_recsIn.length; ++i) {
+          iMapRTotal += m_recsIn[i];
+          iMapBTotal += m_bytesIn[i];
+          oMapRTotal += m_recsOut[i];
+          oMapBTotal += m_bytesOut[i];
+        }
+        for (int i = 0; i < r_recsIn.length; ++i) {
           iRedRTotal += r_recsIn[i];
           iRedBTotal += r_bytesIn[i];
+          oRedRTotal += r_recsOut[i];
+          oRedBTotal += r_bytesOut[i];
         }
         LOG.debug(String.format("%s: M (%03d) %6d/%10d -> %6d/%10d" +
-                             " R (%03d) %6d/%10d -> %6d/%10d @%d", name,
+                                   " R (%03d) %6d/%10d -> %6d/%10d @%d", name,
             m_bytesIn.length, iMapRTotal, iMapBTotal, oMapRTotal, oMapBTotal,
             r_bytesIn.length, iRedRTotal, iRedBTotal, oRedRTotal, oRedBTotal,
             submitTime));
@@ -210,7 +215,7 @@
 
     @Override
     public JobID getJobID() {
-      return null;
+      return new JobID("job_mock_" + name, id);
     }
 
     @Override
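
getDistr, added above, draws a random discrete distribution over size
buckets that sums to 1.0 while guaranteeing each bucket at least
mindist/size: it partitions the remaining mass with sorted uniform draws
and takes the gap between adjacent cut points as each bucket's share. A
standalone sketch of the same technique; the seed and arguments in main
are arbitrary.

    import java.util.Arrays;
    import java.util.Random;

    class DistrSketch {
      static double[] getDistr(Random r, double mindist, int size) {
        final double min = mindist / size;     // guaranteed floor per bucket
        final double rem = 1.0 - min * size;   // mass left to distribute
        final double[] tmp = new double[size];
        for (int i = 0; i < tmp.length - 1; ++i) {
          tmp[i] = r.nextDouble() * rem;       // random cut points in [0,rem]
        }
        tmp[tmp.length - 1] = rem;             // pin the last cut at rem
        Arrays.sort(tmp);
        final double[] ret = new double[size];
        ret[0] = tmp[0] + min;
        for (int i = 1; i < size; ++i) {
          ret[i] = tmp[i] - tmp[i - 1] + min;  // gap plus floor
        }
        return ret;
      }

      public static void main(String[] args) {
        final double[] d = getDistr(new Random(42L), 0.5, 4);
        double sum = 0.0;
        for (double v : d) {
          sum += v;                            // each v >= 0.5 / 4 = 0.125
        }
        System.out.println(Arrays.toString(d) + " sum=" + sum);
      }
    }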

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Tue Jan 26 14:02:53 2010
@@ -20,9 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -35,16 +33,14 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.TaskInfo;
-import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import static org.apache.hadoop.mapreduce.TaskCounter.*;
 
@@ -96,7 +92,7 @@
 
   static class TestMonitor extends JobMonitor {
 
-    static final long SLOPBYTES = 5 * 1024;
+    static final long SLOPBYTES = 1024;
     private final int expected;
     private final BlockingQueue<Job> retiredJobs;
 
@@ -138,12 +134,12 @@
         final TaskReport[] mReports = job.getTaskReports(TaskType.MAP);
         assertEquals("Mismatched map count", nMaps, mReports.length);
         check(TaskType.MAP, job, spec, mReports,
-            0, 1, nReds * SLOPBYTES, nReds + 1);
+            0, 0, SLOPBYTES, nReds);
 
         final TaskReport[] rReports = job.getTaskReports(TaskType.REDUCE);
         assertEquals("Mismatched reduce count", nReds, rReports.length);
         check(TaskType.REDUCE, job, spec, rReports,
-            nMaps * SLOPBYTES, nMaps + 1, 0, 1);
+            nMaps * SLOPBYTES, 2 * nMaps, 0, 0);
       }
     }
 
@@ -167,7 +163,8 @@
         switch (type) {
           case MAP:
              runInputBytes[i] = counters.findCounter("FileSystemCounters",
-                 "HDFS_BYTES_READ").getValue();
+                 "HDFS_BYTES_READ").getValue() - 
+                 counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getValue();
              runInputRecords[i] =
                (int)counters.findCounter(MAP_INPUT_RECORDS).getValue();
              runOutputBytes[i] =
@@ -176,74 +173,97 @@
                (int)counters.findCounter(MAP_OUTPUT_RECORDS).getValue();
 
             specInfo = spec.getTaskInfo(TaskType.MAP, i);
+            specInputRecords[i] = specInfo.getInputRecords();
+            specInputBytes[i] = specInfo.getInputBytes();
+            specOutputRecords[i] = specInfo.getOutputRecords();
+            specOutputBytes[i] = specInfo.getOutputBytes();
+            System.out.printf(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
+                 specInputBytes[i], specOutputBytes[i],
+                 specInputRecords[i], specOutputRecords[i]);
+            System.out.printf(type + " RUN:  %9d -> %9d :: %5d -> %5d\n",
+                 runInputBytes[i], runOutputBytes[i],
+                 runInputRecords[i], runOutputRecords[i]);
             break;
           case REDUCE:
-             runInputBytes[i] =
-               counters.findCounter(REDUCE_SHUFFLE_BYTES).getValue();
-             runInputRecords[i] =
-               (int)counters.findCounter(REDUCE_INPUT_RECORDS).getValue();
-             runOutputBytes[i] =
-               counters.findCounter("FileSystemCounters",
-                   "HDFS_BYTES_WRITTEN").getValue();
-             runOutputRecords[i] =
-               (int)counters.findCounter(REDUCE_OUTPUT_RECORDS).getValue();
+            runInputBytes[i] = 0;
+            runInputRecords[i] =
+              (int)counters.findCounter(REDUCE_INPUT_RECORDS).getValue();
+            runOutputBytes[i] =
+              counters.findCounter("FileSystemCounters",
+                  "HDFS_BYTES_WRITTEN").getValue();
+            runOutputRecords[i] =
+              (int)counters.findCounter(REDUCE_OUTPUT_RECORDS).getValue();
+
 
             specInfo = spec.getTaskInfo(TaskType.REDUCE, i);
+            // There is no reliable counter for reduce input bytes. The
+            // variable-length encoding of intermediate records and other noise
+            // make this quantity difficult to estimate. The shuffle and spec
+            // input bytes are included in debug output for reference, but are
+            // not checked.
+            specInputBytes[i] = 0;
+            specInputRecords[i] = specInfo.getInputRecords();
+            specOutputRecords[i] = specInfo.getOutputRecords();
+            specOutputBytes[i] = specInfo.getOutputBytes();
+            System.out.printf(type + " SPEC: (%9d) -> %9d :: %5d -> %5d\n",
+                 specInfo.getInputBytes(), specOutputBytes[i],
+                 specInputRecords[i], specOutputRecords[i]);
+            System.out.printf(type + " RUN:  (%9d) -> %9d :: %5d -> %5d\n",
+                 counters.findCounter(REDUCE_SHUFFLE_BYTES).getValue(),
+                 runOutputBytes[i], runInputRecords[i], runOutputRecords[i]);
             break;
           default:
             specInfo = null;
             fail("Unexpected type: " + type);
         }
-        specInputBytes[i] = specInfo.getInputBytes();
-        specInputRecords[i] = specInfo.getInputRecords();
-        specOutputRecords[i] = specInfo.getOutputRecords();
-        specOutputBytes[i] = specInfo.getOutputBytes();
-        System.out.printf(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
-             specInputBytes[i], specOutputBytes[i],
-             specInputRecords[i], specOutputRecords[i]);
-        System.out.printf(type + " RUN:  %9d -> %9d :: %5d -> %5d\n",
-             runInputBytes[i], runOutputBytes[i],
-             runInputRecords[i], runOutputRecords[i]);
       }
 
       // Check input bytes
       Arrays.sort(specInputBytes);
       Arrays.sort(runInputBytes);
       for (int i = 0; i < runTasks.length; ++i) {
-        assertTrue("Mismatched input bytes " +
+        assertTrue("Mismatched " + type + " input bytes " +
             specInputBytes[i] + "/" + runInputBytes[i],
-            runInputBytes[i] - specInputBytes[i] <= extraInputBytes);
+            eqPlusMinus(runInputBytes[i], specInputBytes[i], extraInputBytes));
       }
 
       // Check input records
       Arrays.sort(specInputRecords);
       Arrays.sort(runInputRecords);
       for (int i = 0; i < runTasks.length; ++i) {
-        assertTrue("Mismatched input records " +
+        assertTrue("Mismatched " + type + " input records " +
             specInputRecords[i] + "/" + runInputRecords[i],
-            runInputRecords[i] - specInputRecords[i] <= extraInputRecords);
+            eqPlusMinus(runInputRecords[i], specInputRecords[i],
+              extraInputRecords));
       }
 
       // Check output bytes
       Arrays.sort(specOutputBytes);
       Arrays.sort(runOutputBytes);
       for (int i = 0; i < runTasks.length; ++i) {
-        assertTrue("Mismatched output bytes " +
+        assertTrue("Mismatched " + type + " output bytes " +
             specOutputBytes[i] + "/" + runOutputBytes[i],
-            runOutputBytes[i] - specOutputBytes[i] <= extraOutputBytes);
+            eqPlusMinus(runOutputBytes[i], specOutputBytes[i],
+              extraOutputBytes));
       }
 
       // Check output records
       Arrays.sort(specOutputRecords);
       Arrays.sort(runOutputRecords);
       for (int i = 0; i < runTasks.length; ++i) {
-        assertTrue("Mismatched output records " +
+        assertTrue("Mismatched " + type + " output records " +
             specOutputRecords[i] + "/" + runOutputRecords[i],
-            runOutputRecords[i] - specOutputRecords[i] <= extraOutputRecords);
+            eqPlusMinus(runOutputRecords[i], specOutputRecords[i],
+              extraOutputRecords));
       }
 
     }
 
+    private static boolean eqPlusMinus(long a, long b, long x) {
+      final long diff = Math.abs(a - b);
+      return diff <= x;
+    }
+
     @Override
     protected void onSuccess(Job job) {
       retiredJobs.add(job);
@@ -292,6 +312,7 @@
     };
     DebugGridmix client = new DebugGridmix();
     final Configuration conf = mrCluster.createJobConf();
+    //conf.setInt(Gridmix.GRIDMIX_KEY_LEN, 2);
     int res = ToolRunner.run(conf, client, argv);
     assertEquals("Client exited with nonzero status", 0, res);
     client.checkMonitor();
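
The assertions above all reduce to one pattern: sort the spec and run
samples, then require each ordered pair to agree within a two-sided slop,
as eqPlusMinus does. A minimal sketch of that sort-then-compare check;
the sample values are made up.

    import java.util.Arrays;

    class SlopCheck {
      static boolean eqPlusMinus(long a, long b, long x) {
        return Math.abs(a - b) <= x;           // two-sided tolerance
      }

      // Compare order statistics so task numbering does not matter.
      static boolean matches(long[] spec, long[] run, long slop) {
        final long[] s = spec.clone(), r = run.clone();
        Arrays.sort(s);
        Arrays.sort(r);
        for (int i = 0; i < s.length; ++i) {
          if (!eqPlusMinus(r[i], s[i], slop)) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        final long[] spec = { 1000, 2000, 3000 };
        final long[] run  = { 2900, 1100, 2048 };
        System.out.println(matches(spec, run, 1024)); // true within 1K slop
      }
    }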

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 26 14:02:53 2010
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/index:713112
 /hadoop/core/trunk/src/contrib/index:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/index:804974-885774
+/hadoop/mapreduce/trunk/src/contrib/index:804974-903221

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/ivy.xml?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/ivy.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/ivy.xml Tue Jan 26 14:02:53 2010
@@ -1,4 +1,21 @@
 <?xml version="1.0" ?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
 <ivy-module version="1.0">
   <info organisation="org.apache.hadoop" module="${ant.project.name}">
     <license name="Apache 2.0"/>

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/ivy/libraries.properties?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/ivy/libraries.properties (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/index/ivy/libraries.properties Tue Jan 26 14:02:53 2010
@@ -1,3 +1,15 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
 #This properties file lists the versions of the various artifacts used by index.
 
 #These are the versions of our dependencies (in alphabetical order)

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 26 14:02:53 2010
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/mrunit:713112
 /hadoop/core/trunk/src/contrib/mrunit:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/mrunit:804974-885774
+/hadoop/mapreduce/trunk/src/contrib/mrunit:804974-903221

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/ivy.xml?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/ivy.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mrunit/ivy.xml Tue Jan 26 14:02:53 2010
@@ -33,6 +33,7 @@
 
     <conf name="common" visibility="private" 
       description="artifacts needed to compile/test the application"/>
+    <conf name="test" visibility="private" extends="runtime"/>
   </configurations>
 
   <publications>

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java?rev=903227&r1=903226&r2=903227&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java Tue Jan 26 14:02:53 2010
@@ -20,7 +20,9 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -29,14 +31,15 @@
 import org.apache.hadoop.mapred.SimulatorEvent;
 import org.apache.hadoop.mapred.SimulatorEventQueue;
 import org.apache.hadoop.mapred.JobCompleteEvent;
-import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.SimulatorJobClient;
 import org.apache.hadoop.mapred.SimulatorJobTracker;
 import org.apache.hadoop.mapred.SimulatorTaskTracker;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.StaticMapping;
 import org.apache.hadoop.tools.rumen.ClusterStory;
+import org.apache.hadoop.tools.rumen.ClusterTopologyReader;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.LoggedNetworkTopology;
 import org.apache.hadoop.tools.rumen.MachineNode;
 import org.apache.hadoop.tools.rumen.RackNode;
 import org.apache.hadoop.tools.rumen.ZombieCluster;
@@ -78,8 +81,6 @@
 
     for (MachineNode node : clusterStory.getMachines()) {
       String hostname = node.getName();
-      RackNode rackNode = node.getRackNode();
-      StaticMapping.addNodeToRack(hostname, rackNode.getName());
       String taskTrackerName = "tracker_" + hostname + ":localhost/127.0.0.1:"
           + port;
       port++;
@@ -132,13 +133,23 @@
 
     MachineNode defaultNode = new MachineNode.Builder("default", 2)
         .setMapSlots(maxMaps).setReduceSlots(maxReduces).build();
-    ZombieCluster cluster = new ZombieCluster(new Path(topologyFile), 
-        defaultNode, jobConf);
+    
+    LoggedNetworkTopology topology = new ClusterTopologyReader(new Path(
+        topologyFile), jobConf).get();
+    // Setting the static mapping before removing numeric IP hosts.
+    setStaticMapping(topology);
+    if (getConf().getBoolean("mumak.topology.filter-numeric-ips", true)) {
+      removeIpHosts(topology);
+    }
+    ZombieCluster cluster = new ZombieCluster(topology, defaultNode);
     long firstJobStartTime = now + 60000;
     JobStoryProducer jobStoryProducer = new SimulatorJobStoryProducer(
         new Path(traceFile), cluster, firstJobStartTime, jobConf);
     
-    jc = new SimulatorJobClient(jt, jobStoryProducer);
+    final SimulatorJobSubmissionPolicy submissionPolicy = SimulatorJobSubmissionPolicy
+        .getPolicy(jobConf);
+    
+    jc = new SimulatorJobClient(jt, jobStoryProducer, submissionPolicy);
     queue.addAll(jc.init(firstJobStartTime));
 
     // create TTs based on topology.json     
@@ -236,4 +247,58 @@
   long getCurrentTime() {
     return currentTime;
   }
+  
+  // Due to HDFS-778, a node may appear in job history logs both as a
+  // numeric IP and as a host name. We remove the numeric-IP entries from
+  // the parsed network topology before feeding it to ZombieCluster.
+  static void removeIpHosts(LoggedNetworkTopology topology) {
+    for (Iterator<LoggedNetworkTopology> rackIt = topology.getChildren()
+        .iterator(); rackIt.hasNext();) {
+      LoggedNetworkTopology rack = rackIt.next();
+      List<LoggedNetworkTopology> nodes = rack.getChildren();
+      for (Iterator<LoggedNetworkTopology> it = nodes.iterator(); it.hasNext();) {
+        LoggedNetworkTopology node = it.next();
+        if (isIPAddress(node.getName())) {
+          it.remove();
+        }
+      }
+      if (nodes.isEmpty()) {
+        rackIt.remove();
+      }
+    }
+  }
+
+  static final Pattern IP_PATTERN;
+  
+  static {
+    // 0-255
+    String IPV4BK1 = "(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)";
+    // .b.c.d - where b/c/d are each 0-255; each period may optionally be
+    // preceded by a literal backslash, as in escaped topology host names
+    String IPV4BKN = "(?:\\\\?\\." + IPV4BK1 + "){3}";
+    String IPV4_PATTERN = IPV4BK1 + IPV4BKN;
+    
+    // first hexadecimal number
+    String IPV6BK1 = "(?:[0-9a-fA-F]{1,4})";
+    // remaining 7 hexadecimal numbers, each preceded by ":".
+    String IPV6BKN = "(?::" + IPV6BK1 + "){7}";
+    String IPV6_PATTERN = IPV6BK1 + IPV6BKN;
+
+    IP_PATTERN = Pattern.compile(
+        "^(?:" + IPV4_PATTERN + "|" + IPV6_PATTERN + ")$");
+  }
+
+
+  static boolean isIPAddress(String hostname) {
+    return IP_PATTERN.matcher(hostname).matches();
+  }
+  
+  static void setStaticMapping(LoggedNetworkTopology topology) {
+    for (LoggedNetworkTopology rack : topology.getChildren()) {
+      for (LoggedNetworkTopology node : rack.getChildren()) {
+        StaticMapping.addNodeToRack(node.getName(), 
+            new RackNode(rack.getName(), 1).getName());
+      }
+    }
+  }
 }
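
IP_PATTERN above anchors an alternation of a dotted-quad IPv4 address
(each dot optionally preceded by a backslash, as escaped names can appear
in topology files) and eight colon-separated hexadecimal IPv6 groups. A
compilable sketch that rebuilds the same pattern and exercises
isIPAddress-style matching on a few sample names.

    import java.util.regex.Pattern;

    class IpMatchSketch {
      static final Pattern IP_PATTERN;
      static {
        // Pattern fragments copied from SimulatorEngine above.
        final String IPV4BK1 = "(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)";
        final String IPV4BKN = "(?:\\\\?\\." + IPV4BK1 + "){3}";
        final String IPV6BK1 = "(?:[0-9a-fA-F]{1,4})";
        final String IPV6BKN = "(?::" + IPV6BK1 + "){7}";
        IP_PATTERN = Pattern.compile(
            "^(?:" + IPV4BK1 + IPV4BKN + "|" + IPV6BK1 + IPV6BKN + ")$");
      }

      static boolean isIPAddress(String hostname) {
        return IP_PATTERN.matcher(hostname).matches();
      }

      public static void main(String[] args) {
        System.out.println(isIPAddress("10.0.0.1"));            // true
        System.out.println(isIPAddress("10\\.0\\.0\\.1"));      // true, escaped
        System.out.println(isIPAddress("node-10.example"));     // false
        System.out.println(isIPAddress("fe80:0:0:0:0:0:0:1"));  // true
      }
    }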


