hadoop-mapreduce-commits mailing list archives

From omal...@apache.org
Subject svn commit: r1079189 - in /hadoop/mapreduce/branches/yahoo-merge/src: java/org/apache/hadoop/mapred/ java/org/apache/hadoop/mapreduce/jobhistory/ test/mapred/org/apache/hadoop/mapred/
Date Tue, 08 Mar 2011 05:53:36 GMT
Author: omalley
Date: Tue Mar  8 05:53:36 2011
New Revision: 1079189

URL: http://svn.apache.org/viewvc?rev=1079189&view=rev
Log:
commit 68c2c71c91fc1bd1526d6c29e367b54e3c9a241f
Author: Richard King <dking@yahoo-inc.com>
Date:   Wed Nov 24 18:18:28 2010 +0000

    These are the added files for my ill-fated attempt to add MR2037.

Added:
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplitTranscription.java
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java

Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java?rev=1079189&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java
(added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/CumulativePeriodicStats.java
Tue Mar  8 05:53:36 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+/**
+ *
+ * This class is a concrete PeriodicStatsAccumulator that deals with
+ *  measurements where the raw data are readings of an accumulating
+ *  quantity.  The result in each bucket is the estimate
+ *  of the progress-weighted change in that quantity over the
+ *  progress range covered by the bucket.
+ *
+ * <p>An easy-to-understand example of this kind of quantity would be
+ *  a distance traveled.  It makes sense to consider that portion of
+ *  the total travel that can be apportioned to each bucket.
+ *
+ */
+class CumulativePeriodicStats extends PeriodicStatsAccumulator {
+  // int's are acceptable here, even though times are normally
+  // long's, because these are differences, and an int won't
+  // overflow for 24 days.  Tasks can't run for more than about a
+  // week for other reasons.
+  int previousValue = 0;
+
+  CumulativePeriodicStats(int count) {
+    super(count);
+  }
+
+  /**
+   *
+   * accumulates a new reading by keeping a running account of the
+   *  value distance from the beginning of the bucket to the end of
+   *  this reading
+   */
+  @Override
+  protected void extendInternal(double newProgress, int newValue) {
+    if (state == null) {
+      return;
+    }
+
+    state.currentAccumulation += (double)(newValue - previousValue);
+    previousValue = newValue;
+  }
+}
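[Editor's note: the bucket arithmetic is easiest to see with a small worked example.  The sketch below is not part of this commit; the class name is hypothetical, and it must live in org.apache.hadoop.mapred because the constructor, extend(...), and getValues() are package-level.]

    package org.apache.hadoop.mapred;

    class CumulativePeriodicStatsSketch {
      public static void main(String[] args) {
        CumulativePeriodicStats cpu = new CumulativePeriodicStats(4);

        cpu.extend(0.0D, 0);
        cpu.extend(0.5D, 1000);   // 1000 ms of CPU used by the halfway point
        cpu.extend(1.0D, 3000);   // 2000 ms more over the second half

        // Each quarter-bucket receives its interpolated share of the
        // change in the running total: {500, 500, 1000, 1000}
        System.out.println(java.util.Arrays.toString(cpu.getValues()));
      }
    }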

Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java?rev=1079189&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java
(added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/PeriodicStatsAccumulator.java
Tue Mar  8 05:53:36 2011
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ *
+ * This abstract class represents a bucketed series of
+ *  measurements of a quantity taken in a running task
+ *  attempt.
+ *
+ * <p>The sole constructor is called with a count, which is the
+ *  number of buckets into which we evenly divide the spectrum of
+ *  progress from 0.0D to 1.0D .  In the future we may provide for
+ *  custom split points that don't have to be uniform.
+ *
+ * <p>A subclass determines how we fold readings for portions of a
+ *  bucket and how we interpret the readings by overriding
+ *  {@code extendInternal(...)} and {@code initializeInterval()}
+ */
+public abstract class PeriodicStatsAccumulator {
+  // The range of progress from 0.0D through 1.0D is divided into
+  //  count "progress segments".  This object accumulates an
+  //  estimate of the effective value of a time-varying value during
+  //  the zero-based i'th progress segment, ranging from i/count
+  //  through (i+1)/count . 
+  // This is an abstract class.  We have two implementations: one
+  //  for monotonically increasing time-dependent variables
+  //  [currently, CPU time in milliseconds and wallclock time in
+  //  milliseconds] and one for quantities that can vary arbitrarily
+  //  over time, currently virtual and physical memory used, in
+  //  kilobytes. 
+  // We carry int's here.  This saves a lot of JVM heap space in the
+  //  job tracker per running task attempt [200 bytes per] but it
+  //  has a small downside.
+  // No task attempt can run for more than 57 days nor occupy more
+  //  than two terabytes of virtual memory. 
+  protected final int count;
+  protected final int[] values;
+    
+  static class StatsetState {
+    int oldValue = 0;
+    double oldProgress = 0.0D;
+
+    double currentAccumulation = 0.0D;
+  }
+
+  // We provide this level of indirection to reduce the memory
+  //  footprint of done task attempts.  When a task's progress
+  //  reaches 1.0D, we delete this StatsetState object.
+  StatsetState state = new StatsetState();
+
+  PeriodicStatsAccumulator(int count) {
+    this.count = count;
+    this.values = new int[count];
+    for (int i = 0; i < count; ++i) {
+      values[i] = -1;
+    }
+  }
+
+  protected int[] getValues() {
+    return values;
+  }
+
+  // The concrete implementation of this abstract function
+  //  accumulates more data into the current progress segment.
+  //  newProgress [from the call] and oldProgress [from the object]
+  //  must be in [or at the border of] a single progress segment.
+  /**
+   *
+   * adds a new reading to the current bucket.
+   *
+   * @param newProgress the endpoint of the interval this new
+   *                      reading covers
+   * @param newValue the value of the reading at {@code newProgress} 
+   *
+   * The class keeps three pieces of state: {@code oldProgress},
+   *  {@code oldValue}, and {@code currentAccumulation}.
+   *
+   * {@code extendInternal} can count on three things: 
+   *
+   *   1: The first time it's called in a particular instance, both
+   *      oldXXX's will be zero.
+   *
+   *   2: oldXXX for a later call is the value of newXXX of the
+   *      previous call.  This ensures continuity in accumulation from
+   *      one call to the next.
+   *
+   *   3: {@code currentAccumulation} is owned by 
+   *      {@code initializeInterval} and {@code extendInternal}.
+   */
+  protected abstract void extendInternal(double newProgress, int newValue);
+
+  // What has to be done when you open a new interval
+  /**
+   * initializes the state variables to be ready for a new interval
+   */
+  protected void initializeInterval() {
+    state.currentAccumulation = 0.0D;
+  }
+
+  // called for each new reading
+  /**
+   * This method calls {@code extendInternal} at least once.  It
+   *  divides the current progress interval [from the last call's
+   *  {@code newProgress}  to this call's {@code newProgress} ]
+   *  into one or more subintervals by splitting at any point which
+   *  is an interval boundary if there are any such points.  It
+   *  then calls {@code extendInternal} for each subinterval, or the
+   *  whole interval if there are no splitting points.
+   * 
+   *  <p>For example, if the value was {@code 300} last time with
+   *  {@code 0.3}  progress, and count is {@code 5}, and you get a
+   *  new reading with the variable at {@code 700} and progress at
+   *  {@code 0.7}, you get three calls to {@code extendInternal}:
+   *  one extending from progress {@code 0.3} to {@code 0.4} [the
+   *  next boundary] with a value of {@code 400}, the next one
+   *  through {@code 0.6} with a  value of {@code 600}, and finally
+   *  one at {@code 700} with a progress of {@code 0.7} . 
+   *
+   * @param newProgress the endpoint of the progress range this new
+   *                      reading covers
+   * @param newValue the value of the reading at {@code newProgress} 
+   */    
+  protected void extend(double newProgress, int newValue) {
+    if (state == null || newProgress < state.oldProgress) {
+      return;
+    }
+
+    // The correctness of this code depends on 1.0D * count being exactly count.
+    int oldIndex = (int)(state.oldProgress * count);
+    int newIndex = (int)(newProgress * count);
+    int originalOldValue = state.oldValue;
+
+    double fullValueDistance = (double)newValue - state.oldValue;
+    double fullProgressDistance = newProgress - state.oldProgress;
+    double originalOldProgress = state.oldProgress;
+
+    // In this loop we detect each subinterval boundary within the
+    //  range from the old progress to the new one.  Then we
+    //  interpolate the value from the old value to the new one to
+    //  infer what its value might have been at each such boundary.
+    //  Lastly we make the necessary calls to extendInternal to fold
+    //  in the data for each trapezoid, so that no trapezoid
+    //  crosses a boundary.
+    for (int closee = oldIndex; closee < newIndex; ++closee) {
+      double interpolationProgress = (double)(closee + 1) / count;
+      // In floating point, x * y / y might not equal x.
+      interpolationProgress = Math.min(interpolationProgress, newProgress);
+
+      double progressLength = (interpolationProgress - originalOldProgress);
+      double interpolationProportion = progressLength / fullProgressDistance;
+
+      double interpolationValueDistance
+        = fullValueDistance * interpolationProportion;
+
+      // estimates the value at the next [interpolated] subsegment boundary
+      int interpolationValue
+        = (int)interpolationValueDistance + originalOldValue;
+
+      extendInternal(interpolationProgress, interpolationValue);
+
+      advanceState(interpolationProgress, interpolationValue);
+
+      values[closee] = (int)state.currentAccumulation;
+      initializeInterval();
+
+    }
+
+    extendInternal(newProgress, newValue);
+    advanceState(newProgress, newValue);
+
+    if (newIndex == count) {
+      state = null;
+    }
+  }
+
+  protected void advanceState(double newProgress, int newValue) {
+    state.oldValue = newValue;
+    state.oldProgress = newProgress;
+  }    
+
+  int getCount() {
+    return count;
+  }
+
+  int get(int index) {
+    return values[index];
+  }
+}
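[Editor's note: the subinterval splitting described in the extend(...) javadoc can be watched directly with a throwaway subclass.  This tracer is illustrative only, not part of the commit, and assumes same-package access.]

    package org.apache.hadoop.mapred;

    class TracingStats extends PeriodicStatsAccumulator {
      TracingStats(int count) { super(count); }

      @Override
      protected void extendInternal(double newProgress, int newValue) {
        System.out.println("extendInternal(" + newProgress + ", " + newValue + ")");
      }

      public static void main(String[] args) {
        TracingStats stats = new TracingStats(5);  // boundaries at 0.2, 0.4, 0.6, 0.8
        stats.extend(0.3D, 300);  // logs interpolated calls at 0.2 and 0.3
        stats.extend(0.7D, 700);  // logs the three calls the javadoc predicts:
                                  //   (0.4, 400), (0.6, 600), (0.7, 700)
      }
    }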

Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java?rev=1079189&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java
(added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ProgressSplitsBlock.java
Tue Mar  8 05:53:36 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.List;
+
+/*
+ * This object gathers the [currently four] PeriodicStatsAccumulators
+ * that we maintain for a particular task attempt, packaging them for
+ * handling as a single object.
+ */
+public class ProgressSplitsBlock {
+  final PeriodicStatsAccumulator progressWallclockTime;
+  final PeriodicStatsAccumulator progressCPUTime;
+  final PeriodicStatsAccumulator progressVirtualMemoryKbytes;
+  final PeriodicStatsAccumulator progressPhysicalMemoryKbytes;
+
+  static final int[] NULL_ARRAY = new int[0];
+
+  static final int WALLCLOCK_TIME_INDEX = 0;
+  static final int CPU_TIME_INDEX = 1;
+  static final int VIRTUAL_MEMORY_KBYTES_INDEX = 2;
+  static final int PHYSICAL_MEMORY_KBYTES_INDEX = 3;
+
+  static final int DEFAULT_NUMBER_PROGRESS_SPLITS = 12;
+
+  ProgressSplitsBlock(int numberSplits) {
+    progressWallclockTime
+      = new CumulativePeriodicStats(numberSplits);
+    progressCPUTime
+      = new CumulativePeriodicStats(numberSplits);
+    progressVirtualMemoryKbytes
+      = new StatePeriodicStats(numberSplits);
+    progressPhysicalMemoryKbytes
+      = new StatePeriodicStats(numberSplits);
+  }
+
+  // this coordinates with LoggedTaskAttempt.SplitVectorKind
+  int[][] burst() {
+    int[][] result = new int[4][];
+
+    result[WALLCLOCK_TIME_INDEX] = progressWallclockTime.getValues();
+    result[CPU_TIME_INDEX] = progressCPUTime.getValues();
+    result[VIRTUAL_MEMORY_KBYTES_INDEX] = progressVirtualMemoryKbytes.getValues();
+    result[PHYSICAL_MEMORY_KBYTES_INDEX] = progressPhysicalMemoryKbytes.getValues();
+
+    return result;
+  }
+
+  public static int[] arrayGet(int[][] burstedBlock, int index) {
+    return burstedBlock == null ? NULL_ARRAY : burstedBlock[index];
+  }
+
+  public static int[] arrayGetWallclockTime(int[][] burstedBlock) {
+    return arrayGet(burstedBlock, WALLCLOCK_TIME_INDEX);
+  }
+
+  public static int[] arrayGetCPUTime(int[][] burstedBlock) {
+    return arrayGet(burstedBlock, CPU_TIME_INDEX);
+  }
+
+  public static int[] arrayGetVMemKbytes(int[][] burstedBlock) {
+    return arrayGet(burstedBlock, VIRTUAL_MEMORY_KBYTES_INDEX);
+  }
+
+  public static int[] arrayGetPhysMemKbytes(int[][] burstedBlock) {
+    return arrayGet(burstedBlock, PHYSICAL_MEMORY_KBYTES_INDEX);
+  }
+}
+    
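[Editor's note: putting the pieces together, a hypothetical caller — not part of this commit, and in org.apache.hadoop.mapred because the constructor, the accumulator fields, and burst() are package-level — would feed status reports in and burst the vectors out.]

    package org.apache.hadoop.mapred;

    class ProgressSplitsBlockSketch {
      public static void main(String[] args) {
        ProgressSplitsBlock splits = new ProgressSplitsBlock(
            ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS);

        // Fold each status report into the matching accumulator.
        splits.progressWallclockTime.extend(0.25D, 30000);         // 30 s at 25%
        splits.progressVirtualMemoryKbytes.extend(0.25D, 1048576); // 1 GB at 25%
        splits.progressWallclockTime.extend(1.0D, 120000);
        splits.progressVirtualMemoryKbytes.extend(1.0D, 2097152);

        // burst() yields the four vectors destined for the job history;
        // the arrayGet* accessors tolerate a null bursted block.
        int[][] bursted = splits.burst();
        int[] wallclock = ProgressSplitsBlock.arrayGetWallclockTime(bursted);
        int[] cpu = ProgressSplitsBlock.arrayGetCPUTime(null);   // NULL_ARRAY
        System.out.println(wallclock.length + " " + cpu.length); // "12 0"
      }
    }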

Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java?rev=1079189&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java
(added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/StatePeriodicStats.java
Tue Mar  8 05:53:36 2011
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+
+/**
+ *
+ * This class is a concrete PeriodicStatsAccumulator that deals with
+ *  measurements where the raw data are a measurement of a
+ *  time-varying quantity.  The result in each bucket is the estimate
+ *  of the progress-weighted mean value of that quantity over the
+ *  progress range covered by the bucket.
+ *
+ * <p>An easy-to-understand example of this kind of quantity would be
+ *  a temperature.  It makes sense to consider the mean temperature
+ *  over a progress range.
+ *
+ */
+class StatePeriodicStats extends PeriodicStatsAccumulator {
+  StatePeriodicStats(int count) {
+    super(count);
+  }
+
+  /**
+   *
+   * accumulates a new reading by keeping a running account of the
+   *  area under the piecewise linear curve marked by pairs of
+   *  {@code newProgress, newValue} .
+   */
+  @Override
+  protected void extendInternal(double newProgress, int newValue) {
+    if (state == null) {
+      return;
+    }
+
+    // the effective height of this trapezoid if rectangularized
+    double mean = ((double)newValue + (double)state.oldValue)/2.0D;
+
+    // conceptually mean *  (newProgress - state.oldProgress) / (1 / count)
+    state.currentAccumulation += mean * (newProgress - state.oldProgress) * count;
+  }
+}
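[Editor's note: a two-bucket worked example — illustrative only, not part of this commit, same-package access assumed — shows the trapezoid averaging.]

    package org.apache.hadoop.mapred;

    class StatePeriodicStatsSketch {
      public static void main(String[] args) {
        StatePeriodicStats mem = new StatePeriodicStats(2);

        mem.extend(0.0D, 0);    // the quantity ramps linearly from 0 ...
        mem.extend(0.5D, 100);  // ... to 100 over the first half of the task,
        mem.extend(1.0D, 100);  // then holds steady at 100

        // Bucket 0 holds the mean of the ramp, bucket 1 the steady state:
        // {50, 100}
        System.out.println(java.util.Arrays.toString(mem.getValues()));
      }
    }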

Added: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java?rev=1079189&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java
(added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/AvroArrayUtils.java
Tue Mar  8 05:53:36 2011
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+
+public class AvroArrayUtils {
+
+  private static final Schema ARRAY_INT
+      = Schema.createArray(Schema.create(Schema.Type.INT));
+
+  public static GenericArray<Integer> NULL_PROGRESS_SPLITS_ARRAY
+    = new GenericData.Array<Integer>(0, ARRAY_INT);
+
+  public static GenericArray<Integer> toAvro(int[] values) {
+    GenericData.Array<Integer> result
+      = new GenericData.Array<Integer>(values.length, ARRAY_INT);
+
+    for (int i = 0; i < values.length; ++i) {
+      result.add(values[i]);
+    }
+
+    return result;
+  }
+
+  public static int[] fromAvro(GenericArray<Integer> avro) {
+    int[] result = new int[(int)avro.size()];
+
+    int i = 0;
+      
+    for (Iterator<Integer> iter = avro.iterator(); iter.hasNext(); ++i) {
+      result[i] = iter.next();
+    }
+
+    return result;
+  }
+}
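[Editor's note: a round trip through these helpers, illustrative and not part of the commit.]

    import java.util.Arrays;

    import org.apache.avro.generic.GenericArray;
    import org.apache.hadoop.mapreduce.jobhistory.AvroArrayUtils;

    public class AvroRoundTrip {
      public static void main(String[] args) {
        int[] original = { 1, 2, 3 };
        GenericArray<Integer> asAvro = AvroArrayUtils.toAvro(original);
        int[] back = AvroArrayUtils.fromAvro(asAvro);
        System.out.println(Arrays.equals(original, back));  // true
      }
    }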

Added: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplitTranscription.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplitTranscription.java?rev=1079189&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplitTranscription.java
(added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplitTranscription.java
Tue Mar  8 05:53:36 2011
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup.MyOutputFormat;
+
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory.JobHistoryRecordRetriever;
+
+import org.apache.hadoop.tools.rumen.TraceBuilder;
+import org.apache.hadoop.tools.rumen.ZombieJobProducer;
+import org.apache.hadoop.tools.rumen.ZombieJob;
+import org.apache.hadoop.tools.rumen.LoggedJob;
+import org.apache.hadoop.tools.rumen.LoggedTask;
+import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.ZombieCluster;
+import org.apache.hadoop.tools.rumen.MachineNode;
+
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestTaskPerformanceSplitTranscription {
+  // This testcase runs a job in a mini cluster, and then it verifies
+  //  that splits are stored in the resulting trace, and also
+  //  retrievable from the ZombieJob resulting from reading the trace.
+  // 
+  // We can't test for any particular values, unfortunately.
+  @Test
+  public void testTranscription() throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    final Path rootTempDir =
+      new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+          lfs.getUri(), lfs.getWorkingDirectory());
+
+    final Path tempDir = new Path(rootTempDir, "testTranscription");
+    lfs.delete(tempDir, true);
+    
+    // Run a MR job
+    // create a MR cluster
+    conf.setInt(TTConfig.TT_MAP_SLOTS, 1);
+    conf.setInt(TTConfig.TT_REDUCE_SLOTS, 1);
+    final MiniMRCluster mrCluster
+      = new MiniMRCluster(1, "file:///", 1, null, null, 
+                          new JobConf(conf));
+
+    final JobTracker tracker = mrCluster.getJobTrackerRunner().getJobTracker();
+    final JobHistory history = tracker.getJobHistory();
+    
+    // run a job
+    Path inDir = new Path(tempDir, "input");
+    Path outDir = new Path(tempDir, "output");
+
+    ZombieJobProducer story = null;
+
+    boolean success = false;
+    
+    try {
+      JobConf jConf = mrCluster.createJobConf();
+      // construct a job with 1 map and 1 reduce task.
+      Job job = MapReduceTestUtil.createJob(jConf, inDir, outDir, 1, 1);
+      // disable setup/cleanup
+      job.setJobSetupCleanupNeeded(false);
+      // set the output format to take care of the _temporary folder
+      job.setOutputFormatClass(MyOutputFormat.class);
+      // wait for the job to complete
+      job.waitForCompletion(false);
+      
+      assertTrue("Job failed", job.isSuccessful());
+
+      JobID id = job.getJobID();
+
+      Path inputPath = null;
+      // wait for 10 secs for the jobhistory file to move into the done folder
+      for (int i = 0; i < 100; ++i) {
+        JobHistoryRecordRetriever retriever 
+          = history.getMatchingJobs(null, "", null, id.toString());
+        if (retriever.hasNext()) {
+          inputPath = retriever.next().getPath();
+          System.out.println("History file = " + inputPath);
+          break;
+        }
+        Thread.sleep(100);
+      }
+    
+      assertTrue("Missing job history file", lfs.exists(inputPath));
+
+      System.out.println("testTranscription() input history file is "
+                         + inputPath.toString());
+
+      final Path topologyPath = new Path(tempDir, "dispatch-topology.json");
+      final Path tracePath = new Path(tempDir, "dispatch-trace.json");
+
+      System.err.println("testTranscription() output .json file is "
+                         + tracePath.toString());
+
+      String[] args =
+        { tracePath.toString(), topologyPath.toString(), inputPath.toString() };
+
+      Tool analyzer = new TraceBuilder();
+      int result = ToolRunner.run(analyzer, args);
+      assertEquals("Non-zero exit", 0, result);
+
+      MachineNode.Builder builder = new MachineNode.Builder("node.megacorp.com", 0);
+      MachineNode node = builder.build();
+      ZombieCluster cluster = new ZombieCluster(topologyPath, node, jConf);
+
+      story = new ZombieJobProducer(tracePath, cluster, jConf);
+
+      // test that the logged* has everything down to the split vector
+
+      ZombieJob theZombieJob = story.getNextJob();
+      LoggedJob theJob = theZombieJob.getLoggedJob();
+      LoggedTask firstMapTask = theJob.getMapTasks().get(0);
+      LoggedTaskAttempt firstAttempt = firstMapTask.getAttempts().get(0);
+
+      assertTrue("No clock splits were stored",
+                 firstAttempt.getClockSplits().size() > 0);
+
+      TaskAttemptInfo attemptInfo
+        = theZombieJob.getTaskAttemptInfo(TaskType.MAP, 0, 0);
+
+      assertEquals("Can't retrieve clock splits from the LoggedTaskAttempt",
+                   attemptInfo.getSplitVector
+                         (LoggedTaskAttempt.SplitVectorKind.WALLCLOCK_TIME)
+                      .size(),
+                    ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS);
+
+      // test that the ZombieJob can deliver splits vectors
+
+      TaskAttemptInfo tinfo = theZombieJob.getTaskAttemptInfo(TaskType.MAP, 0, 0);
+      List<Integer> splitVector
+        = tinfo.getSplitVector(LoggedTaskAttempt.SplitVectorKind.WALLCLOCK_TIME);
+
+      assertEquals("Can't retrieve clock splits from the ZombieJob",
+                   splitVector.size(), ProgressSplitsBlock.DEFAULT_NUMBER_PROGRESS_SPLITS);
+      
+      success = true;
+    } finally {
+      if (success) {
+        lfs.delete(tempDir, true);
+      }
+
+      if (story != null) {
+        story.close();
+      }
+    }
+  }
+}

Added: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java?rev=1079189&view=auto
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java
(added)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskPerformanceSplits.java
Tue Mar  8 05:53:36 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestTaskPerformanceSplits {
+  @Test
+  public void testPeriodStatsets() {
+    PeriodicStatsAccumulator cumulative = new CumulativePeriodicStats(8);
+    PeriodicStatsAccumulator status = new StatePeriodicStats(8);
+
+    cumulative.extend(0.0D, 0);
+    cumulative.extend(0.4375D, 700); // 200 per octant
+    cumulative.extend(0.5625D, 1100); // 0.5 = 900
+    cumulative.extend(0.625D, 1300);
+    cumulative.extend(1.0D, 7901);
+
+    int[] results = cumulative.getValues();
+
+    for (int i = 0; i < 8; ++i) {
+      System.err.println("segment i = " + results[i]);
+    }
+
+    assertEquals("Bad interpolation in cumulative segment 0", 200, results[0]);
+    assertEquals("Bad interpolation in cumulative segment 1", 200, results[1]);
+    assertEquals("Bad interpolation in cumulative segment 2", 200, results[2]);
+    assertEquals("Bad interpolation in cumulative segment 3", 300, results[3]);
+    assertEquals("Bad interpolation in cumulative segment 4", 400, results[4]);
+    assertEquals("Bad interpolation in cumulative segment 5", 2200, results[5]);
+    // these are rounded down
+    assertEquals("Bad interpolation in cumulative segment 6", 2200, results[6]);
+    assertEquals("Bad interpolation in cumulative segment 7", 2201, results[7]);
+
+    status.extend(0.0D, 0);
+    status.extend(1.0D/16.0D, 300); // + 75 for bucket 0
+    status.extend(3.0D/16.0D, 700); // + 200 for 0, +300 for 1
+    status.extend(7.0D/16.0D, 2300); // + 450 for 1, + 1500 for 2, + 1050 for 3
+    status.extend(1.0D, 1400);  // +1125 for 3, +2100 for 4, +1900 for 5,
+                                //  +1700 for 6, +1500 for 7
+
+    results = status.getValues();
+
+    assertEquals("Bad interpolation in status segment 0", 275, results[0]);
+    assertEquals("Bad interpolation in status segment 1", 750, results[1]);
+    assertEquals("Bad interpolation in status segment 2", 1500, results[2]);
+    assertEquals("Bad interpolation in status segment 3", 2175, results[3]);
+    assertEquals("Bad interpolation in status segment 4", 2100, results[4]);
+    assertEquals("Bad interpolation in status segment 5", 1900, results[5]);
+    assertEquals("Bad interpolation in status segment 6", 1700, results[6]);
+    assertEquals("Bad interpolation in status segment 7", 1500, results[7]);
+  }
+}


