hadoop-mapreduce-commits mailing list archives

From cdoug...@apache.org
Subject svn commit: r897072 - in /hadoop/mapreduce/trunk: ./ src/test/mapred/org/apache/hadoop/tools/rumen/ src/test/tools/data/rumen/small-trace-test/ src/tools/org/apache/hadoop/tools/rumen/
Date Fri, 08 Jan 2010 01:08:01 GMT
Author: cdouglas
Date: Fri Jan  8 01:08:01 2010
New Revision: 897072

URL: http://svn.apache.org/viewvc?rev=897072&view=rev
Log:
MAPREDUCE-1295. Add a tool in Rumen for folding and manipulating job
traces. Contributed by Dick King

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java
    hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/folder-input-trace.json.gz   (with props)
    hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/goldFoldedTrace.json.gz   (with props)
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Folder.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
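
The new Folder tool runs through ToolRunner; the switches below are the ones
exercised by the added TestRumenFolder. A hypothetical invocation (the jar
name and trace paths are illustrative, not part of this commit):

    hadoop jar hadoop-mapreduce-tools.jar org.apache.hadoop.tools.rumen.Folder \
        -input-cycle 100S -output-duration 300S -skew-buffer-length 1 \
        -seed 100 -concentration 2 \
        input-trace.json.gz folded-trace.json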

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=897072&r1=897071&r2=897072&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jan  8 01:08:01 2010
@@ -100,6 +100,9 @@
     MAPREDUCE-372. Change org.apache.hadoop.mapred.lib.ChainMapper/Reducer 
     to use new mapreduce api. (Amareshwari Sriramadasu via sharad)
 
+    MAPREDUCE-1295. Add a tool in Rumen for folding and manipulating job
+    traces. (Dick King via cdouglas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java?rev=897072&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenFolder.java Fri Jan  8 01:08:01 2010
@@ -0,0 +1,104 @@
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestRumenFolder {
+  @Test
+  public void testFoldingSmallTrace() throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    @SuppressWarnings("deprecation")
+    final Path rootInputDir =
+        new Path(System.getProperty("test.tools.input.dir", ""))
+            .makeQualified(lfs);
+    @SuppressWarnings("deprecation")
+    final Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp"))
+            .makeQualified(lfs);
+
+    final Path rootInputFile = new Path(rootInputDir, "rumen/small-trace-test");
+    final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
+    lfs.delete(tempDir, true);
+
+    final Path foldedTracePath = new Path(tempDir, "folded-trace.json");
+
+    final Path inputFile =
+        new Path(rootInputFile, "folder-input-trace.json.gz");
+
+    System.out.println("folded trace result path = " + foldedTracePath);
+
+    String[] args =
+        { "-input-cycle", "100S", "-output-duration", "300S",
+            "-skew-buffer-length", "1", "-seed", "100", "-concentration", "2",
+            inputFile.toString(), foldedTracePath.toString() };
+
+    final Path foldedGoldFile =
+        new Path(rootInputFile, "goldFoldedTrace.json.gz");
+
+    Folder folder = new Folder();
+    int result = ToolRunner.run(folder, args);
+    assertEquals("Non-zero exit", 0, result);
+
+    TestRumenFolder.<LoggedJob> jsonFileMatchesGold(conf, lfs, foldedTracePath,
+        foldedGoldFile, LoggedJob.class, "trace");
+  }
+
+  static private <T extends DeepCompare> void jsonFileMatchesGold(
+      Configuration conf, FileSystem lfs, Path result, Path gold,
+      Class<? extends T> clazz, String fileDescription) throws IOException {
+    JsonObjectMapperParser<T> goldParser =
+        new JsonObjectMapperParser<T>(gold, clazz, conf);
+    InputStream resultStream = lfs.open(result);
+    JsonObjectMapperParser<T> resultParser =
+        new JsonObjectMapperParser<T>(resultStream, clazz);
+    try {
+      while (true) {
+        DeepCompare goldJob = goldParser.getNext();
+        DeepCompare resultJob = resultParser.getNext();
+        if ((goldJob == null) || (resultJob == null)) {
+          assertTrue(goldJob == resultJob);
+          break;
+        }
+
+        try {
+          resultJob.deepCompare(goldJob, new TreePath(null, "<root>"));
+        } catch (DeepInequalityException e) {
+          String error = e.path.toString();
+
+          fail(fileDescription + " mismatches: " + error);
+        }
+      }
+    } finally {
+      IOUtils.cleanup(null, goldParser, resultParser);
+    }
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/folder-input-trace.json.gz
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/folder-input-trace.json.gz?rev=897072&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/folder-input-trace.json.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/goldFoldedTrace.json.gz
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/goldFoldedTrace.json.gz?rev=897072&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/mapreduce/trunk/src/test/tools/data/rumen/small-trace-test/goldFoldedTrace.json.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java?rev=897072&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java Fri Jan  8 01:08:01 2010
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class DeskewedJobTraceReader implements Closeable {
+  // underlying engine
+  private final JobTraceReader reader;
+
+  // configuration variables
+  private final int skewBufferLength;
+
+  private final boolean abortOnUnfixableSkew;
+
+  // state variables
+  private long skewMeasurementLatestSubmitTime = Long.MIN_VALUE;
+
+  private long returnedLatestSubmitTime = Long.MIN_VALUE;
+
+  private int maxSkewBufferNeeded = 0;
+
+  // a submit time will NOT be in countedRepeatedSubmitTimesSoFar if
+  // it only occurs once; that case is represented by having the
+  // time in submitTimesSoFar only. A submit time that occurs twice or
+  // more appears in countedRepeatedSubmitTimesSoFar [with its
+  // occurrence count] AND in submitTimesSoFar
+  private TreeMap<Long, Integer> countedRepeatedSubmitTimesSoFar =
+      new TreeMap<Long, Integer>();
+  private TreeSet<Long> submitTimesSoFar = new TreeSet<Long>();
+
+  private final PriorityQueue<LoggedJob> skewBuffer;
+
+  static final private Log LOG =
+      LogFactory.getLog(DeskewedJobTraceReader.class);
+
+  static private class JobComparator implements Comparator<LoggedJob> {
+    @Override
+    public int compare(LoggedJob j1, LoggedJob j2) {
+      return (j1.getSubmitTime() < j2.getSubmitTime()) ? -1 : (j1
+          .getSubmitTime() == j2.getSubmitTime()) ? 0 : 1;
+    }
+  }
+
+  /**
+   * Constructor.
+   * 
+   * @param reader
+   *          the {@link JobTraceReader} that's being protected
+   * @param skewBufferLength
+   *          the maximum number of late jobs that can precede an
+   *          out-of-order earlier job
+   * @param abortOnUnfixableSkew
+   *          if true, an out-of-order job that the skew buffer cannot
+   *          absorb aborts the read with an {@link OutOfOrderException}
+   * @throws IOException
+   */
+  public DeskewedJobTraceReader(JobTraceReader reader, int skewBufferLength,
+      boolean abortOnUnfixableSkew) throws IOException {
+    this.reader = reader;
+
+    this.skewBufferLength = skewBufferLength;
+
+    this.abortOnUnfixableSkew = abortOnUnfixableSkew;
+
+    skewBuffer =
+        new PriorityQueue<LoggedJob>(skewBufferLength + 1, new JobComparator());
+
+    fillSkewBuffer();
+  }
+
+  public DeskewedJobTraceReader(JobTraceReader reader) throws IOException {
+    this(reader, 0, true);
+  }
+
+  private LoggedJob rawNextJob() throws IOException {
+    LoggedJob result = reader.getNext();
+
+    if ((!abortOnUnfixableSkew || skewBufferLength > 0) && result != null) {
+      long thisTime = result.getSubmitTime();
+
+      if (submitTimesSoFar.contains(thisTime)) {
+        Integer myCount = countedRepeatedSubmitTimesSoFar.get(thisTime);
+
+        countedRepeatedSubmitTimesSoFar.put(thisTime, myCount == null ? 2
+            : myCount + 1);
+      } else {
+        submitTimesSoFar.add(thisTime);
+      }
+
+      if (thisTime < skewMeasurementLatestSubmitTime) {
+        Iterator<Long> endCursor = submitTimesSoFar.descendingIterator();
+
+        int thisJobNeedsSkew = 0;
+
+        Long keyNeedingSkew;
+
+        while (endCursor.hasNext()
+            && (keyNeedingSkew = endCursor.next()) > thisTime) {
+          Integer keyNeedsSkewAmount =
+              countedRepeatedSubmitTimesSoFar.get(keyNeedingSkew);
+
+          thisJobNeedsSkew +=
+              keyNeedsSkewAmount == null ? 1 : keyNeedsSkewAmount;
+        }
+
+        maxSkewBufferNeeded = Math.max(maxSkewBufferNeeded, thisJobNeedsSkew);
+      }
+
+      skewMeasurementLatestSubmitTime =
+          Math.max(thisTime, skewMeasurementLatestSubmitTime);
+    }
+
+    return result;
+  }
+
+  static class OutOfOrderException extends RuntimeException {
+    static final long serialVersionUID = 1L;
+
+    public OutOfOrderException(String text) {
+      super(text);
+    }
+  }
+
+  LoggedJob nextJob() throws IOException, OutOfOrderException {
+    LoggedJob newJob = rawNextJob();
+
+    if (newJob != null) {
+      skewBuffer.add(newJob);
+    }
+
+    LoggedJob result = skewBuffer.poll();
+
+    while (result != null && result.getSubmitTime() < returnedLatestSubmitTime) {
+      LOG.error("The current job was submitted earlier than the previous one");
+      LOG.error("Its jobID is " + result.getJobID());
+      LOG.error("Its submit time is " + result.getSubmitTime()
+          + ",but the previous one was " + returnedLatestSubmitTime);
+
+      if (abortOnUnfixableSkew) {
+        throw new OutOfOrderException("Job submit time is "
+            + result.getSubmitTime() + ", but the previous one was "
+            + returnedLatestSubmitTime);
+      }
+
+      result = rawNextJob();
+    }
+
+    if (result != null) {
+      returnedLatestSubmitTime = result.getSubmitTime();
+    }
+
+    return result;
+  }
+
+  private void fillSkewBuffer() throws IOException {
+    for (int i = 0; i < skewBufferLength; ++i) {
+      LoggedJob newJob = rawNextJob();
+
+      if (newJob == null) {
+        return;
+      }
+
+      skewBuffer.add(newJob);
+    }
+  }
+
+  int neededSkewBufferSize() {
+    return maxSkewBufferNeeded;
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+}
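
The deskewing approach above is a bounded-lookahead sort: hold the next
skewBufferLength+1 jobs in a min-heap keyed on submit time and always emit
the smallest. A minimal standalone sketch of the same idea (class and
method names are illustrative, not part of this patch):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    // Re-sorts a stream whose elements are at most `window` positions
    // out of order; mirrors what DeskewedJobTraceReader does for jobs.
    class BoundedDeskewer {
      static List<Long> deskew(Iterator<Long> in, int window) {
        PriorityQueue<Long> buffer = new PriorityQueue<Long>(window + 1);
        List<Long> out = new ArrayList<Long>();
        while (in.hasNext()) {
          buffer.add(in.next());
          if (buffer.size() > window) {
            out.add(buffer.poll()); // smallest of the buffered window+1
          }
        }
        while (!buffer.isEmpty()) {
          out.add(buffer.poll()); // drain the remaining tail in order
        }
        return out;
      }
    }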

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Folder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Folder.java?rev=897072&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Folder.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Folder.java Fri Jan  8 01:08:01 2010
@@ -0,0 +1,561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+public class Folder extends Configured implements Tool {
+  private long outputDuration = -1;
+  private long inputCycle = -1;
+  private double concentration = 1.0;
+  private long randomSeed = 0; // irrelevant if seeded == false
+  private boolean seeded = false;
+  private boolean debug = false;
+  private boolean allowMissorting = false;
+  private int skewBufferLength = 0;
+
+  static final private Log LOG = LogFactory.getLog(Folder.class);
+
+  private DeskewedJobTraceReader reader = null;
+  private JsonGenerator outGen = null;
+
+  private List<Path> tempPaths = new LinkedList<Path>();
+
+  private Path tempDir = null;
+
+  private long firstJobSubmitTime;
+
+  private double timeDilation;
+
+  private double transcriptionRateFraction;
+
+  private int transcriptionRateInteger;
+
+  private Random random;
+
+  static private final long TICKS_PER_SECOND = 1000L;
+
+  // error return codes
+  static private final int NON_EXISTENT_FILES = 1;
+  static private final int NO_INPUT_CYCLE_LENGTH = 2;
+  static private final int EMPTY_JOB_TRACE = 3;
+  static private final int OUT_OF_ORDER_JOBS = 4;
+  static private final int ALL_JOBS_SIMULTANEOUS = 5;
+  static private final int IO_ERROR = 6;
+  static private final int OTHER_ERROR = 7;
+
+  private Set<Closeable> closees = new HashSet<Closeable>();
+  private Set<Path> deletees = new HashSet<Path>();
+
+  static long parseDuration(String durationString) {
+    String numeral = durationString.substring(0, durationString.length() - 1);
+    char durationCode = durationString.charAt(durationString.length() - 1);
+
+    long result = Integer.parseInt(numeral);
+
+    if (result <= 0) {
+      throw new IllegalArgumentException("Negative durations are not allowed");
+    }
+
+    switch (durationCode) {
+    case 'D':
+    case 'd':
+      return 24L * 60L * 60L * TICKS_PER_SECOND * result;
+    case 'H':
+    case 'h':
+      return 60L * 60L * TICKS_PER_SECOND * result;
+    case 'M':
+    case 'm':
+      return 60L * TICKS_PER_SECOND * result;
+    case 'S':
+    case 's':
+      return TICKS_PER_SECOND * result;
+    default:
+      throw new IllegalArgumentException("Missing or invalid duration code");
+    }
+  }
+
+  private int initialize(String[] args) throws IllegalArgumentException {
+    String tempDirName = null;
+    String inputPathName = null;
+    String outputPathName = null;
+
+    for (int i = 0; i < args.length; ++i) {
+      String thisArg = args[i];
+
+      if (thisArg.equalsIgnoreCase("-output-duration")) {
+        outputDuration = parseDuration(args[++i]);
+      } else if (thisArg.equalsIgnoreCase("-input-cycle")) {
+        inputCycle = parseDuration(args[++i]);
+      } else if (thisArg.equalsIgnoreCase("-concentration")) {
+        concentration = Double.parseDouble(args[++i]);
+      } else if (thisArg.equalsIgnoreCase("-debug")) {
+        debug = true;
+      } else if (thisArg.equalsIgnoreCase("-allow-missorting")) {
+        allowMissorting = true;
+      } else if (thisArg.equalsIgnoreCase("-seed")) {
+        seeded = true;
+        randomSeed = Long.parseLong(args[++i]);
+      } else if (thisArg.equalsIgnoreCase("-skew-buffer-length")) {
+        skewBufferLength = Integer.parseInt(args[++i]);
+      } else if (thisArg.equalsIgnoreCase("-temp-directory")) {
+        tempDirName = args[++i];
+      } else if (thisArg.equals("") || thisArg.startsWith("-")) {
+        throw new IllegalArgumentException("Illegal switch argument, "
+            + thisArg + " at position " + i);
+      } else {
+        inputPathName = thisArg;
+        outputPathName = args[++i];
+
+        if (i != args.length - 1) {
+          throw new IllegalArgumentException("Too many non-switch arguments");
+        }
+      }
+    }
+
+    try {
+      Configuration conf = getConf();
+      Path inPath = new Path(inputPathName);
+      reader =
+          new DeskewedJobTraceReader(new JobTraceReader(inPath, conf),
+              skewBufferLength, !allowMissorting);
+      Path outPath = new Path(outputPathName);
+
+      ObjectMapper outMapper = new ObjectMapper();
+      outMapper.configure(
+          SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+      JsonFactory outFactory = outMapper.getJsonFactory();
+      FileSystem outFS = outPath.getFileSystem(conf);
+
+      CompressionCodec codec =
+          new CompressionCodecFactory(conf).getCodec(outPath);
+      OutputStream output;
+      Compressor compressor = null;
+      if (codec != null) {
+        compressor = CodecPool.getCompressor(codec);
+        output = codec.createOutputStream(outFS.create(outPath), compressor);
+      } else {
+        output = outFS.create(outPath);
+      }
+
+      outGen = outFactory.createJsonGenerator(output, JsonEncoding.UTF8);
+      outGen.useDefaultPrettyPrinter();
+
+      tempDir =
+          tempDirName == null ? outPath.getParent() : new Path(tempDirName);
+
+      FileSystem fs = tempDir.getFileSystem(getConf());
+      if (!fs.getFileStatus(tempDir).isDir()) {
+        throw new IOException("Your temp directory is not a directory");
+      }
+
+      if (inputCycle <= 0) {
+        LOG.error("You must have an input cycle length.");
+        return NO_INPUT_CYCLE_LENGTH;
+      }
+
+      if (outputDuration <= 0) {
+        outputDuration = 60L * 60L * TICKS_PER_SECOND;
+      }
+
+      if (inputCycle <= 0) {
+        inputCycle = outputDuration;
+      }
+
+      timeDilation = (double) outputDuration / (double) inputCycle;
+
+      random = seeded ? new Random(randomSeed) : new Random();
+
+      if (debug) {
+        randomSeed = random.nextLong();
+
+        LOG.warn("This run effectively has a -seed of " + randomSeed);
+
+        random = new Random(randomSeed);
+
+        seeded = true;
+      }
+    } catch (IOException e) {
+      e.printStackTrace(System.err);
+
+      return NON_EXISTENT_FILES;
+    }
+
+    return 0;
+  }
+
+  @Override
+  public int run(String[] args) throws IOException {
+    int result = initialize(args);
+
+    if (result != 0) {
+      return result;
+    }
+
+    return run();
+  }
+
+  public int run() throws IOException {
+    class JobEntryComparator implements
+        Comparator<Pair<LoggedJob, JobTraceReader>> {
+      public int compare(Pair<LoggedJob, JobTraceReader> p1,
+          Pair<LoggedJob, JobTraceReader> p2) {
+        LoggedJob j1 = p1.first();
+        LoggedJob j2 = p2.first();
+
+        return (j1.getSubmitTime() < j2.getSubmitTime()) ? -1 : (j1
+            .getSubmitTime() == j2.getSubmitTime()) ? 0 : 1;
+      }
+    }
+
+    ObjectMapper outMapper = new ObjectMapper();
+    outMapper.configure(
+        SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    JsonFactory outFactory = outMapper.getJsonFactory();
+
+    // we initialize an empty heap so that if an error occurs before the
+    // real heap is established, the finally block still runs cleanly
+    Queue<Pair<LoggedJob, JobTraceReader>> heap =
+        new PriorityQueue<Pair<LoggedJob, JobTraceReader>>();
+
+    try {
+      LoggedJob job = reader.nextJob();
+
+      if (job == null) {
+        LOG.error("The job trace is empty");
+
+        return EMPTY_JOB_TRACE;
+      }
+
+      firstJobSubmitTime = job.getSubmitTime();
+      long lastJobSubmitTime = firstJobSubmitTime;
+
+      int numberJobs = 0;
+
+      long currentIntervalEnd = Long.MIN_VALUE;
+
+      Path nextSegment = null;
+      OutputStream tempUncompOut = null;
+      JsonGenerator tempGen = null;
+
+      if (debug) {
+        LOG.debug("The first job has a submit time of " + firstJobSubmitTime);
+      }
+
+      final Configuration conf = getConf();
+
+      try {
+        // At the top of this loop, the reader's skew buffer holds at most
+        // skewBufferLength entries.
+        while (job != null) {
+          final Random tempNameGenerator = new Random();
+
+          lastJobSubmitTime = job.getSubmitTime();
+
+          ++numberJobs;
+
+          if (job.getSubmitTime() >= currentIntervalEnd) {
+            if (tempGen != null) {
+              tempGen.close();
+            }
+            for (int i = 0; i < 3 && tempUncompOut == null; ++i) {
+              try {
+                nextSegment =
+                    new Path(tempDir, "segment-" + tempNameGenerator.nextLong()
+                        + ".json.gz");
+
+                if (debug) {
+                  LOG.debug("The next segment name is " + nextSegment);
+                }
+
+                FileSystem fs = nextSegment.getFileSystem(conf);
+
+                try {
+                  if (!fs.exists(nextSegment)) {
+                    tempUncompOut = fs.create(nextSegment, false);
+                  }
+
+                  continue;
+                } catch (IOException e) {
+                  // no code -- file did not already exist
+                }
+              } catch (IOException e) {
+                // no code -- file exists now, or directory bad. We try three
+                // times.
+              }
+            }
+
+            if (debug) {
+              LOG.debug("Creating " + nextSegment
+                  + " for a job with a submit time of " + job.getSubmitTime());
+            }
+
+            deletees.add(nextSegment);
+
+            tempPaths.add(nextSegment);
+
+            CompressionCodec codec =
+                new CompressionCodecFactory(conf).getCodec(nextSegment);
+            OutputStream output;
+            Compressor compressor = null;
+            if (codec != null) {
+              compressor = CodecPool.getCompressor(codec);
+              output = codec.createOutputStream(tempUncompOut, compressor);
+            } else {
+              output = tempUncompOut;
+            }
+
+            tempUncompOut = null;
+
+            tempGen = outFactory.createJsonGenerator(output, JsonEncoding.UTF8);
+            if (debug) {
+              tempGen.useDefaultPrettyPrinter();
+            }
+
+            long currentIntervalNumber =
+                (job.getSubmitTime() - firstJobSubmitTime) / inputCycle;
+
+            currentIntervalEnd =
+                firstJobSubmitTime + ((currentIntervalNumber + 1) * inputCycle);
+          }
+
+          // the temp files contain unadjusted times, but each temp file's
+          // content is in the same input cycle interval.
+          tempGen.writeObject(job);
+
+          job = reader.nextJob();
+        }
+      } catch (DeskewedJobTraceReader.OutOfOrderException e) {
+        return OUT_OF_ORDER_JOBS;
+      } finally {
+        if (tempGen != null) {
+          tempGen.close();
+        }
+      }
+
+      if (lastJobSubmitTime <= firstJobSubmitTime) {
+        LOG.error("All of your job[s] have the same submit time."
+            + "  Please just use your input file.");
+
+        return ALL_JOBS_SIMULTANEOUS;
+      }
+
+      double submitTimeSpan = lastJobSubmitTime - firstJobSubmitTime;
+
+      LOG.warn("Your input trace spans "
+          + (lastJobSubmitTime - firstJobSubmitTime) + " ticks.");
+
+      double foldingRatio =
+          submitTimeSpan * (numberJobs + 1) / numberJobs / inputCycle;
+
+      if (debug) {
+        LOG.warn("run: submitTimeSpan = " + submitTimeSpan + ", numberJobs = "
+            + numberJobs + ", inputCycle = " + inputCycle);
+      }
+
+      if (reader.neededSkewBufferSize() > 0) {
+        LOG.warn("You needed a -skew-buffer-length of "
+            + reader.neededSkewBufferSize() + " but no more, for this input.");
+      }
+
+      double tProbability = timeDilation * concentration / foldingRatio;
+
+      if (debug) {
+        LOG.warn("run: timeDilation = " + timeDilation + ", concentration = "
+            + concentration + ", foldingRatio = " + foldingRatio);
+        LOG.warn("The transcription probability is " + tProbability);
+      }
+
+      transcriptionRateInteger = (int) Math.floor(tProbability);
+      transcriptionRateFraction = tProbability - Math.floor(tProbability);
+
+      // Now read all the inputs in parallel
+      heap =
+          new PriorityQueue<Pair<LoggedJob, JobTraceReader>>(tempPaths.size(),
+              new JobEntryComparator());
+
+      for (Path tempPath : tempPaths) {
+        JobTraceReader thisReader = new JobTraceReader(tempPath, conf);
+
+        closees.add(thisReader);
+
+        LoggedJob streamFirstJob = thisReader.getNext();
+
+        long thisIndex =
+            (streamFirstJob.getSubmitTime() - firstJobSubmitTime) / inputCycle;
+
+        if (debug) {
+          LOG.debug("A job with submit time of "
+              + streamFirstJob.getSubmitTime() + " is in interval # "
+              + thisIndex);
+        }
+
+        adjustJobTimes(streamFirstJob);
+
+        if (debug) {
+          LOG.debug("That job's submit time is adjusted to "
+              + streamFirstJob.getSubmitTime());
+        }
+
+        heap
+            .add(new Pair<LoggedJob, JobTraceReader>(streamFirstJob, thisReader));
+      }
+
+      Pair<LoggedJob, JobTraceReader> next = heap.poll();
+
+      while (next != null) {
+        maybeOutput(next.first());
+
+        if (debug) {
+          LOG.debug("The most recent job has an adjusted submit time of "
+              + next.first().getSubmitTime());
+          LOG.debug(" Its replacement in the heap will come from input engine "
+              + next.second());
+        }
+
+        LoggedJob replacement = next.second().getNext();
+
+        if (replacement == null) {
+          next.second().close();
+
+          if (debug) {
+            LOG.debug("That input engine is depleted.");
+          }
+        } else {
+          adjustJobTimes(replacement);
+
+          if (debug) {
+            LOG.debug("The replacement has an adjusted submit time of "
+                + replacement.getSubmitTime());
+          }
+
+          heap.add(new Pair<LoggedJob, JobTraceReader>(replacement, next
+              .second()));
+        }
+
+        next = heap.poll();
+      }
+    } finally {
+      IOUtils.cleanup(null, reader);
+      if (outGen != null) {
+        outGen.close();
+      }
+      for (Pair<LoggedJob, JobTraceReader> heapEntry : heap) {
+        heapEntry.second().close();
+      }
+      for (Closeable closee : closees) {
+        closee.close();
+      }
+      if (!debug) {
+        Configuration conf = getConf();
+
+        for (Path deletee : deletees) {
+          FileSystem fs = deletee.getFileSystem(conf);
+
+          try {
+            fs.delete(deletee, false);
+          } catch (IOException e) {
+            // no code
+          }
+        }
+      }
+    }
+
+    return 0;
+  }
+
+  private void maybeOutput(LoggedJob job) throws IOException {
+    for (int i = 0; i < transcriptionRateInteger; ++i) {
+      outGen.writeObject(job);
+    }
+
+    if (random.nextDouble() < transcriptionRateFraction) {
+      outGen.writeObject(job);
+    }
+  }
+
+  private void adjustJobTimes(LoggedJob adjustee) {
+    long offsetInCycle =
+        (adjustee.getSubmitTime() - firstJobSubmitTime) % inputCycle;
+
+    long outputOffset = (long) ((double) offsetInCycle * timeDilation);
+
+    long adjustment =
+        firstJobSubmitTime + outputOffset - adjustee.getSubmitTime();
+
+    adjustee.adjustTimes(adjustment);
+  }
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) {
+    Folder instance = new Folder();
+
+    int result = 0;
+
+    try {
+      result = ToolRunner.run(instance, args);
+    } catch (IOException e) {
+      e.printStackTrace(System.err);
+      System.exit(IO_ERROR);
+    } catch (Exception e) {
+      e.printStackTrace(System.err);
+      System.exit(OTHER_ERROR);
+    }
+
+    if (result != 0) {
+      System.exit(result);
+    }
+  }
+}
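
To make the arithmetic in adjustJobTimes concrete, here is a worked example
using the parameter values from the new test (-input-cycle 100S,
-output-duration 300S); the submit times are invented for illustration:

    // Worked illustration of the math in Folder.adjustJobTimes; literals
    // mirror the test's switches, the job's submit time is hypothetical.
    class FoldMathExample {
      public static void main(String[] args) {
        long inputCycle = 100000L;     // -input-cycle 100S, in ticks
        long outputDuration = 300000L; // -output-duration 300S, in ticks
        double timeDilation = (double) outputDuration / inputCycle; // 3.0

        long firstJobSubmitTime = 0L;
        long submitTime = 250000L;     // a job 250 seconds into the trace

        long offsetInCycle = (submitTime - firstJobSubmitTime) % inputCycle;
        long outputOffset = (long) (offsetInCycle * timeDilation);
        long adjustment = firstJobSubmitTime + outputOffset - submitTime;

        // offsetInCycle = 50000, outputOffset = 150000, adjustment = -100000:
        // the job's adjusted submit time is 150000, inside [0, 300000).
        System.out.println(adjustment);
      }
    }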

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java?rev=897072&r1=897071&r2=897072&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java Fri Jan  8 01:08:01 2010
@@ -36,7 +36,9 @@
 
 /**
  * A simple wrapper for parsing JSON-encoded data using ObjectMapper.
- * @param <T> The (base) type of the object(s) to be parsed by this parser.
+ * 
+ * @param <T>
+ *          The (base) type of the object(s) to be parsed by this parser.
  */
 class JsonObjectMapperParser<T> implements Closeable {
   private final ObjectMapper mapper;
@@ -47,7 +49,7 @@
   /**
    * Constructor.
    * 
-   * @param path 
+   * @param path
    *          Path to the JSON data file, possibly compressed.
    * @param conf
    * @throws IOException

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=897072&r1=897071&r2=897072&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java Fri Jan  8 01:08:01 2010
@@ -102,6 +102,24 @@
     setJobID(jobID);
   }
 
+  void adjustTimes(long adjustment) {
+    submitTime += adjustment;
+    launchTime += adjustment;
+    finishTime += adjustment;
+
+    for (LoggedTask task : mapTasks) {
+      task.adjustTimes(adjustment);
+    }
+
+    for (LoggedTask task : reduceTasks) {
+      task.adjustTimes(adjustment);
+    }
+
+    for (LoggedTask task : otherTasks) {
+      task.adjustTimes(adjustment);
+    }
+  }
+
   @SuppressWarnings("unused")
   // for input parameter ignored.
   @JsonAnySetter

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=897072&r1=897071&r2=897072&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Fri Jan  8 01:08:01 2010
@@ -69,6 +69,15 @@
     super();
   }
 
+  void adjustTimes(long adjustment) {
+    startTime += adjustment;
+    finishTime += adjustment;
+
+    for (LoggedTaskAttempt attempt : attempts) {
+      attempt.adjustTimes(adjustment);
+    }
+  }
+
   public long getInputBytes() {
     return inputBytes;
   }

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=897072&r1=897071&r2=897072&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Fri Jan  8 01:08:01 2010
@@ -82,6 +82,11 @@
     }
   }
 
+  void adjustTimes(long adjustment) {
+    startTime += adjustment;
+    finishTime += adjustment;
+  }
+
   public long getShuffleFinished() {
     return shuffleFinished;
   }


