hadoop-mapreduce-commits mailing list archives

From cdoug...@apache.org
Subject svn commit: r815628 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/ src/contrib/gridmix/ src/contrib/gridmix/ivy/ src/contrib/gridmix/src/ src/contrib/gridmix/src/java/ src/contrib/gridmix/src/java/org/ src/contrib/gridmix/src/java/org/apache/ src/...
Date Wed, 16 Sep 2009 06:35:43 GMT
Author: cdouglas
Date: Wed Sep 16 06:35:42 2009
New Revision: 815628

URL: http://svn.apache.org/viewvc?rev=815628&view=rev
Log:
MAPREDUCE-776. Add Gridmix, a benchmark processing Rumen traces to simulate
a measured mix of jobs on a cluster.

Added:
    hadoop/mapreduce/trunk/src/contrib/gridmix/
    hadoop/mapreduce/trunk/src/contrib/gridmix/build.xml
    hadoop/mapreduce/trunk/src/contrib/gridmix/ivy/
    hadoop/mapreduce/trunk/src/contrib/gridmix/ivy.xml
    hadoop/mapreduce/trunk/src/contrib/gridmix/ivy/libraries.properties
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
    hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/build.xml
    hadoop/mapreduce/trunk/src/contrib/build-contrib.xml
    hadoop/mapreduce/trunk/src/contrib/build.xml

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=815628&r1=815627&r2=815628&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Sep 16 06:35:42 2009
@@ -99,6 +99,9 @@
     MAPREDUCE-861. Add support for hierarchical queues in the Map/Reduce
     framework. (Rahul Kumar Singh via yhemanth)
 
+    MAPREDUCE-776. Add Gridmix, a benchmark processing Rumen traces to simulate
+    a measured mix of jobs on a cluster. (cdouglas)
+
   IMPROVEMENTS
 
     MAPREDUCE-816. Rename "local" mysql import to "direct" in Sqoop.

Modified: hadoop/mapreduce/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/build.xml?rev=815628&r1=815627&r2=815628&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/build.xml (original)
+++ hadoop/mapreduce/trunk/build.xml Wed Sep 16 06:35:42 2009
@@ -620,7 +620,7 @@
     <fail unless="continueOnFailure">Tests failed!</fail>
   </target>
 
-  <target name="test-contrib" depends="compile,compile-mapred-test" description="Run contrib unit tests">
+  <target name="test-contrib" depends="compile,compile-tools,compile-mapred-test" description="Run contrib unit tests">
     <subant target="test">
        <property name="version" value="${version}"/>
        <property name="clover.jar" value="${clover.jar}"/>
@@ -685,6 +685,7 @@
       <property name="version" value="${version}"/>
       <property name="dist.dir" value="${dist.dir}"/>
       <fileset file="${contrib.dir}/streaming/build.xml"/>
+      <fileset file="${contrib.dir}/gridmix/build.xml"/>
     </subant>
  </target>
 
@@ -714,10 +715,12 @@
       <sourcePath path="${examples.dir}" />
       <sourcePath path="${tools.src}" />
       <sourcePath path="${basedir}/src/contrib/streaming/src/java" />
+      <sourcePath path="${basedir}/src/contrib/gridmix/src/java" />
       <class location="${basedir}/build/${final.name}.jar" />
       <class location="${basedir}/build/${examples.final.name}.jar" />
       <class location="${basedir}/build/${tools.final.name}.jar" />
       <class location="${basedir}/build/contrib/streaming/hadoop-${version}-streaming.jar" />
+      <class location="${basedir}/build/contrib/gridmix/hadoop-${version}-gridmix.jar" />
     </findbugs>
 
         <xslt style="${findbugs.home}/src/xsl/default.xsl"
@@ -793,6 +796,7 @@
     	<packageset dir="src/contrib/streaming/src/java"/>
     	<packageset dir="src/contrib/data_join/src/java"/>
     	<packageset dir="src/contrib/index/src/java"/>
+    	<packageset dir="src/contrib/gridmix/src/java"/>
 
         <link href="${javadoc.link.java}"/>
 
@@ -846,6 +850,7 @@
     	<packageset dir="src/contrib/streaming/src/java"/>
     	<packageset dir="src/contrib/data_join/src/java"/>
     	<packageset dir="src/contrib/index/src/java"/>
+    	<packageset dir="src/contrib/gridmix/src/java"/>
 	
         <link href="${javadoc.link.java}"/>
 

Modified: hadoop/mapreduce/trunk/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/build-contrib.xml?rev=815628&r1=815627&r2=815628&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/build-contrib.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/build-contrib.xml Wed Sep 16 06:35:42 2009
@@ -56,6 +56,7 @@
             value="http://java.sun.com/j2se/1.4/docs/api/"/>
 
   <property name="build.encoding" value="ISO-8859-1"/>
+  <property name="dest.jar" value="${build.dir}/hadoop-${version}-${name}.jar"/>
 
   <fileset id="lib.jars" dir="${root}" includes="lib/*.jar"/>
 
@@ -83,6 +84,7 @@
     <pathelement location="${build.classes}"/>
     <fileset refid="lib.jars"/>
     <pathelement location="${hadoop.root}/build/classes"/>
+    <pathelement location="${hadoop.root}/build/tools"/>
     <fileset dir="${hadoop.root}/lib">
       <include name="**/*.jar" />
     </fileset>
@@ -179,7 +181,7 @@
   <target name="jar" depends="compile" unless="skip.contrib">
     <echo message="contrib: ${name}"/>
     <jar
-      jarfile="${build.dir}/hadoop-${version}-${name}.jar"
+      jarfile="${dest.jar}"
       basedir="${build.classes}"      
     />
   </target>
@@ -206,7 +208,7 @@
     <mkdir dir="${dist.dir}/contrib/${name}"/>
     <copy todir="${dist.dir}/contrib/${name}" includeEmptyDirs="false" flatten="true">
       <fileset dir="${build.dir}">
-        <include name="hadoop-${version}-${name}.jar" />
+        <include name="${dest.jar}" />
       </fileset>
     </copy>
   </target>
@@ -220,7 +222,7 @@
     <mkdir dir="${hadoop.log.dir}"/>
     <junit
       printsummary="yes" showoutput="${test.output}" 
-      haltonfailure="no" fork="yes" maxmemory="256m"
+      haltonfailure="no" fork="yes" maxmemory="512m"
       errorProperty="tests.failed" failureProperty="tests.failed"
       timeout="${test.timeout}">
       

Modified: hadoop/mapreduce/trunk/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/build.xml?rev=815628&r1=815627&r2=815628&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/build.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/build.xml Wed Sep 16 06:35:42 2009
@@ -56,6 +56,7 @@
       <fileset dir="." includes="sqoop/build.xml"/>
       <fileset dir="." includes="mrunit/build.xml"/> 
       <fileset dir="." includes="dynamic-scheduler/build.xml"/>
+      <fileset dir="." includes="gridmix/build.xml"/>
     </subant>
     <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
     <fail if="testsfailed">Tests failed!</fail>

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/build.xml?rev=815628&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/build.xml (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/build.xml Wed Sep 16 06:35:42 2009
@@ -0,0 +1,23 @@
+<?xml version="1.0" ?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project name="gridmix" default="jar">
+
+  <import file="../build-contrib.xml"/>
+
+</project>

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/ivy.xml?rev=815628&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/ivy.xml (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/ivy.xml Wed Sep 16 06:35:42 2009
@@ -0,0 +1,93 @@
+<?xml version="1.0" ?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<ivy-module version="1.0">
+  <info organisation="org.apache.hadoop" module="${ant.project.name}">
+    <license name="Apache 2.0"/>
+    <description>Gridmix</description>
+  </info>
+  <configurations defaultconfmapping="default">
+    <!--these match the Maven configurations-->
+    <conf name="default" extends="master,runtime"/>
+    <conf name="master" description="contains the artifact but no dependencies"/>
+    <conf name="runtime" description="runtime but not the artifact" />
+
+    <conf name="common" visibility="private" extends="runtime"
+      description="artifacts needed to compile/test the application"/>
+  </configurations>
+
+  <publications>
+    <!--get the artifact from our module name-->
+    <artifact conf="master"/>
+  </publications>
+  <dependencies>
+    <dependency org="commons-logging"
+      name="commons-logging"
+      rev="${commons-logging.version}"
+      conf="common->default"/>
+    <dependency org="log4j"
+      name="log4j"
+      rev="${log4j.version}"
+      conf="common->master"/>
+    <dependency org="junit"
+      name="junit"
+      rev="${junit.version}"
+      conf="common->default"/>
+
+    <!-- necessary for Mini*Clusters -->
+    <dependency org="commons-httpclient"
+      name="commons-httpclient"
+      rev="${commons-httpclient.version}"
+      conf="common->master"/>
+    <dependency org="commons-codec"
+      name="commons-codec"
+      rev="${commons-codec.version}"
+      conf="common->default"/>
+    <dependency org="commons-net"
+      name="commons-net"
+      rev="${commons-net.version}"
+      conf="common->default"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty"
+      rev="${jetty.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jetty-util"
+      rev="${jetty-util.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jsp-api-2.1"
+      rev="${jetty.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="jsp-2.1"
+      rev="${jetty.version}"
+      conf="common->master"/>
+    <dependency org="org.mortbay.jetty"
+      name="servlet-api-2.5"
+      rev="${servlet-api-2.5.version}"
+      conf="common->master"/>
+    <dependency org="commons-cli"
+      name="commons-cli"
+      rev="${commons-cli.version}"
+      conf="common->default"/>
+    <dependency org="org.apache.hadoop"
+      name="avro"
+      rev="1.0.0"
+      conf="common->default"/>
+  </dependencies>
+</ivy-module>

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/ivy/libraries.properties?rev=815628&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/ivy/libraries.properties (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/ivy/libraries.properties Wed Sep 16 06:35:42 2009
@@ -0,0 +1,20 @@
+#   Licensed to the Apache Software Foundation (ASF) under one or more
+#   contributor license agreements.  See the NOTICE file distributed with
+#   this work for additional information regarding copyright ownership.
+#   The ASF licenses this file to You under the Apache License, Version 2.0
+#   (the "License"); you may not use this file except in compliance with
+#   the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+#This properties file lists the versions of the various artifacts used by gridmix.
+#It drives ivy and the generation of a maven POM
+
+#Please list the dependencies name with version if they are different from the ones
+#listed in the global libraries.properties file (in alphabetical order)

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java?rev=815628&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java Wed Sep 16 06:35:42 2009
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Class for caching a pool of input data to be used by synthetic jobs for
+ * simulating read traffic.
+ */
+class FilePool {
+
+  public static final Log LOG = LogFactory.getLog(FilePool.class);
+
+  /**
+   * The minimum file size added to the pool. Default 128MiB.
+   */
+  public static final String GRIDMIX_MIN_FILE = "gridmix.min.file.size";
+
+  /**
+   * The maximum total size of files scanned into the pool. Defaults to 100TiB.
+   */
+  public static final String GRIDMIX_MAX_TOTAL = "gridmix.max.total.scan";
+
+  private Node root;
+  private final Path path;
+  private final FileSystem fs;
+  private final Configuration conf;
+  private final ReadWriteLock updateLock;
+
+  /**
+   * Initialize a filepool under the path provided, but do not populate the
+   * cache.
+   */
+  public FilePool(Configuration conf, Path input) throws IOException {
+    root = null;
+    this.conf = conf;
+    this.path = input;
+    this.fs = path.getFileSystem(conf);
+    updateLock = new ReentrantReadWriteLock();
+  }
+
+  /**
+   * Gather a collection of files at least as large as minSize.
+   * @return The total size of files returned.
+   */
+  public long getInputFiles(long minSize, Collection<FileStatus> files)
+      throws IOException {
+    updateLock.readLock().lock();
+    try {
+      return root.selectFiles(minSize, files);
+    } finally {
+      updateLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * (Re)generate cache of input FileStatus objects.
+   */
+  public void refresh() throws IOException {
+    updateLock.writeLock().lock();
+    try {
+      root = new InnerDesc(fs, fs.getFileStatus(path),
+        new MinFileFilter(conf.getLong(GRIDMIX_MIN_FILE, 128 * 1024 * 1024),
+                          conf.getLong(GRIDMIX_MAX_TOTAL, 100L * (1L << 40))));
+      if (0 == root.getSize()) {
+        throw new IOException("Found no satisfactory file in " + path);
+      }
+    } finally {
+      updateLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Get a set of locations for the given file.
+   */
+  public BlockLocation[] locationsFor(FileStatus stat, long start, long len)
+      throws IOException {
+    // TODO cache
+    return fs.getFileBlockLocations(stat, start, len);
+  }
+
+  static abstract class Node {
+
+    protected final static Random rand = new Random();
+
+    /**
+     * Total size of files and directories under the current node.
+     */
+    abstract long getSize();
+
+    /**
+     * Return a set of files whose cumulative size is at least
+     * <tt>targetSize</tt>.
+     * TODO Clearly size is not the only criterion, e.g. refresh from
+     * generated data without including running task output, tolerance
+     * for permission issues, etc.
+     */
+    abstract long selectFiles(long targetSize, Collection<FileStatus> files)
+        throws IOException;
+  }
+
+  interface IndexMapper {
+    int get(int pos);
+    void swap(int a, int b);
+  }
+
+  /**
+   * A sparse index mapping table - useful when we want to
+   * non-destructively permute a small fraction of a large array.
+   */
+  static class SparseIndexMapper implements IndexMapper {
+    Map<Integer, Integer> mapping = new HashMap<Integer, Integer>();
+
+    public int get(int pos) {
+      Integer mapped = mapping.get(pos);
+      if (mapped == null) return pos;
+      return mapped;
+    }
+
+    public void swap(int a, int b) {
+      int valA = get(a);
+      int valB = get(b);
+      if (b == valA) {
+        mapping.remove(b);
+      } else {
+        mapping.put(b, valA);
+      }
+      if (a == valB) {
+        mapping.remove(a);
+      } else {
+        mapping.put(a, valB);
+      }
+    }
+  }
+
+  /**
+   * A dense index mapping table - useful when we want to
+   * non-destructively permute a large fraction of an array.
+   */
+  static class DenseIndexMapper implements IndexMapper {
+    int[] mapping;
+
+    DenseIndexMapper(int size) {
+      mapping = new int[size];
+      for (int i=0; i<size; ++i) {
+        mapping[i] = i;
+      }
+    }
+
+    public int get(int pos) {
+      if ( (pos < 0) || (pos>=mapping.length) ) {
+        throw new IndexOutOfBoundsException();
+      }
+      return mapping[pos];
+    }
+
+    public void swap(int a, int b) {
+      int valA = get(a);
+      int valB = get(b);
+      mapping[a]=valB;
+      mapping[b]=valA;
+    }
+  }
+
+  /**
+   * Files in current directory of this Node.
+   */
+  static class LeafDesc extends Node {
+    final long size;
+    final ArrayList<FileStatus> curdir;
+
+    LeafDesc(ArrayList<FileStatus> curdir, long size) {
+      this.size = size;
+      this.curdir = curdir;
+    }
+
+    @Override
+    public long getSize() {
+      return size;
+    }
+
+    @Override
+    public long selectFiles(long targetSize, Collection<FileStatus> files)
+        throws IOException {
+      if (targetSize >= getSize()) {
+        files.addAll(curdir);
+        return getSize();
+      }
+
+      // TODO sort, pick rand pairs of kth large/small in dir
+      IndexMapper mapping;
+      if ((curdir.size() < 200) || ((double) targetSize / getSize() > 0.5)) {
+        mapping = new DenseIndexMapper(curdir.size());
+      } else {
+        mapping = new SparseIndexMapper();
+      }
+
+      ArrayList<Integer> selected = new ArrayList<Integer>();
+      long ret = 0L;
+      int poolSize = curdir.size();
+      while (ret < targetSize) {
+        int pos = rand.nextInt(poolSize);
+        int index = mapping.get(pos);
+        selected.add(index);
+        ret += curdir.get(index).getLen();
+        mapping.swap(pos, --poolSize);
+      }
+
+      for (Integer i : selected) {
+        files.add(curdir.get(i));
+      }
+
+      return ret;
+    }
+  }
+
+  /**
+   * A subdirectory of the current Node.
+   */
+  static class InnerDesc extends Node {
+    final long size;
+    final double[] dist;
+    final Node[] subdir;
+
+    private static final Comparator<Node> nodeComparator =
+      new Comparator<Node>() {
+          public int compare(Node n1, Node n2) {
+            return n1.getSize() < n2.getSize() ? -1
+                 : n1.getSize() > n2.getSize() ? 1 : 0;
+          }
+    };
+
+    InnerDesc(final FileSystem fs, FileStatus thisDir, MinFileFilter filter)
+        throws IOException {
+      long fileSum = 0L;
+      final ArrayList<FileStatus> curFiles = new ArrayList<FileStatus>();
+      final ArrayList<FileStatus> curDirs = new ArrayList<FileStatus>();
+      for (FileStatus stat : fs.listStatus(thisDir.getPath())) {
+        if (stat.isDir()) {
+          curDirs.add(stat);
+        } else if (filter.accept(stat)) {
+          curFiles.add(stat);
+          fileSum += stat.getLen();
+        }
+      }
+      ArrayList<Node> subdirList = new ArrayList<Node>();
+      if (!curFiles.isEmpty()) {
+        subdirList.add(new LeafDesc(curFiles, fileSum));
+      }
+      for (Iterator<FileStatus> i = curDirs.iterator();
+          !filter.done() && i.hasNext();) {
+        // add subdirectories
+        final Node d = new InnerDesc(fs, i.next(), filter);
+        final long dSize = d.getSize();
+        if (dSize > 0) {
+          fileSum += dSize;
+          subdirList.add(d);
+        }
+      }
+      size = fileSum;
+      LOG.debug(size + " bytes in " + thisDir.getPath());
+      subdir = subdirList.toArray(new Node[subdirList.size()]);
+      Arrays.sort(subdir, nodeComparator);
+      dist = new double[subdir.length];
+      for (int i = dist.length - 1; i > 0; --i) {
+        fileSum -= subdir[i].getSize();
+        dist[i] = fileSum / (1.0 * size);
+      }
+    }
+
+    @Override
+    public long getSize() {
+      return size;
+    }
+
+    @Override
+    public long selectFiles(long targetSize, Collection<FileStatus> files)
+        throws IOException {
+      long ret = 0L;
+      if (targetSize >= getSize()) {
+        // request larger than all subdirs; add everything
+        for (Node n : subdir) {
+          long added = n.selectFiles(targetSize, files);
+          ret += added;
+          targetSize -= added;
+        }
+        return ret;
+      }
+
+      // can satisfy request in proper subset of contents
+      // select random set, weighted by size
+      final HashSet<Node> sub = new HashSet<Node>();
+      do {
+        assert sub.size() < subdir.length;
+        final double r = rand.nextDouble();
+        int pos = Math.abs(Arrays.binarySearch(dist, r) + 1) - 1;
+        while (sub.contains(subdir[pos])) {
+          pos = (pos + 1) % subdir.length;
+        }
+        long added = subdir[pos].selectFiles(targetSize, files);
+        ret += added;
+        targetSize -= added;
+        sub.add(subdir[pos]);
+      } while (targetSize > 0);
+      return ret;
+    }
+  }
+
+  /**
+   * Filter enforcing the minFile/maxTotal parameters of the scan.
+   */
+  private static class MinFileFilter {
+
+    private long totalScan;
+    private final long minFileSize;
+
+    public MinFileFilter(long minFileSize, long totalScan) {
+      this.minFileSize = minFileSize;
+      this.totalScan = totalScan;
+    }
+    public boolean done() {
+      return totalScan <= 0;
+    }
+    public boolean accept(FileStatus stat) {
+      final boolean done = done();
+      if (!done && stat.getLen() >= minFileSize) {
+        totalScan -= stat.getLen();
+        return true;
+      }
+      return false;
+    }
+  }
+
+}
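
A minimal sketch, not part of this commit, of how the FilePool added above might be
exercised from within the same package once input data exists; the input path and the
1 GiB target below are illustrative assumptions.

    package org.apache.hadoop.mapred.gridmix;

    import java.util.ArrayList;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;

    class FilePoolExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical directory holding previously generated input data.
        FilePool pool = new FilePool(conf, new Path("/tmp/gridmix/input"));
        pool.refresh();  // scan the tree and cache FileStatus entries
        ArrayList<FileStatus> files = new ArrayList<FileStatus>();
        // Gather files whose cumulative size is at least ~1 GiB.
        long bytes = pool.getInputFiles(1L << 30, files);
        System.out.println("Selected " + files.size() + " files, " + bytes + " bytes");
      }
    }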

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java?rev=815628&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java Wed Sep 16 06:35:42 2009
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+// TODO can replace with form of GridmixJob
+class GenerateData extends GridmixJob {
+
+  public GenerateData(Configuration conf, Path outdir, long genbytes)
+      throws IOException {
+    super(conf, 0L, "GRIDMIX_GENDATA");
+    job.getConfiguration().setLong("gridmix.gendata.bytes", genbytes);
+    FileOutputFormat.setOutputPath(job, outdir);
+  }
+
+  @Override
+  public Job call() throws IOException, InterruptedException,
+                           ClassNotFoundException {
+    job.setMapperClass(GenDataMapper.class);
+    job.setNumReduceTasks(0);
+    job.setMapOutputKeyClass(NullWritable.class);
+    job.setMapOutputValueClass(BytesWritable.class);
+    job.setInputFormatClass(GenDataFormat.class);
+    job.setOutputFormatClass(RawBytesOutputFormat.class);
+    job.setJarByClass(GenerateData.class);
+    FileInputFormat.addInputPath(job, new Path("ignored"));
+    job.submit();
+    return job;
+  }
+
+  public static class GenDataMapper
+      extends Mapper<NullWritable,LongWritable,NullWritable,BytesWritable> {
+
+    private BytesWritable val;
+    private final Random r = new Random();
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      val = new BytesWritable(new byte[
+          context.getConfiguration().getInt("gendata.val.bytes", 1024 * 1024)]);
+    }
+
+    @Override
+    public void map(NullWritable key, LongWritable value, Context context)
+        throws IOException, InterruptedException {
+      for (long bytes = value.get(); bytes > 0; bytes -= val.getLength()) {
+        r.nextBytes(val.getBytes());
+        val.setSize((int)Math.min(val.getLength(), bytes));
+        context.write(key, val);
+      }
+    }
+
+  }
+
+  static class GenDataFormat extends InputFormat<NullWritable,LongWritable> {
+
+    @Override
+    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+      final JobClient client = new JobClient(jobCtxt.getConfiguration());
+      ClusterStatus stat = client.getClusterStatus(true);
+      final long toGen =
+        jobCtxt.getConfiguration().getLong("gridmix.gendata.bytes", -1);
+      if (toGen < 0) {
+        throw new IOException("Invalid/missing generation bytes: " + toGen);
+      }
+      final int nTrackers = stat.getTaskTrackers();
+      final long bytesPerTracker = toGen / nTrackers;
+      final ArrayList<InputSplit> splits = new ArrayList<InputSplit>(nTrackers);
+      final Pattern trackerPattern = Pattern.compile("tracker_([^:]*):.*");
+      final Matcher m = trackerPattern.matcher("");
+      for (String tracker : stat.getActiveTrackerNames()) {
+        m.reset(tracker);
+        if (!m.find()) {
+          System.err.println("Skipping node: " + tracker);
+          continue;
+        }
+        final String name = m.group(1);
+        splits.add(new GenSplit(bytesPerTracker, new String[] { name }));
+      }
+      return splits;
+    }
+
+    @Override
+    public RecordReader<NullWritable,LongWritable> createRecordReader(
+        InputSplit split, final TaskAttemptContext taskContext)
+        throws IOException {
+      return new RecordReader<NullWritable,LongWritable>() {
+        long written = 0L;
+        long write = 0L;
+        long RINTERVAL;
+        long toWrite;
+        final NullWritable key = NullWritable.get();
+        final LongWritable val = new LongWritable();
+
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext ctxt)
+            throws IOException, InterruptedException {
+          toWrite = split.getLength();
+          RINTERVAL = ctxt.getConfiguration().getInt(
+              "gendata.report.interval.mb", 10) << 20;
+        }
+        @Override
+        public boolean nextKeyValue() throws IOException {
+          written += write;
+          write = Math.min(toWrite - written, RINTERVAL);
+          val.set(write);
+          return written < toWrite;
+        }
+        @Override
+        public float getProgress() throws IOException {
+          return written / ((float)toWrite);
+        }
+        @Override
+        public NullWritable getCurrentKey() { return key; }
+        @Override
+        public LongWritable getCurrentValue() { return val; }
+        @Override
+        public void close() throws IOException {
+          taskContext.setStatus("Wrote " + toWrite);
+        }
+      };
+    }
+  }
+
+  static class GenSplit extends InputSplit implements Writable {
+    private long bytes;
+    private int nLoc;
+    private String[] locations;
+
+    public GenSplit() { }
+    public GenSplit(long bytes, String[] locations) {
+      this(bytes, locations.length, locations);
+    }
+    public GenSplit(long bytes, int nLoc, String[] locations) {
+      this.bytes = bytes;
+      this.nLoc = nLoc;
+      this.locations = Arrays.copyOf(locations, nLoc);
+    }
+    @Override
+    public long getLength() {
+      return bytes;
+    }
+    @Override
+    public String[] getLocations() {
+      return locations;
+    }
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      bytes = in.readLong();
+      nLoc = in.readInt();
+      if (null == locations || locations.length < nLoc) {
+        locations = new String[nLoc];
+      }
+      for (int i = 0; i < nLoc; ++i) {
+        locations[i] = Text.readString(in);
+      }
+    }
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeLong(bytes);
+      out.writeInt(nLoc);
+      for (int i = 0; i < nLoc; ++i) {
+        Text.writeString(out, locations[i]);
+      }
+    }
+  }
+
+  static class RawBytesOutputFormat
+      extends FileOutputFormat<NullWritable,BytesWritable> {
+
+    @Override
+    public RecordWriter<NullWritable,BytesWritable> getRecordWriter(
+        TaskAttemptContext job) throws IOException {
+
+      Path file = getDefaultWorkFile(job, "");
+      FileSystem fs = file.getFileSystem(job.getConfiguration());
+      final FSDataOutputStream fileOut = fs.create(file, false);
+      return new RecordWriter<NullWritable,BytesWritable>() {
+        @Override
+        public void write(NullWritable key, BytesWritable value)
+            throws IOException {
+          fileOut.write(value.getBytes(), 0, value.getLength());
+        }
+        @Override
+        public void close(TaskAttemptContext ctxt) throws IOException {
+          fileOut.close();
+        }
+      };
+    }
+  }
+
+}
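
A sketch, not in the commit, of driving GenerateData directly rather than through the
Gridmix driver shown below; the output path and the 1 GiB size are assumptions.

    package org.apache.hadoop.mapred.gridmix;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    class GenerateDataExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Write roughly 1 GiB of random bytes, split across the active tasktrackers.
        GenerateData gen = new GenerateData(conf, new Path("/tmp/gridmix/input"), 1L << 30);
        Job job = gen.call();         // configures the map-only job and submits it
        job.waitForCompletion(true);  // block until the data has been written
      }
    }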

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=815628&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Wed Sep 16 06:35:42 2009
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Driver class for the Gridmix3 benchmark. Gridmix accepts a timestamped
+ * stream (trace) of job/task descriptions. For each job in the trace, the
+ * client will submit a corresponding, synthetic job to the target cluster at
+ * the rate in the original trace. The intent is to provide a benchmark that
+ * can be configured and extended to closely match the measured resource
+ * profile of actual, production loads.
+ */
+public class Gridmix extends Configured implements Tool {
+
+  public static final Log LOG = LogFactory.getLog(Gridmix.class);
+
+  /**
+   * Output (scratch) directory for submitted jobs. Relative paths are
+   * resolved against the path provided as input and absolute paths remain
+   * independent of it. The default is &quot;gridmix&quot;.
+   */
+  public static final String GRIDMIX_OUT_DIR = "gridmix.output.directory";
+
+  /**
+   * Number of submitting threads at the client and upper bound for
+   * in-memory split data. Submitting threads precompute InputSplits for
+   * submitted jobs. This limits the number of splits held in memory waiting
+   * for submission and also permits parallel computation of split data.
+   */
+  public static final String GRIDMIX_SUB_THR = "gridmix.client.submit.threads";
+
+  /**
+   * The depth of the queue of job descriptions. Before splits are computed,
+   * a queue of pending descriptions is stored in memory. This parameter
+   * limits the depth of that queue.
+   */
+  public static final String GRIDMIX_QUE_DEP =
+    "gridmix.client.pending.queue.depth";
+
+  /**
+   * Size of key data in synthetic jobs. At present, key length is not
+   * available in job traces. Since all solutions are equally bad, globally
+   * specifying the amount of each record that is key data is the simplest
+   * to implement and the method chosen.
+   */
+  public static final String GRIDMIX_KEY_LEN = "gridmix.min.key.length";
+
+  // Submit data structures
+  private JobFactory factory;
+  private JobSubmitter submitter;
+  private JobMonitor monitor;
+
+  // Shutdown hook
+  private final Shutdown sdh = new Shutdown();
+
+  /**
+   * Write random bytes at the path provided.
+   * @see org.apache.hadoop.mapred.gridmix.GenerateData
+   */
+  protected void writeInputData(long genbytes, Path ioPath)
+      throws IOException, InterruptedException {
+    final Configuration conf = getConf();
+    final GridmixJob genData = new GenerateData(conf, ioPath, genbytes);
+    submitter.add(genData);
+    LOG.info("Generating " + StringUtils.humanReadableInt(genbytes) +
+        " of test data...");
+    // TODO add listeners, use for job dependencies
+    TimeUnit.SECONDS.sleep(10);
+    try {
+      genData.getJob().waitForCompletion(false);
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Internal error", e);
+    }
+    if (!genData.getJob().isSuccessful()) {
+      throw new IOException("Data generation failed!");
+    }
+    LOG.info("Done.");
+  }
+
+  protected InputStream createInputStream(String in) throws IOException {
+    if ("-".equals(in)) {
+      return System.in;
+    }
+    final Path pin = new Path(in);
+    return pin.getFileSystem(getConf()).open(pin);
+  }
+
+  /**
+   * Create each component in the pipeline and start it.
+   * @param conf Configuration data, no keys specific to this context
+   * @param traceIn Either a Path to the trace data or &quot;-&quot; for
+   *                stdin
+   * @param ioPath Path from which input data is read
+   * @param scratchDir Path into which job output is written
+   * @param startFlag Semaphore for starting job trace pipeline
+   */
+  private void startThreads(Configuration conf, String traceIn, Path ioPath,
+      Path scratchDir, CountDownLatch startFlag) throws IOException {
+    monitor = createJobMonitor();
+    submitter = createJobSubmitter(monitor,
+        conf.getInt(GRIDMIX_SUB_THR,
+          Runtime.getRuntime().availableProcessors() + 1),
+        conf.getInt(GRIDMIX_QUE_DEP, 100),
+        new FilePool(conf, ioPath));
+    factory = createJobFactory(submitter, traceIn, scratchDir, conf, startFlag);
+    monitor.start();
+    submitter.start();
+    factory.start();
+  }
+
+  protected JobMonitor createJobMonitor() throws IOException {
+    return new JobMonitor();
+  }
+
+  protected JobSubmitter createJobSubmitter(JobMonitor monitor, int threads,
+      int queueDepth, FilePool pool) throws IOException {
+    return new JobSubmitter(monitor, threads, queueDepth, pool);
+  }
+
+  protected JobFactory createJobFactory(JobSubmitter submitter, String traceIn,
+      Path scratchDir, Configuration conf, CountDownLatch startFlag)
+      throws IOException {
+    return new JobFactory(submitter, createInputStream(traceIn), scratchDir,
+        conf, startFlag);
+  }
+
+  public int run(String[] argv) throws IOException, InterruptedException {
+    if (argv.length < 2) {
+      printUsage(System.err);
+      return 1;
+    }
+    long genbytes = 0;
+    String traceIn = null;
+    Path ioPath = null;
+    try {
+      int i = 0;
+      genbytes = "-generate".equals(argv[i++])
+        ? StringUtils.TraditionalBinaryPrefix.string2long(argv[i++])
+        : --i;
+      ioPath = new Path(argv[i++]);
+      traceIn = argv[i++];
+      if (i != argv.length) {
+        printUsage(System.err);
+        return 1;
+      }
+    } catch (Exception e) {
+      printUsage(System.err);
+      return 1;
+    }
+    FileSystem fs = null;
+    InputStream trace = null;
+    try {
+      final Configuration conf = getConf();
+      Path scratchDir = new Path(ioPath, conf.get(GRIDMIX_OUT_DIR, "gridmix"));
+      fs = scratchDir.getFileSystem(conf);
+      // add shutdown hook for SIGINT, etc.
+      Runtime.getRuntime().addShutdownHook(sdh);
+      CountDownLatch startFlag = new CountDownLatch(1);
+      try {
+        // Create, start job submission threads
+        startThreads(conf, traceIn, ioPath, scratchDir, startFlag);
+        // Write input data if specified
+        if (genbytes > 0) {
+          writeInputData(genbytes, ioPath);
+        }
+        // scan input dir contents
+        submitter.refreshFilePool();
+      } catch (IOException e) {
+        LOG.error("Startup failed", e);
+        if (factory != null) factory.abort(); // abort pipeline
+      } catch (InterruptedException e) {
+        LOG.error("Startup failed", e);
+        if (factory != null) factory.abort(); // abort pipeline
+      } finally {
+        // signal for factory to start; sets start time
+        startFlag.countDown();
+      }
+
+      if (factory != null) {
+        // wait for input exhaustion
+        factory.join();
+        if (null != factory.error()) {
+          throw factory.error();
+        }
+        // wait for pending tasks to be submitted
+        submitter.shutdown();
+        submitter.join();
+        // wait for running tasks to complete
+        monitor.shutdown();
+        monitor.join();
+      }
+    } finally {
+      IOUtils.cleanup(LOG, trace);
+    }
+    return 0;
+  }
+
+  /**
+   * Handles orderly shutdown by requesting that each component in the
+   * pipeline abort its progress, waiting for each to exit and killing
+   * any jobs still running on the cluster.
+   */
+  class Shutdown extends Thread {
+
+    private void killComponent(Component<?> component) {
+      if (component == null) {
+        return;
+      }
+      component.abort();   // read no more tasks
+      try {
+        component.join();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted waiting for " + component);
+      }
+
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Exiting...");
+      try {
+        killComponent(factory);   // read no more tasks
+        killComponent(submitter); // submit no more tasks
+        killComponent(monitor);   // process remaining jobs in this thread
+      } finally {
+        if (monitor == null) {
+          return;
+        }
+        List<Job> remainingJobs = monitor.getRemainingJobs();
+        if (remainingJobs.isEmpty()) {
+          return;
+        }
+        LOG.info("Killing running jobs...");
+        for (Job job : remainingJobs) {
+          try {
+            if (!job.isComplete()) {
+              job.killJob();
+              LOG.info("Killed " + job.getJobName() + " (" + job.getID() + ")");
+            } else {
+              if (job.isSuccessful()) {
+                monitor.onSuccess(job);
+              } else {
+                monitor.onFailure(job);
+              }
+            }
+          } catch (IOException e) {
+            LOG.warn("Failure killing " + job.getJobName(), e);
+          } catch (Exception e) {
+            LOG.error("Unexpected exception", e);
+          }
+        }
+        LOG.info("Done.");
+      }
+    }
+
+  }
+
+  public static void main(String[] argv) throws Exception {
+    int res = -1;
+    try {
+      res = ToolRunner.run(new Configuration(), new Gridmix(), argv);
+    } finally {
+      System.exit(res);
+    }
+  }
+
+  protected void printUsage(PrintStream out) {
+    ToolRunner.printGenericCommandUsage(out);
+    out.println("Usage: gridmix [-generate <MiB>] <iopath> <trace>");
+    out.println("  e.g. gridmix -generate 100m foo -");
+    out.println("Configuration parameters:");
+    out.printf("       %-40s : Output directory\n", GRIDMIX_OUT_DIR);
+    out.printf("       %-40s : Submitting threads\n", GRIDMIX_SUB_THR);
+    out.printf("       %-40s : Queued job desc\n", GRIDMIX_QUE_DEP);
+    out.printf("       %-40s : Key size\n", GRIDMIX_KEY_LEN);
+  }
+
+  /**
+   * Components in the pipeline must support the following operations for
+   * orderly startup and shutdown.
+   */
+  interface Component<T> {
+
+    /**
+     * Accept an item into this component from an upstream component. If
+     * shutdown or abort have been called, this may fail, depending on the
+     * semantics for the component.
+     */
+    void add(T item) throws InterruptedException;
+
+    /**
+     * Attempt to start the service.
+     */
+    void start();
+
+    /**
+     * Wait until the service completes. It is assumed that either a
+     * {@link #shutdown} or {@link #abort} has been requested.
+     */
+    void join() throws InterruptedException;
+
+    /**
+     * Shut down gracefully, finishing all pending work. Reject new requests.
+     */
+    void shutdown();
+
+    /**
+     * Shut down immediately, aborting any work in progress and discarding
+     * all pending work. It is legal to store pending work for another
+     * thread to process.
+     */
+    void abort();
+  }
+
+}
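
A sketch, not part of the commit, of launching the driver programmatically with the same
arguments as the example printed by printUsage above ("gridmix -generate 100m foo -");
the io path "foo" and the trace read from stdin are taken from that example.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.gridmix.Gridmix;
    import org.apache.hadoop.util.ToolRunner;

    public class RunGridmix {
      public static void main(String[] args) throws Exception {
        // Generate 100 MiB of input under "foo", then replay a trace read from stdin.
        int res = ToolRunner.run(new Configuration(), new Gridmix(),
            new String[] { "-generate", "100m", "foo", "-" });
        System.exit(res);
      }
    }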

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=815628&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Wed Sep 16 06:35:42 2009
@@ -0,0 +1,908 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+import java.nio.LongBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.Formatter;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Synthetic job generated from a trace description.
+ */
+class GridmixJob implements Callable<Job>, Delayed {
+
+  public static final String JOBNAME = "GRIDMIX";
+  public static final Log LOG = LogFactory.getLog(GridmixJob.class);
+
+  private static final ThreadLocal<Formatter> nameFormat =
+    new ThreadLocal<Formatter>() {
+      @Override
+      protected Formatter initialValue() {
+        final StringBuilder sb = new StringBuilder(JOBNAME.length() + 5);
+        sb.append(JOBNAME);
+        return new Formatter(sb);
+      }
+    };
+
+  private final int seq;
+  private final Path outdir;
+  protected final Job job;
+  private final JobStory jobdesc;
+  private final long submissionTimeNanos;
+
+  public GridmixJob(Configuration conf, long submissionMillis,
+      JobStory jobdesc, Path outRoot, int seq) throws IOException {
+    ((StringBuilder)nameFormat.get().out()).setLength(JOBNAME.length());
+    job = new Job(conf, nameFormat.get().format("%05d", seq).toString());
+    submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
+        submissionMillis, TimeUnit.MILLISECONDS);
+    this.jobdesc = jobdesc;
+    this.seq = seq;
+    outdir = new Path(outRoot, "" + seq);
+  }
+
+  protected GridmixJob(Configuration conf, long submissionMillis, String name)
+      throws IOException {
+    job = new Job(conf, name);
+    submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
+        submissionMillis, TimeUnit.MILLISECONDS);
+    jobdesc = null;
+    outdir = null;
+    seq = -1;
+  }
+
+  public String toString() {
+    return job.getJobName();
+  }
+
+  public long getDelay(TimeUnit unit) {
+    return unit.convert(submissionTimeNanos - System.nanoTime(),
+        TimeUnit.NANOSECONDS);
+  }
+
+  @Override
+  public int compareTo(Delayed other) {
+    if (this == other) {
+      return 0;
+    }
+    if (other instanceof GridmixJob) {
+      final long otherNanos = ((GridmixJob)other).submissionTimeNanos;
+      if (otherNanos < submissionTimeNanos) {
+        return 1;
+      }
+      if (otherNanos > submissionTimeNanos) {
+        return -1;
+      }
+      return id() - ((GridmixJob)other).id();
+    }
+    final long diff =
+      getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
+    return 0 == diff ? 0 : (diff > 0 ? 1 : -1);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    // not possible unless job is cloned; all jobs should be unique
+    return other instanceof GridmixJob && id() == ((GridmixJob)other).id();
+  }
+
+  @Override
+  public int hashCode() {
+    return id();
+  }
+
+  int id() {
+    return seq;
+  }
+
+  Job getJob() {
+    return job;
+  }
+
+  JobStory getJobDesc() {
+    return jobdesc;
+  }
+
+  public Job call() throws IOException, InterruptedException,
+                           ClassNotFoundException {
+    job.setMapperClass(GridmixMapper.class);
+    job.setReducerClass(GridmixReducer.class);
+    job.setNumReduceTasks(jobdesc.getNumberReduces());
+    job.setMapOutputKeyClass(GridmixKey.class);
+    job.setMapOutputValueClass(BytesWritable.class);
+    job.setSortComparatorClass(BytesWritable.Comparator.class);
+    job.setGroupingComparatorClass(SpecGroupingComparator.class);
+    job.setInputFormatClass(GridmixInputFormat.class);
+    job.setOutputFormatClass(RawBytesOutputFormat.class);
+    job.setPartitionerClass(DraftPartitioner.class);
+    job.setJarByClass(GridmixJob.class);
+    job.getConfiguration().setInt("gridmix.job.seq", seq);
+    FileInputFormat.addInputPath(job, new Path("ignored"));
+    FileOutputFormat.setOutputPath(job, outdir);
+    job.submit();
+    return job;
+  }
+
+  public static class DraftPartitioner<V> extends Partitioner<GridmixKey,V> {
+    public int getPartition(GridmixKey key, V value, int numReduceTasks) {
+      return key.getPartition();
+    }
+  }
+
+  /**
+   * Group REDUCE_SPEC records together
+   */
+  public static class SpecGroupingComparator
+      implements RawComparator<GridmixKey>, Serializable {
+    @Override
+    public int compare(GridmixKey g1, GridmixKey g2) {
+      final byte t1 = g1.getType();
+      final byte t2 = g2.getType();
+      if (t1 == GridmixKey.REDUCE_SPEC ||
+          t2 == GridmixKey.REDUCE_SPEC) {
+        return t1 - t2;
+      }
+      assert t1 == GridmixKey.DATA;
+      assert t2 == GridmixKey.DATA;
+      return WritableComparator.compareBytes(
+          g1.getBytes(), 0, g1.getLength(),
+          g2.getBytes(), 0, g2.getLength());
+    }
+    @Override
+    public int compare(byte[] b1, int s1, int l1,
+                       byte[] b2, int s2, int l2) {
+      final byte t1 = b1[s1 + 4];
+      final byte t2 = b2[s2 + 4];
+      if (t1 == GridmixKey.REDUCE_SPEC ||
+          t2 == GridmixKey.REDUCE_SPEC) {
+        return t1 - t2;
+      }
+      assert t1 == GridmixKey.DATA;
+      assert t2 == GridmixKey.DATA;
+      return WritableComparator.compareBytes(
+          b1, s1 + 4, l1 - 4,
+          b2, s2 + 4, l2 - 4);
+    }
+  }
+
+  /**
+   * Key type for synthetic jobs. DATA keys carry random payload destined for
+   * a reduce partition; REDUCE_SPEC keys embed the record and byte counts
+   * each reduce is expected to consume and emit.
+   */
+  public static class GridmixKey extends BytesWritable {
+    // long fields specifying reduce contract
+    private enum RSpec { REC_IN, REC_OUT, BYTES_OUT };
+    private static final int SPEC_START = 5; // type + partition len
+    private static final int NUMFIELDS = RSpec.values().length;
+    private static final int SPEC_SIZE = NUMFIELDS * 8;
+
+    // Key types
+    static final byte REDUCE_SPEC = 0;
+    static final byte DATA = 1;
+
+    private IntBuffer partition;
+    private LongBuffer spec;
+
+    public GridmixKey() {
+      super(new byte[SPEC_START]);
+    }
+
+    public GridmixKey(byte type, byte[] b) {
+      super(b);
+      setType(type);
+    }
+
+    public byte getType() {
+      return getBytes()[0];
+    }
+    public void setPartition(int partition) {
+      this.partition.put(0, partition);
+    }
+    public int getPartition() {
+      return partition.get(0);
+    }
+    public long getReduceInputRecords() {
+      checkState(REDUCE_SPEC);
+      return spec.get(RSpec.REC_IN.ordinal());
+    }
+    public long getReduceOutputBytes() {
+      checkState(REDUCE_SPEC);
+      return spec.get(RSpec.BYTES_OUT.ordinal());
+    }
+    public long getReduceOutputRecords() {
+      checkState(REDUCE_SPEC);
+      return spec.get(RSpec.REC_OUT.ordinal());
+    }
+    public void setType(byte b) {
+      switch (b) {
+        case REDUCE_SPEC:
+          if (getCapacity() < SPEC_START + SPEC_SIZE) {
+            setSize(SPEC_START + SPEC_SIZE);
+          }
+          spec =
+            ByteBuffer.wrap(getBytes(), SPEC_START, SPEC_SIZE).asLongBuffer();
+          break;
+        case DATA:
+          if (getCapacity() < SPEC_START) {
+            setSize(SPEC_START);
+          }
+          spec = null;
+          break;
+        default:
+          throw new IllegalArgumentException("Illegal type " + b);
+      }
+      getBytes()[0] = b;
+      partition =
+        ByteBuffer.wrap(getBytes(), 1, SPEC_START - 1).asIntBuffer();
+    }
+    public void setReduceInputRecords(long records) {
+      checkState(REDUCE_SPEC);
+      spec.put(RSpec.REC_IN.ordinal(), records);
+    }
+    public void setReduceOutputBytes(long bytes) {
+      checkState(REDUCE_SPEC);
+      spec.put(RSpec.BYTES_OUT.ordinal(), bytes);
+    }
+    public void setReduceOutputRecords(long records) {
+      checkState(REDUCE_SPEC);
+      spec.put(RSpec.REC_OUT.ordinal(), records);
+    }
+    private void checkState(byte b) {
+      if (getLength() < SPEC_START || getType() != b) {
+        throw new IllegalStateException("Expected " + b + ", was " + getType());
+      }
+    }
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      if (getLength() < SPEC_START) {
+        throw new IOException("Invalid GridmixKey, len " + getLength());
+      }
+      partition =
+        ByteBuffer.wrap(getBytes(), 1, SPEC_START - 1).asIntBuffer();
+      spec = getType() == REDUCE_SPEC
+        ? ByteBuffer.wrap(getBytes(), SPEC_START, SPEC_SIZE).asLongBuffer()
+        : null;
+    }
+    @Override
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      if (getType() == REDUCE_SPEC) {
+        LOG.debug("SPEC(" + getPartition() + ") " + getReduceInputRecords() +
+            " -> " + getReduceOutputRecords() + "/" + getReduceOutputBytes());
+      }
+    }
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof GridmixKey) {
+        return super.equals(other);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
+  }
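+
+  /*
+   * A worked view of the GridmixKey layout above (SPEC_START = 5,
+   * SPEC_SIZE = 3 * 8 = 24):
+   *
+   *   byte  0     : type (REDUCE_SPEC = 0, DATA = 1)
+   *   bytes 1..4  : int partition
+   *   bytes 5..28 : REDUCE_SPEC only -- longs REC_IN, REC_OUT, BYTES_OUT
+   *
+   * DATA keys carry random payload after the 5-byte header. The raw
+   * SpecGroupingComparator above reads the type byte at offset s + 4 because
+   * the serialized BytesWritable form is prefixed with a 4-byte length.
+   */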
+
+  public static class GridmixMapper
+      extends Mapper<IntWritable,BytesWritable,GridmixKey,BytesWritable> {
+
+    private final Random r = new Random();
+    private GridmixKey key;
+    private final BytesWritable val = new BytesWritable();
+
+    private int keyLen;
+    private double acc;
+    private double ratio;
+    private int[] reduceRecordSize;
+    private long[] reduceRecordCount;
+    private long[] reduceRecordRemaining;
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      // TODO clearly job-specific, but no data at present
+      keyLen = context.getConfiguration().getInt(Gridmix.GRIDMIX_KEY_LEN, 20);
+      key = new GridmixKey(GridmixKey.DATA, new byte[keyLen]);
+      final GridmixSplit split = (GridmixSplit) context.getInputSplit();
+      LOG.info("ID: " + split.getId());
+      reduceRecordCount = split.getOutputRecords();
+      reduceRecordRemaining =
+        Arrays.copyOf(reduceRecordCount, reduceRecordCount.length);
+      reduceRecordSize = new int[reduceRecordCount.length];
+      int valsize = -1;
+      final long[] reduceBytes = split.getOutputBytes();
+      long totalRecords = 0L;
+      for (int i = 0; i < reduceBytes.length; ++i) {
+        reduceRecordSize[i] = Math.max(0,
+          Math.round(reduceBytes[i] / (1.0f * reduceRecordCount[i])) - keyLen);
+        valsize = Math.max(reduceRecordSize[i], valsize);
+        totalRecords += reduceRecordCount[i];
+      }
+      valsize = Math.max(0, valsize - 4); // account for BytesWritable's 4-byte length prefix
+      val.setCapacity(valsize);
+      val.setSize(valsize);
+      ratio = totalRecords / (1.0 * split.getInputRecords());
+      acc = 0.0;
+    }
+
+    protected void fillBytes(BytesWritable val, int len) {
+      r.nextBytes(val.getBytes());
+      val.setSize(len);
+    }
+
+    /** Find next non-empty partition after start. */
+    private int getNextPart(final int start) {
+      int p = start;
+      do {
+        p = (p + 1) % reduceRecordSize.length;
+      } while (0 == reduceRecordRemaining[p] && p != start);
+      return 0 == reduceRecordRemaining[p] ? -1 : p;
+    }
+
+    @Override
+    public void map(IntWritable ignored, BytesWritable bytes,
+        Context context) throws IOException, InterruptedException {
+      int p = getNextPart(r.nextInt(reduceRecordSize.length));
+      if (-1 == p) {
+        return;
+      }
+      acc += ratio;
+      while (acc >= 1.0) {
+        fillBytes(key, key.getLength());
+        key.setType(GridmixKey.DATA);
+        key.setPartition(p);
+        --reduceRecordRemaining[p];
+        fillBytes(val, reduceRecordSize[p]);
+        context.write(key, val);
+        acc -= 1.0;
+        if (0 == reduceRecordRemaining[p] && -1 == (p = getNextPart(p))) {
+          return;
+        }
+      }
+    }
+
+    @Override
+    public void cleanup(Context context)
+        throws IOException, InterruptedException {
+      // output any remaining records
+      // TODO include reduce spec in remaining records if avail
+      //      (i.e. move this to map)
+      for (int i = 0; i < reduceRecordSize.length; ++i) {
+        for (long j = reduceRecordRemaining[i]; j > 0; --j) {
+          fillBytes(key, key.getLength());
+          key.setType(GridmixKey.DATA);
+          key.setPartition(i);
+          fillBytes(val, reduceRecordSize[i]);
+          context.write(key, val);
+        }
+      }
+      val.setSize(0);
+      key.setType(GridmixKey.REDUCE_SPEC);
+      final int reduces = context.getNumReduceTasks();
+      final GridmixSplit split = (GridmixSplit) context.getInputSplit();
+      final int maps = split.getMapCount();
+      int idx = 0;
+      int id = split.getId();
+      for (int i = 0; i < reduces; ++i) {
+        key.setPartition(i);
+        key.setReduceInputRecords(reduceRecordCount[i]);
+        // Write the full output spec for reduces r where r % maps == split id;
+        // every other reduce gets zeroed output fields from this map
+        if (i == id) {
+          key.setReduceOutputBytes(split.getReduceBytes(idx));
+          key.setReduceOutputRecords(split.getReduceRecords(idx));
+          LOG.debug(String.format("SPEC'D %d / %d to %d",
+                split.getReduceRecords(idx), split.getReduceBytes(idx), i));
+          ++idx;
+          id += maps;
+        } else {
+          key.setReduceOutputBytes(0);
+          key.setReduceOutputRecords(0);
+        }
+        context.write(key, val);
+      }
+    }
+  }
+
+  public static class GridmixReducer
+      extends Reducer<GridmixKey,BytesWritable,NullWritable,BytesWritable> {
+
+    private final Random r = new Random();
+    private final BytesWritable val = new BytesWritable();
+
+    private double acc;
+    private double ratio;
+    private long written;
+    private long inRecords = 0L;
+    private long outBytes = 0L;
+    private long outRecords = 0L;
+
+    protected void fillBytes(BytesWritable val, int len) {
+      r.nextBytes(val.getBytes());
+      val.setSize(len);
+    }
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      if (!context.nextKey() ||
+           context.getCurrentKey().getType() != GridmixKey.REDUCE_SPEC) {
+        throw new IOException("Missing reduce spec");
+      }
+      for (BytesWritable ignored : context.getValues()) {
+        final GridmixKey spec = context.getCurrentKey();
+        inRecords += spec.getReduceInputRecords();
+        LOG.debug("GOT COUNT " + spec.getReduceInputRecords());
+        outBytes += spec.getReduceOutputBytes();
+        outRecords += spec.getReduceOutputRecords();
+      }
+      LOG.debug("GOT SPEC " + outRecords + "/" + outBytes);
+      val.setCapacity(Math.round(outBytes / (1.0f * outRecords)));
+      ratio = outRecords / (1.0 * inRecords);
+      acc = 0.0;
+      LOG.debug(String.format("RECV %d -> %10d/%10d %d %f", inRecords,
+            outRecords, outBytes, val.getCapacity(), ratio));
+    }
+    @Override
+    protected void reduce(GridmixKey key, Iterable<BytesWritable> values,
+        Context context) throws IOException, InterruptedException {
+      for (BytesWritable ignored : values) {
+        acc += ratio;
+        while (acc >= 1.0 && written < outBytes) {
+          final int len = (int) Math.min(outBytes - written, val.getCapacity());
+          fillBytes(val, len);
+          context.write(NullWritable.get(), val);
+          acc -= 1.0;
+          written += len;
+          LOG.debug(String.format("%f %d/%d", acc, written, outBytes));
+        }
+      }
+    }
+  }
+
+  static class GridmixRecordReader
+      extends RecordReader<IntWritable,BytesWritable> {
+
+    private long bytesRead = 0;
+    private long bytesTotal;
+    private Configuration conf;
+    private final IntWritable key = new IntWritable();
+    private final BytesWritable inBytes = new BytesWritable();
+
+    private FSDataInputStream input;
+    private int idx = -1;
+    private int capacity;
+    private Path[] paths;
+    private long[] startoffset;
+    private long[] lengths;
+
+    public GridmixRecordReader() { }
+
+    @Override
+    public void initialize(InputSplit genericSplit, TaskAttemptContext ctxt)
+            throws IOException, InterruptedException {
+      final GridmixSplit split = (GridmixSplit)genericSplit;
+      this.conf = ctxt.getConfiguration();
+      paths = split.getPaths();
+      startoffset = split.getStartOffsets();
+      lengths = split.getLengths();
+      bytesTotal = split.getLength();
+      capacity = (int) Math.round(bytesTotal / (1.0 * split.getInputRecords()));
+      inBytes.setCapacity(capacity);
+      nextSource();
+    }
+    private void nextSource() throws IOException {
+      idx = (idx + 1) % paths.length;
+      final Path file = paths[idx];
+      final FileSystem fs = file.getFileSystem(conf);
+      input = fs.open(file, capacity);
+      input.seek(startoffset[idx]);
+    }
+    @Override
+    public boolean nextKeyValue() throws IOException {
+      if (bytesRead >= bytesTotal) {
+        return false;
+      }
+      final int len = (int)
+        Math.min(bytesTotal - bytesRead, inBytes.getCapacity());
+      int kvread = 0;
+      while (kvread < len) {
+        assert lengths[idx] >= 0;
+        if (lengths[idx] <= 0) {
+          nextSource();
+          continue;
+        }
+        final int srcRead = (int) Math.min(len - kvread, lengths[idx]);
+        IOUtils.readFully(input, inBytes.getBytes(), kvread, srcRead);
+        //LOG.trace("Read " + srcRead + " bytes from " + paths[idx]);
+        lengths[idx] -= srcRead;
+        kvread += srcRead;
+      }
+      bytesRead += kvread;
+      return true;
+    }
+    @Override
+    public float getProgress() throws IOException {
+      return bytesRead / ((float)bytesTotal);
+    }
+    @Override
+    public IntWritable getCurrentKey() { return key; }
+    @Override
+    public BytesWritable getCurrentValue() { return inBytes; }
+    @Override
+    public void close() throws IOException {
+      IOUtils.cleanup(null, input);
+    }
+  }
+
+  static class GridmixSplit extends CombineFileSplit {
+    private int id;
+    private int nSpec;
+    private int maps;
+    private int reduces;
+    private long inputRecords;
+    private long outputBytes;
+    private long outputRecords;
+    private long maxMemory;
+    private double[] reduceBytes = new double[0];
+    private double[] reduceRecords = new double[0];
+
+    // Output specs for the reduces this split describes
+    // (reduce ids congruent to the split id, modulo the map count)
+    private long[] reduceOutputBytes = new long[0];
+    private long[] reduceOutputRecords = new long[0];
+
+    GridmixSplit() {
+      super();
+    }
+
+    public GridmixSplit(CombineFileSplit cfsplit, int maps, int id,
+        long inputBytes, long inputRecords, long outputBytes,
+        long outputRecords, double[] reduceBytes, double[] reduceRecords,
+        long[] reduceOutputBytes, long[] reduceOutputRecords)
+        throws IOException {
+      super(cfsplit);
+      this.id = id;
+      this.maps = maps;
+      reduces = reduceBytes.length;
+      this.inputRecords = inputRecords;
+      this.outputBytes = outputBytes;
+      this.outputRecords = outputRecords;
+      this.reduceBytes = Arrays.copyOf(reduceBytes, reduces);
+      this.reduceRecords = Arrays.copyOf(reduceRecords, reduces);
+      nSpec = reduceOutputBytes.length;
+      this.reduceOutputBytes = reduceOutputBytes;
+      this.reduceOutputRecords = reduceOutputRecords;
+    }
+    public int getId() {
+      return id;
+    }
+    public int getMapCount() {
+      return maps;
+    }
+    public long getInputRecords() {
+      return inputRecords;
+    }
+    public long[] getOutputBytes() {
+      final long[] ret = new long[reduces];
+      for (int i = 0; i < reduces; ++i) {
+        ret[i] = Math.round(outputBytes * reduceBytes[i]);
+      }
+      return ret;
+    }
+    public long[] getOutputRecords() {
+      final long[] ret = new long[reduces];
+      for (int i = 0; i < reduces; ++i) {
+        ret[i] = Math.round(outputRecords * reduceRecords[i]);
+      }
+      return ret;
+    }
+    public long getReduceBytes(int i) {
+      return reduceOutputBytes[i];
+    }
+    public long getReduceRecords(int i) {
+      return reduceOutputRecords[i];
+    }
+    @Override
+    public void write(DataOutput out) throws IOException {
+      super.write(out);
+      WritableUtils.writeVInt(out, id);
+      WritableUtils.writeVInt(out, maps);
+      WritableUtils.writeVLong(out, inputRecords);
+      WritableUtils.writeVLong(out, outputBytes);
+      WritableUtils.writeVLong(out, outputRecords);
+      WritableUtils.writeVLong(out, maxMemory);
+      WritableUtils.writeVInt(out, reduces);
+      for (int i = 0; i < reduces; ++i) {
+        out.writeDouble(reduceBytes[i]);
+        out.writeDouble(reduceRecords[i]);
+      }
+      WritableUtils.writeVInt(out, nSpec);
+      for (int i = 0; i < nSpec; ++i) {
+        out.writeLong(reduceOutputBytes[i]);
+        out.writeLong(reduceOutputRecords[i]);
+      }
+    }
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      super.readFields(in);
+      id = WritableUtils.readVInt(in);
+      maps = WritableUtils.readVInt(in);
+      inputRecords = WritableUtils.readVLong(in);
+      outputBytes = WritableUtils.readVLong(in);
+      outputRecords = WritableUtils.readVLong(in);
+      maxMemory = WritableUtils.readVLong(in);
+      reduces = WritableUtils.readVInt(in);
+      if (reduceBytes.length < reduces) {
+        reduceBytes = new double[reduces];
+        reduceRecords = new double[reduces];
+      }
+      for (int i = 0; i < reduces; ++i) {
+        reduceBytes[i] = in.readDouble();
+        reduceRecords[i] = in.readDouble();
+      }
+      nSpec = WritableUtils.readVInt(in);
+      if (reduceOutputBytes.length < nSpec) {
+        reduceOutputBytes = new long[nSpec];
+        reduceOutputRecords = new long[nSpec];
+      }
+      for (int i = 0; i < nSpec; ++i) {
+        reduceOutputBytes[i] = in.readLong();
+        reduceOutputRecords[i] = in.readLong();
+      }
+    }
+  }
+
+  static class GridmixInputFormat
+      extends InputFormat<IntWritable,BytesWritable> {
+
+    @Override
+    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+      return pullDescription(jobCtxt.getConfiguration().getInt(
+            "gridmix.job.seq", -1));
+    }
+    @Override
+    public RecordReader<IntWritable,BytesWritable> createRecordReader(
+        InputSplit split, final TaskAttemptContext taskContext)
+        throws IOException {
+      return new GridmixRecordReader();
+    }
+  }
+
+  static class RawBytesOutputFormat
+      extends FileOutputFormat<NullWritable,BytesWritable> {
+
+    @Override
+    public RecordWriter<NullWritable,BytesWritable> getRecordWriter(
+        TaskAttemptContext job) throws IOException {
+
+      Path file = getDefaultWorkFile(job, "");
+      FileSystem fs = file.getFileSystem(job.getConfiguration());
+      final FSDataOutputStream fileOut = fs.create(file, false);
+      return new RecordWriter<NullWritable,BytesWritable>() {
+        @Override
+        public void write(NullWritable key, BytesWritable value)
+            throws IOException {
+          //LOG.trace("WROTE " + value.getLength() + " bytes");
+          fileOut.write(value.getBytes(), 0, value.getLength());
+        }
+        @Override
+        public void close(TaskAttemptContext ctxt) throws IOException {
+          fileOut.close();
+        }
+      };
+    }
+  }
+
+  // TODO replace with ThreadLocal submitter?
+  private static final ConcurrentHashMap<Integer,List<InputSplit>> descCache =
+    new ConcurrentHashMap<Integer,List<InputSplit>>();
+
+  static void pushDescription(int seq, List<InputSplit> splits) {
+    if (null != descCache.putIfAbsent(seq, splits)) {
+      throw new IllegalArgumentException("Description exists for id " + seq);
+    }
+  }
+
+  static List<InputSplit> pullDescription(int seq) {
+    return descCache.remove(seq);
+  }
+
+  // not necessary once the cache is replaced with a ThreadLocal
+  static void clearAll() {
+    descCache.clear();
+  }
+
+  void buildSplits(FilePool inputDir) throws IOException {
+    long mapInputBytesTotal = 0L;
+    long mapOutputBytesTotal = 0L;
+    long mapOutputRecordsTotal = 0L;
+    final JobStory jobdesc = getJobDesc();
+    if (null == jobdesc) {
+      return;
+    }
+    final int maps = jobdesc.getNumberMaps();
+    final int reds = jobdesc.getNumberReduces();
+    for (int i = 0; i < maps; ++i) {
+      final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
+      mapInputBytesTotal += info.getInputBytes();
+      mapOutputBytesTotal += info.getOutputBytes();
+      mapOutputRecordsTotal += info.getOutputRecords();
+    }
+    final double[] reduceRecordRatio = new double[reds];
+    final double[] reduceByteRatio = new double[reds];
+    for (int i = 0; i < reds; ++i) {
+      final TaskInfo info = jobdesc.getTaskInfo(TaskType.REDUCE, i);
+      reduceByteRatio[i] = info.getInputBytes() / (1.0 * mapOutputBytesTotal);
+      reduceRecordRatio[i] =
+        info.getInputRecords() / (1.0 * mapOutputRecordsTotal);
+    }
+    final InputStriper striper = new InputStriper(inputDir, mapInputBytesTotal);
+    final List<InputSplit> splits = new ArrayList<InputSplit>();
+    for (int i = 0; i < maps; ++i) {
+      final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0);
+      final long[] specBytes = new long[nSpec];
+      final long[] specRecords = new long[nSpec];
+      for (int j = 0; j < nSpec; ++j) {
+        final TaskInfo info =
+          jobdesc.getTaskInfo(TaskType.REDUCE, i + j * maps);
+        specBytes[j] = info.getOutputBytes();
+        specRecords[j] = info.getOutputRecords();
+        LOG.debug(String.format("SPEC(%d) %d -> %d %d %d", id(), i,
+            i + j * maps, info.getOutputRecords(), info.getOutputBytes()));
+      }
+      final TaskInfo info = jobdesc.getTaskInfo(TaskType.MAP, i);
+      splits.add(new GridmixSplit(striper.splitFor(inputDir,
+              info.getInputBytes(), 3), maps, i,
+            info.getInputBytes(), info.getInputRecords(),
+            info.getOutputBytes(), info.getOutputRecords(),
+            reduceByteRatio, reduceRecordRatio, specBytes, specRecords));
+    }
+    pushDescription(id(), splits);
+  }
+
+  static class InputStriper {
+    int idx;
+    long currentStart;
+    FileStatus current;
+    final List<FileStatus> files = new ArrayList<FileStatus>();
+
+    InputStriper(FilePool inputDir, long mapBytes)
+        throws IOException {
+      final long inputBytes = inputDir.getInputFiles(mapBytes, files);
+      if (mapBytes > inputBytes) {
+        LOG.warn("Using " + inputBytes + "/" + mapBytes + " bytes");
+      }
+      current = files.get(0);
+    }
+
+    CombineFileSplit splitFor(FilePool inputDir, long bytes, int nLocs)
+        throws IOException {
+      final ArrayList<Path> paths = new ArrayList<Path>();
+      final ArrayList<Long> start = new ArrayList<Long>();
+      final ArrayList<Long> length = new ArrayList<Long>();
+      final HashMap<String,Double> sb = new HashMap<String,Double>();
+      while (bytes > 0) {
+        paths.add(current.getPath());
+        start.add(currentStart);
+        final long fromFile = Math.min(bytes, current.getLen() - currentStart);
+        length.add(fromFile);
+        for (BlockLocation loc :
+            inputDir.locationsFor(current, currentStart, fromFile)) {
+          final double tedium = loc.getLength() / (1.0 * bytes);
+          for (String l : loc.getHosts()) {
+            Double j = sb.get(l);
+            if (null == j) {
+              sb.put(l, tedium);
+            } else {
+              sb.put(l, j.doubleValue() + tedium);
+            }
+          }
+        }
+        currentStart += fromFile;
+        bytes -= fromFile;
+        if (current.getLen() - currentStart == 0) {
+          current = files.get(++idx % files.size());
+          currentStart = 0;
+        }
+      }
+      final ArrayList<Entry<String,Double>> sort =
+        new ArrayList<Entry<String,Double>>(sb.entrySet());
+      Collections.sort(sort, hostRank);
+      final String[] hosts = new String[Math.min(nLocs, sort.size())];
+      for (int i = 0; i < nLocs && i < sort.size(); ++i) {
+        hosts[i] = sort.get(i).getKey();
+      }
+      return new CombineFileSplit(paths.toArray(new Path[0]),
+          toLongArray(start), toLongArray(length), hosts);
+    }
+
+    private long[] toLongArray(final ArrayList<Long> sigh) {
+      final long[] ret = new long[sigh.size()];
+      for (int i = 0; i < ret.length; ++i) {
+        ret[i] = sigh.get(i);
+      }
+      return ret;
+    }
+
+    final Comparator<Entry<String,Double>> hostRank =
+      new Comparator<Entry<String,Double>>() {
+        public int compare(Entry<String,Double> a, Entry<String,Double> b) {
+            final double va = a.getValue();
+            final double vb = b.getValue();
+            return va > vb ? -1 : va < vb ? 1 : 0;
+          }
+      };
+  }
+}
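
The getDelay/compareTo pair above implements the java.util.concurrent.Delayed
contract, presumably so the submitter can hold pending GridmixJobs in a
DelayQueue and release each at its trace-relative submission time. The
following is a minimal, self-contained sketch of that ordering pattern, not
part of the patch; TimedItem is a hypothetical stand-in for GridmixJob.

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class DelayOrderingSketch {

      // Hypothetical stand-in for GridmixJob: an absolute submission time in
      // nanoseconds, measured against System.nanoTime() as getDelay() does.
      static class TimedItem implements Delayed {
        final String name;
        final long submissionTimeNanos;

        TimedItem(String name, long submissionMillis) {
          this.name = name;
          this.submissionTimeNanos =
              TimeUnit.NANOSECONDS.convert(submissionMillis, TimeUnit.MILLISECONDS);
        }

        public long getDelay(TimeUnit unit) {
          return unit.convert(submissionTimeNanos - System.nanoTime(),
              TimeUnit.NANOSECONDS);
        }

        public int compareTo(Delayed other) {
          final long diff =
              getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
          return 0 == diff ? 0 : (diff > 0 ? 1 : -1);
        }
      }

      public static void main(String[] args) throws InterruptedException {
        // Same clock base JobFactory uses: System.nanoTime() expressed in millis.
        final long now =
            TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
        final DelayQueue<TimedItem> queue = new DelayQueue<TimedItem>();
        queue.put(new TimedItem("job-later", now + 200));   // eligible after ~200 ms
        queue.put(new TimedItem("job-sooner", now + 100));  // eligible after ~100 ms
        System.out.println(queue.take().name);  // prints job-sooner
        System.out.println(queue.take().name);  // prints job-later
      }
    }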

Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java?rev=815628&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java Wed Sep 16 06:35:42 2009
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobHistory;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.tools.rumen.ZombieJobProducer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * Component reading job traces generated by Rumen. Each job in the trace is
+ * assigned a sequence number and given a submission time relative to the
+ * job that preceded it. Jobs are enqueued in the JobSubmitter provided at
+ * construction.
+ * @see org.apache.hadoop.tools.rumen.HadoopLogsAnalyzer
+ */
+class JobFactory implements Gridmix.Component<Void> {
+
+  public static final Log LOG = LogFactory.getLog(JobFactory.class);
+
+  private final Path scratch;
+  private final Configuration conf;
+  private final ReaderThread rThread;
+  private final AtomicInteger sequence;
+  private final JobSubmitter submitter;
+  private final CountDownLatch startFlag;
+  private volatile IOException error = null;
+  protected final JobStoryProducer jobProducer;
+
+  /**
+   * Creating a new instance does not start the thread.
+   * @param submitter Component to which deserialized jobs are passed
+   * @param jobTrace Stream of job traces with which to construct a
+   *                 {@link org.apache.hadoop.tools.rumen.ZombieJobProducer}
+   * @param scratch Directory into which to write output from simulated jobs
+   * @param conf Config passed to all jobs to be submitted
+   * @param startFlag Latch released from main to start pipeline
+   */
+  public JobFactory(JobSubmitter submitter, InputStream jobTrace,
+      Path scratch, Configuration conf, CountDownLatch startFlag)
+      throws IOException {
+    this(submitter, new ZombieJobProducer(jobTrace, null), scratch, conf,
+        startFlag);
+  }
+
+  /**
+   * Constructor permitting JobStoryProducer to be mocked.
+   * @param submitter Component to which deserialized jobs are passed
+   * @param jobProducer Producer generating JobStory objects.
+   * @param scratch Directory into which to write output from simulated jobs
+   * @param conf Config passed to all jobs to be submitted
+   * @param startFlag Latch released from main to start pipeline
+   */
+  protected JobFactory(JobSubmitter submitter, JobStoryProducer jobProducer,
+      Path scratch, Configuration conf, CountDownLatch startFlag) {
+    sequence = new AtomicInteger(0);
+    this.scratch = scratch;
+    this.jobProducer = jobProducer;
+    this.conf = new Configuration(conf);
+    this.submitter = submitter;
+    this.startFlag = startFlag;
+    this.rThread = new ReaderThread();
+  }
+
+  /**
+   * Worker thread responsible for reading job descriptions from the trace,
+   * assigning sequence numbers, and rebasing trace submission times onto the
+   * simulation clock; e.g. a job submitted 6000 ms after the first job in the
+   * trace is scheduled 6000 ms after initTime.
+   */
+  private class ReaderThread extends Thread {
+
+    public ReaderThread() {
+      super("GridmixJobFactory");
+    }
+
+    private JobStory getNextJobFiltered() throws IOException {
+      JobStory job;
+      do {
+        job = jobProducer.getNextJob();
+      } while (job != null
+          && (job.getOutcome() != JobHistory.Values.SUCCESS ||
+              job.getSubmissionTime() < 0));
+      return job;
+    }
+
+    @Override
+    public void run() {
+      try {
+        startFlag.await();
+        if (Thread.currentThread().isInterrupted()) {
+          return;
+        }
+        final long initTime = TimeUnit.MILLISECONDS.convert(
+            System.nanoTime(), TimeUnit.NANOSECONDS);
+        LOG.debug("START @ " + initTime);
+        long first = -1;
+        long last = -1;
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            final JobStory job = getNextJobFiltered();
+            if (null == job) {
+              return;
+            }
+            if (first < 0) {
+              first = job.getSubmissionTime();
+            }
+            final long current = job.getSubmissionTime();
+            if (current < last) {
+              throw new IOException(
+                  "JobStories are not ordered by submission time.");
+            }
+            last = current;
+            submitter.add(new GridmixJob(conf, initTime + (current - first),
+                job, scratch, sequence.getAndIncrement()));
+          } catch (IOException e) {
+            JobFactory.this.error = e;
+            return;
+          }
+        }
+      } catch (InterruptedException e) {
+        // exit thread; ignore any jobs remaining in the trace
+        return;
+      } finally {
+        IOUtils.cleanup(null, jobProducer);
+      }
+    }
+  }
+
+  /**
+   * Obtain the error that caused the thread to exit unexpectedly.
+   */
+  public IOException error() {
+    return error;
+  }
+
+  /**
+   * Add is disabled.
+   * @throws UnsupportedOperationException
+   */
+  public void add(Void ignored) {
+    throw new UnsupportedOperationException(getClass().getName() +
+        " is at the start of the pipeline and accepts no events");
+  }
+
+  /**
+   * Start the reader thread; it blocks on the start latch before submitting
+   * any jobs.
+   */
+  public void start() {
+    rThread.start();
+  }
+
+  /**
+   * Wait for the reader thread to exhaust the job trace.
+   */
+  public void join() throws InterruptedException {
+    rThread.join();
+  }
+
+  /**
+   * Interrupt the reader thread.
+   */
+  public void shutdown() {
+    rThread.interrupt();
+  }
+
+  /**
+   * Interrupt the reader thread. This requires no special consideration, as
+   * the thread has no pending work queue.
+   */
+  public void abort() {
+    // Currently no special work
+    rThread.interrupt();
+  }
+
+}
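
The lifecycle methods above compose into a simple pipeline: construct, start,
release the latch, join. Below is a sketch of a driver, assuming a JobSubmitter
obtained elsewhere (its construction is not shown in this file) and a made-up
scratch path; the real driver is the Gridmix class, not reproduced here.

    // Sketch only; assumes the same imports as JobFactory.java above.
    static void replayTrace(JobSubmitter submitter, Configuration conf,
        InputStream trace) throws IOException, InterruptedException {
      final CountDownLatch startFlag = new CountDownLatch(1);
      final JobFactory factory = new JobFactory(
          submitter, trace, new Path("gridmix-scratch"), conf, startFlag);
      factory.start();        // reader thread parks on startFlag.await()
      startFlag.countDown();  // release the pipeline; jobs flow to the submitter
      factory.join();         // returns once the trace is exhausted
      if (factory.error() != null) {
        throw factory.error();  // surface any IOException from the reader thread
      }
    }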


