hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r816831 [1/2] - in /hadoop/mapreduce/trunk: ./ src/examples/org/apache/hadoop/examples/terasort/ src/examples/org/apache/hadoop/examples/terasort/2009-write-up/ src/java/org/apache/hadoop/mapred/ src/test/
Date Sat, 19 Sep 2009 00:26:09 GMT
Author: omalley
Date: Sat Sep 19 00:26:07 2009
New Revision: 816831

URL: http://svn.apache.org/viewvc?rev=816831&view=rev
Log:
MAPREDUCE-639. Change Terasort example to reflect the 2009 updates. 
(omalley)

Added:
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/.gitignore
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/100TBTaskTime.png   (with props)
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/1PBTaskTime.png   (with props)
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/1TBTaskTime.png   (with props)
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/500GBTaskTime.png   (with props)
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/Yahoo2009.tex
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/tera.bib
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/GenSort.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/Random16.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraChecksum.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraScheduler.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/Unsigned16.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileSplit.java
    hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=816831&r1=816830&r2=816831&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sat Sep 19 00:26:07 2009
@@ -408,6 +408,9 @@
     MAPREDUCE-954. Change Map-Reduce context objects to be interfaces.
     (acmurthy) 
 
+    MAPREDUCE-639. Change Terasort example to reflect the 2009 updates. 
+    (omalley)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/.gitignore
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/.gitignore?rev=816831&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/.gitignore (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/.gitignore Sat Sep 19 00:26:07 2009
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+Yahoo2009.aux
+Yahoo2009.bbl
+Yahoo2009.blg
+Yahoo2009.log
+Yahoo2009.out
+Yahoo2009.pdf

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/100TBTaskTime.png
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/100TBTaskTime.png?rev=816831&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/100TBTaskTime.png
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/1PBTaskTime.png
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/1PBTaskTime.png?rev=816831&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/1PBTaskTime.png
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/1TBTaskTime.png
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/1TBTaskTime.png?rev=816831&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/1TBTaskTime.png
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/500GBTaskTime.png
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/500GBTaskTime.png?rev=816831&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/500GBTaskTime.png
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/Yahoo2009.tex
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/Yahoo2009.tex?rev=816831&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/Yahoo2009.tex (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/Yahoo2009.tex Sat Sep 19 00:26:07 2009
@@ -0,0 +1,370 @@
+% Licensed to the Apache Software Foundation (ASF) under one
+% or more contributor license agreements.  See the NOTICE file
+% distributed with this work for additional information
+% regarding copyright ownership.  The ASF licenses this file
+% to you under the Apache License, Version 2.0 (the
+% "License"); you may not use this file except in compliance
+% with the License.  You may obtain a copy of the License at
+%
+%     http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+\documentclass{article}
+\usepackage[pdftex]{hyperref}
+\usepackage[pdftex]{graphicx}
+
+\title{Winning a 60 Second Dash with a Yellow Elephant}
+\author{\href{http://people.apache.org/~omalley}{Owen O'Malley} and 
+        \href{http://people.apache.org/~acmurthy}{Arun C. Murthy}\\
+\href{http://www.yahoo.com/}{Yahoo!}\\
+owen@yahoo-inc.com and acm@yahoo-inc.com}
+\date{April 2009}
+\begin{document}
+\maketitle
+\href{http://hadoop.apache.org/core}{Apache Hadoop} is an open source
+software framework that dramatically simplifies writing distributed
+data intensive applications. It provides a distributed file system,
+which is modeled after the Google File System\cite{gfs}, and a
+map/reduce\cite{mapreduce} implementation that manages distributed
+computation. Jim Gray defined a benchmark to compare large sorting
+programs. Since the core of map/reduce is a distributed sort, most of
+the custom code is glue to get the desired behavior.
+
+\section{Benchmark Rules}
+
+Jim Gray's sort benchmark consists of a set of many related
+benchmarks, each with their own rules. All of the sort benchmarks
+measure the time to sort different numbers of 100 byte records. The
+first 10 bytes of each record are the key and the rest is the
+value. The \textbf{minute sort} must finish end to end in less than a
+minute. The \textbf{Gray sort} must sort more than 100 terabytes and
+must run for at least an hour.
+
+\begin{itemize}
+\item The input data must precisely match the data generated by the C
+  data generator.
+\item The input must not be in the operating system's file
+  cache when the job starts. Under Linux, this requires using the memory for something
+  else between sorting runs.
+\item The input and output data must not be compressed.
+\item The output must not overwrite the input.
+\item The output must be synced to disk.
+\item The 128 bit sum of the crc32's of each key/value pair must be
+  calculated for the input and output. Naturally, they must be
+  identical.
+\item The output may be divided into multiple output files, but it
+  must be totally ordered (simply concatenating the output files must
+  produce the completely sorted output).
+\item Starting and distributing the application to the cluster must be
+  included in the execution time.
+\item Any sampling must be included in the execution time.
+\end{itemize}
+
+\section{Hadoop implementation}
+
+We extended the programs from last year to create and manipulate the
+new binary format and match the new rules. There are now 4 Hadoop
+map/reduce applications to support the benchmark:
+\begin{enumerate}
+\item \textbf{TeraGen} is a map/reduce program to generate the data.
+\item \textbf{TeraSort} samples the input data and uses map/reduce to
+  sort the data into a total order.
+\item \textbf{TeraSum} is a map/reduce program that computes the 128 bit
+  sum of the crc32 of each key/value pair.
+\item \textbf{TeraValidate} is a map/reduce program that validates the
+  output is sorted and computes the sum of the checksums as TeraSum.
+\end{enumerate}
+The update to the terasort programs will be checked in as
+\href{http://issues.apache.org/jira/browse/HADOOP-5716}{HADOOP-5716}.
+
+\textbf{TeraGen} generates input data for the sort that is byte for byte
+equivalent to the C version that was released in March of 2009,
+including specific keys and values. It divides the desired number of
+rows by the desired number of tasks and assigns ranges of rows to each
+map. The map jumps the random number generator to the correct value
+for the first row and generates the following rows.
+
+\textbf{TeraSort} is a standard map/reduce sort, except for a custom
+partitioner that ensures that all of the keys in reduce $N$ are after
+all of the keys in reduce $N-1$. This is a requirement of the contest
+so that the output of the sort is totally ordered, even if it is
+divided up by reduce.
+
+We wrote an input and output format, used by all 4 applications to
+read and write the files in the new format.
+
+\textbf{TeraSum} computes the 128 bit sum of the CRC32 of each
+key/value pair. Each map computes the sum of its input and emits a
+single 128 bit sum. There is a single reduce that adds the sums from
+each map. We used this program on the input directory to calculate the
+sum of the checksums of each key/value pair to check the correctness
+of the output of the sort. We also used TeraSum on a distinct dataset
+that was larger than the total RAM in the cluster to flush the Linux
+file cache between runs of the small (500 GB and 1TB) sorts.
+
+\textbf{TeraValidate} ensures that the output is globally sorted. It
+creates one map per file in the output directory and each map
+ensures that each key is greater than or equal to the previous one. The
+map also generates records with the first and last keys of the file
+and the reduce ensures that the first key of file $i$ is greater than
+the last key of file $i-1$. Any problems are reported as output of the
+reduce with the keys that are out of order. Additionally, TeraValidate
+calculates the sum of checksums of the output directory.
+
+\section{Hardware and Operating System}
+
+We ran our benchmarks on Yahoo's Hammer cluster. Hammer's hardware is
+very similar to the hardware that we used in last year's terabyte
+sort. The hardware and operating system details are:
+
+\begin{itemize}
+\item approximately 3800 nodes (in such a large cluster, nodes are
+  always down)
+\item 2 quad core Xeons @ 2.5 GHz per node
+\item 4 SATA disks per node
+\item 8 GB RAM per node (upgraded to 16 GB before the petabyte sort)
+\item 1 gigabit ethernet on each node
+\item 40 nodes per rack
+\item 8 gigabit ethernet uplinks from each rack to the core
+\item Red Hat Enterprise Linux Server Release 5.1 (kernel 2.6.18)
+\item Sun Java JDK (1.6.0\_05-b13 and 1.6.0\_13-b03) (32 and 64 bit)
+\end{itemize}
+
+We hit a JVM bug in 1.6.0\_05-b13 on the larger sorts (100TB and 1PB)
+and switched over to the later JVM, which resolved the issue. For the
+larger sorts, we used 64 bit JVMs for the Name Node and Job Tracker.
+
+\section{Software and Configuration}
+
+The version of Hadoop we used was a private branch of trunk that was
+started in January 2009, which is after the 0.20 branch was feature
+frozen. We used git to manage our branch and it allowed us to easily
+coordinate our work, track our changes, and resynchronize with the
+current Hadoop trunk.
+
+The changes include:
+
+\begin{enumerate}
+
+\item Updated the terasort example in the Hadoop code base to match
+  the dataset defined by the rule changes in the benchmark from March
+  of 2009.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5716}{HADOOP-5716})
+
+\item We reimplemented the reducer side of Hadoop's shuffle. The
+  redesign improved the performance of the shuffle and removed
+  bottlenecks and over-throttling. It also made the code more
+  maintainable and understandable by breaking a 3000 line Java file
+  into multiple classes with a clean set of interfaces.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5223}{HADOOP-5223})
+
+\item The new shuffle also fetches multiple map outputs from the same
+  node over each connection rather than one at a time. Fetching
+  multiple map outputs at the same time avoids connection setup costs
+  and also avoids the round trip while the server responds to the request.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-1338}{HADOOP-1338})
+  
+\item Allowed configuring timeouts on the shuffle connections and we
+  shortened them for the small sorts. We observed cases where the
+  connections for the shuffle would hang until the timeout, which made
+  low latency jobs impossibly long.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5789}{HADOOP-5789})
+
+\item Set TCP no-delay and more frequent pings between the Task and
+  the Task Tracker to reduce latency in detecting problems.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5788}{HADOOP-5788})
+
+\item We added some protection code to detect incorrect data being
+  transmitted in the shuffle and prevent it from causing the reduce to fail. It
+  appears this is either a JVM NIO bug or Jetty bug that likely
+  affects 0.20 and trunk under heavy load.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5783}{HADOOP-5783})
+
+\item We used LZO compression on the map outputs. On the new dataset, LZO
+  compresses down to 45\% of the original size. By comparison, the
+  dataset from last year compresses to 20\% of the original size. Last
+  year, the shuffle would run out of direct buffers if we used
+  compression on the map outputs.
+
+\item We implemented memory to memory merges in the reduce during the
+  shuffle to combine the map outputs in memory before we finish the
+  shuffle, thereby reducing the work needed when the reduce is
+  running.
+
+\item We multi-threaded the sampling code that read the input set to
+  find the partition points between the reduces. We also wrote a
+  simple partitioner that assumes the keys are evenly
+  distributed. Since the new dataset does not require sampling, the
+  simple partitioner produces very even partitions.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-4946}{HADOOP-4946})
+
+\item On the smaller clusters, we configured the system with faster
+  heartbeat cycles from the Task Trackers to the Job Tracker (it
+  defaults to 10 secs / 1000 nodes, but we made it configurable and
+  brought it down to 2 seconds/1000 nodes to provide lower latency)
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5784}{HADOOP-5784})
+
+\item Typically the Job Tracker assigns tasks to Task Trackers on a
+  first come first served basis. This greedy assignment of tasks did
+  not lead to good data locality. However, by taking a global view and
+  placing all of the map tasks at once, the system achieves much better
+  locality. Rather than implement global scheduling for all of Hadoop,
+  which would be much harder, we implemented a global scheduler for
+  the terasort example in the input format. Basically, the input
+  format computes the splits and assigns work to the nodes that have
+  the fewest blocks first. For a node that has more blocks
+  than map slots, it picks the block that has the fewest remaining
+  locations left. This greedy global algorithm seems to get very good
+  locality. The input format would schedule the maps and then change
+  the input split descriptions to only have a single location instead
+  of the original 3. This increased task locality by 40\% or so over
+  the greedy scheduler.
+
+\item Hadoop 0.20 added setup and cleanup tasks. Since they are not
+  required for the sort benchmarks, we allow them to be disabled to
+  reduce the latency of starting and stopping the job.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5785}{HADOOP-5785})
+
+\item We discovered a performance problem where in some contexts the
+  cost of using the JNI-based CRC32 was very high. By implementing it
+  in pure Java, the average case is a little slower, but the worst
+  case is much better.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5598}{HADOOP-5598})
+
+\item We found and removed some hard-coded wait loops from the
+  framework that don't matter for large jobs, but can seriously slow
+  down low latency jobs.
+
+\item Allowed setting the logging level for the tasks, so that we
+  could cut down on logging. When running for ``real'', we configure the
+  logging level to WARN instead of the default INFO. Reducing the
+  amount of logging has a huge impact on the performance of the
+  system, but obviously makes debugging and analysis much harder.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5786}{HADOOP-5786})
+
+\item One optimization that we didn't finish is to optimize the job
+  planning code. Currently, it uses an RPC to the Name Node for each
+  input file, which we have observed taking a substantial amount of
+  time. For the terabyte sort, our investigations show that we
+  could save about 4 seconds out of the 8 that were spent on setting
+  up the job.
+  (\href{http://issues.apache.org/jira/browse/HADOOP-5795}{HADOOP-5795})
+
+\end{enumerate}
+
+\section{Results}
+
+Hadoop has made a lot of progress in the last year and we were able to
+run much lower latency jobs as well as much larger jobs. Note that in
+any large cluster and distributed application, there are a lot of
+moving pieces and thus we have seen a wide variation in execution
+times. As Hadoop evolves and becomes more graceful in the presence of
+hardware degradation and failure, this variation should smooth
+out. The best times for each of the listed sort sizes were:
+\\
+
+\begin{tabular}{| c | c | c | c | c | c |}
+\hline
+Bytes & Nodes & Maps & Reduces & Replication & Time \\
+\hline
+$5*10^{11}$ & 1406 & 8000 & 2600 & 1 & 59 seconds \\
+$10^{12}$ & 1460 & 8000 & 2700 & 1 & 62 seconds \\
+$10^{14}$ & 3452 & 190,000 & 10,000 & 2 & 173 minutes \\
+$10^{15}$ & 3658 & 80,000 & 20,000 & 2 & 975 minutes \\
+\hline
+\end{tabular}\\
+
+Within the rules for the 2009 Gray sort, our 500 GB sort set a new
+record for the minute sort and the 1PB sort set a new record of 1.03
+TB/minute. The 62 second terabyte sort would have set a new record,
+but the terabyte benchmark that we won last year has been
+retired. (Clearly the minute sort and terabyte sort are rapidly
+converging, and thus it is not a loss.)  One piece of trivia is that
+only the petabyte dataset had any duplicate keys (40 of them).
+
+Because the smaller sorts needed lower latency and a faster network, we
+only used part of the cluster for those runs. In particular, instead
+of our normal 5:1 oversubscription between racks, we limited it to 16
+nodes in each rack for a 2:1 oversubscription. The smaller runs can
+also use output replication of 1: because they only take minutes to
+run on smaller clusters, the likelihood of a node failing is
+fairly low. On the larger runs, failure is expected and thus
+replication of 2 is required. HDFS protects against data loss during
+rack failure by writing the second replica on a different rack, and
+thus writing the second replica is relatively slow.
+
+We've included the timelines for the jobs counting from the job
+submission at the Job Tracker. The diagrams show the number of tasks
+running at each point in time. While maps only have a single phase,
+the reduces have three: \textbf{shuffle}, \textbf{merge}, and
+\textbf{reduce}. The shuffle is the transfer of the data from the
+maps. Merge doesn't happen in these benchmarks, because none of the
+reduces need multiple levels of merges. Finally, the reduce phase is
+where the final merge and writing to HDFS happens. We've also included
+a category named \textbf{waste} that represents task attempts that
+were running, but ended up either failing, or being killed (often as
+speculatively executed task attempts). The job logs and configuration
+for the four runs, which are the raw data for the charts, are
+available on
+\href{http://people.apache.org/~omalley/tera-2009/}{http://people.apache.org/~omalley/tera-2009/}.
+
+If you compare this year's charts to last year's, you'll notice that
+tasks are launching much faster now. Last year we only launched one
+task per heartbeat, so it took 40 seconds to get all of the tasks
+launched. Now, Hadoop will fill up a Task Tracker in a single
+heartbeat. Reducing that job launch overhead is very important
+for getting runs under a minute.
+
+As with last year, we ran with significantly larger tasks than the
+defaults for Hadoop. Even with the new more aggressive shuffle,
+minimizing the number of transfers (maps * reduces) is very important
+to the performance of the job. Notice that in the petabyte sort, each
+map is processing 15 GB instead of the default 128 MB and each reduce
+is handling 50 GB. When we ran the petabyte sort with more typical values
+of 1.5 GB per map, it took 40 hours to finish. Therefore, to increase
+throughput, it makes sense to consider increasing the default block
+size, which translates into the default map size, to at least 1
+GB.
+
+\section{Comments on the Rule Changes}
+
+The group that runs the Gray Sort Benchmark made very substantial
+changes to the rules this year. The changes were not announced, but
+rather appeared on the website in March. We feel that it was too late
+to make rule changes and that the benchmark should have been changed
+next year. We'd also like to point out that while most of the changes to
+the data generator were positive, it was a poor choice to remove the
+skewed distribution of the keys. The previously skewed distribution
+required sampling of the input to pick good partition points between
+the reduces. The current dataset picks keys so completely at random that
+sampling is counterproductive and yields less even distributions between the
+reduces.
+
+\bibliographystyle{abbrv}
+\bibliography{tera}
+
+\begin{figure}[!p]
+\includegraphics[width=4.21in]{500GBTaskTime.png}
+\caption{500 GB sort tasks across time}\label{500GbTimeline}
+\end{figure} 
+
+\begin{figure}[!p]
+\includegraphics[width=4.5in]{1TBTaskTime.png}
+\caption{1 TB sort tasks across time}\label{1TbTimeline}
+\end{figure} 
+
+\begin{figure}[!p]
+\includegraphics[width=4.5in]{100TBTaskTime.png}
+\caption{100 TB sort tasks across time}\label{100TbTimeline}
+\end{figure} 
+
+\begin{figure}[!p]
+\includegraphics[width=4.5in]{1PBTaskTime.png}
+\caption{1 PB sort tasks across time}\label{1PbTimeline}
+\end{figure} 
+
+\end{document}

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/tera.bib
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/tera.bib?rev=816831&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/tera.bib (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/2009-write-up/tera.bib Sat Sep 19 00:26:07 2009
@@ -0,0 +1,31 @@
+% Licensed to the Apache Software Foundation (ASF) under one
+% or more contributor license agreements.  See the NOTICE file
+% distributed with this work for additional information
+% regarding copyright ownership.  The ASF licenses this file
+% to you under the Apache License, Version 2.0 (the
+% "License"); you may not use this file except in compliance
+% with the License.  You may obtain a copy of the License at
+%
+%     http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+@INPROCEEDINGS{mapreduce,
+	AUTHOR = "Jeffrey Dean and Sanjay Ghemawat",
+	TITLE = "MapReduce: Simplified Data Processing on Large Clusters",
+	BOOKTITLE = "Sixth Symposium on Operating System Design and Implementation",
+	MONTH = "December", 
+        ADDRESS = "San Francisco, CA",
+	YEAR = {2004}	}
+
+@INPROCEEDINGS{gfs,
+	AUTHOR = "Sanjay Ghemawat and Howard Gobioff and Shun-Tak Leung",
+	TITLE = "The Google File System",
+	BOOKTITLE = "19th Symposium on Operating Systems Principles",
+        ORGANIZATION = "ACM",
+	MONTH = "October", 
+        ADDRESS = "Lake George, NY",
+	YEAR = {2003}	}

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/GenSort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/GenSort.java?rev=816831&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/GenSort.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/GenSort.java Sat Sep 19 00:26:07 2009
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples.terasort;
+
+import java.io.BufferedOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.math.BigInteger;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.util.PureJavaCrc32;
+
+/**
+ * A single process data generator for the terasort data. Based on gensort.c
+ * version 1.1 (3 Mar 2009) from Chris Nyberg <chris.nyberg@ordinal.com>.
+ *
+ * Every record is 100 bytes long. Record number N is built from the
+ * (N+1)-th output of {@link Random16}, so any contiguous range of records
+ * can be generated without generating the records before it.
+ */
+public class GenSort {
+
+  /**
+   * Generate a "binary" record suitable for all sort benchmarks *except*
+   * PennySort.
+   *
+   * @param recBuf buffer of at least 100 bytes that receives the record
+   * @param rand the 128-bit random number for this record
+   * @param recordNumber the 128-bit sequence number of this record
+   */
+  static void generateRecord(byte[] recBuf, Unsigned16 rand,
+                             Unsigned16 recordNumber) {
+    /* generate the 10-byte key using the high 10 bytes of the 128-bit
+     * random number
+     */
+    for(int i=0; i < 10; ++i) {
+      recBuf[i] = rand.getByte(i);
+    }
+
+    /* add 2 bytes of "break" */
+    recBuf[10] = 0x00;
+    recBuf[11] = 0x11;
+
+    /* write the 128-bit record number as 32 ascii hexadecimal digits,
+     * filling the next 32 bytes of the record.
+     */
+    for (int i = 0; i < 32; i++) {
+      recBuf[12 + i] = (byte) recordNumber.getHexDigit(i);
+    }
+
+    /* add 4 bytes of "break" data */
+    recBuf[44] = (byte) 0x88;
+    recBuf[45] = (byte) 0x99;
+    recBuf[46] = (byte) 0xAA;
+    recBuf[47] = (byte) 0xBB;
+
+    /* add 48 bytes of filler based on low 48 bits of random number:
+     * each of the last 12 hex digits is repeated 4 times.
+     */
+    for(int i=0; i < 12; ++i) {
+      recBuf[48+i*4] = recBuf[49+i*4] = recBuf[50+i*4] = recBuf[51+i*4] =
+        (byte) rand.getHexDigit(20 + i);
+    }
+
+    /* add 4 bytes of "break" data */
+    recBuf[96] = (byte) 0xCC;
+    recBuf[97] = (byte) 0xDD;
+    recBuf[98] = (byte) 0xEE;
+    recBuf[99] = (byte) 0xFF;
+  }
+
+
+  /**
+   * Interpret the 64 bits of x as an unsigned (non-negative) BigInteger.
+   */
+  private static BigInteger makeBigInteger(long x) {
+    byte[] data = new byte[8];
+    for(int i=0; i < 8; ++i) {
+      data[i] = (byte) (x >>> (56 - 8*i));
+    }
+    return new BigInteger(1, data);
+  }
+
+  private static final BigInteger NINETY_FIVE = new BigInteger("95");
+
+  /**
+   * Generate an ascii record suitable for all sort benchmarks including
+   * PennySort.
+   *
+   * @param recBuf buffer of at least 100 bytes that receives the record
+   * @param rand the 128-bit random number for this record
+   * @param recordNumber the 128-bit sequence number of this record
+   */
+  static void generateAsciiRecord(byte[] recBuf, Unsigned16 rand,
+                                  Unsigned16 recordNumber) {
+
+    /* generate the 10-byte ascii key (printable characters ' '..'~') using
+     * base-95 digits of the high 64 bits (8 chars) and low 64 bits (2 chars).
+     */
+    long temp = rand.getHigh8();
+    if (temp < 0) {
+      // the value is unsigned; use BigInteger so % and / see it as positive
+      BigInteger bigTemp = makeBigInteger(temp);
+      recBuf[0] = (byte) (' ' + (bigTemp.mod(NINETY_FIVE).longValue()));
+      temp = bigTemp.divide(NINETY_FIVE).longValue();
+    } else {
+      recBuf[0] = (byte) (' ' + (temp % 95));
+      temp /= 95;
+    }
+    // temp is now non-negative, so plain long arithmetic is safe
+    for(int i=1; i < 8; ++i) {
+      recBuf[i] = (byte) (' ' + (temp % 95));
+      temp /= 95;
+    }
+    temp = rand.getLow8();
+    if (temp < 0) {
+      BigInteger bigTemp = makeBigInteger(temp);
+      recBuf[8] = (byte) (' ' + (bigTemp.mod(NINETY_FIVE).longValue()));
+      temp = bigTemp.divide(NINETY_FIVE).longValue();
+    } else {
+      recBuf[8] = (byte) (' ' + (temp % 95));
+      temp /= 95;
+    }
+    recBuf[9] = (byte)(' ' + (temp % 95));
+
+    /* add 2 bytes of "break" */
+    recBuf[10] = ' ';
+    recBuf[11] = ' ';
+
+    /* write the 128-bit record number as 32 ascii hexadecimal digits,
+     * filling the next 32 bytes of the record.
+     */
+    for (int i = 0; i < 32; i++) {
+      recBuf[12 + i] = (byte) recordNumber.getHexDigit(i);
+    }
+
+    /* add 2 bytes of "break" data */
+    recBuf[44] = ' ';
+    recBuf[45] = ' ';
+
+    /* add 52 bytes of filler based on low 52 bits of random number:
+     * each of the last 13 hex digits is repeated 4 times.
+     */
+    for(int i=0; i < 13; ++i) {
+      recBuf[46+i*4] = recBuf[47+i*4] = recBuf[48+i*4] = recBuf[49+i*4] =
+        (byte) rand.getHexDigit(19 + i);
+    }
+
+    /* end the record with a CR LF line terminator */
+    recBuf[98] = '\r';	/* nice for Windows */
+    recBuf[99] = '\n';
+  }
+
+
+  /**
+   * Print the command line usage to standard output and exit with status 1.
+   */
+  private static void usage() {
+    PrintStream out = System.out;
+    out.println("usage: gensort [-a] [-c] [-bSTARTING_REC_NUM] NUM_RECS FILE_NAME");
+    out.println("-a        Generate ascii records required for PennySort or JouleSort.");
+    out.println("          These records are also an alternative input for the other");
+    out.println("          sort benchmarks.  Without this flag, binary records will be");
+    out.println("          generated that contain the highest density of randomness in");
+    out.println("          the 10-byte key.");
+    out.println("-c        Calculate the sum of the crc32 checksums of each of the");
+    out.println("          generated records and send it to standard error.");
+    out.println("-bN       Set the beginning record generated to N. By default the");
+    out.println("          first record generated is record 0.");
+    out.println("NUM_RECS  The number of sequential records to generate.");
+    out.println("FILE_NAME The name of the file to write the records to.\n");
+    out.println("Example 1 - to generate 1000000 ascii records starting at record 0 to");
+    out.println("the file named \"pennyinput\":");
+    out.println("    gensort -a 1000000 pennyinput\n");
+    out.println("Example 2 - to generate 1000 binary records beginning with record 2000");
+    out.println("to the file named \"partition2\":");
+    out.println("    gensort -b2000 1000 partition2");
+    System.exit(1);
+  }
+
+
+  /**
+   * Write a contiguous range of 100-byte records to a stream.
+   *
+   * @param out the stream to write to; it is not closed here
+   * @param useAscii true for ascii records, false for binary records
+   * @param firstRecordNumber the number of the first record to generate
+   * @param recordsToGenerate how many records to generate
+   * @param checksum if non-null, the crc32 of every record is added to it
+   * @throws IOException if writing to out fails
+   */
+  public static void outputRecords(OutputStream out,
+                                   boolean useAscii,
+                                   Unsigned16 firstRecordNumber,
+                                   Unsigned16 recordsToGenerate,
+                                   Unsigned16 checksum
+                                   ) throws IOException {
+    byte[] row = new byte[100];
+    Unsigned16 recordNumber = new Unsigned16(firstRecordNumber);
+    Unsigned16 lastRecordNumber = new Unsigned16(firstRecordNumber);
+    Checksum crc = new PureJavaCrc32();
+    Unsigned16 tmp = new Unsigned16();
+    lastRecordNumber.add(recordsToGenerate);
+    Unsigned16 one = new Unsigned16(1);
+    // position the generator just before the first requested record
+    Unsigned16 rand = Random16.skipAhead(firstRecordNumber);
+    while (!recordNumber.equals(lastRecordNumber)) {
+      Random16.nextRand(rand);
+      if (useAscii) {
+        generateAsciiRecord(row, rand, recordNumber);
+      } else {
+        generateRecord(row, rand, recordNumber);
+      }
+      if (checksum != null) {
+        crc.reset();
+        crc.update(row, 0, row.length);
+        tmp.set(crc.getValue());
+        checksum.add(tmp);
+      }
+      recordNumber.add(one);
+      out.write(row);
+    }
+  }
+
+  /**
+   * Command line entry point; see {@link #usage()} for the arguments.
+   */
+  public static void main(String[] args) throws Exception {
+    Unsigned16 startingRecord = new Unsigned16();
+    boolean useAscii = false;
+    Unsigned16 checksum = null;
+
+    int i;
+    for(i=0; i < args.length; ++i) {
+      String arg = args[i];
+      int argLength = arg.length();
+      if (argLength >= 1 && arg.charAt(0) == '-') {
+        if (argLength < 2) {
+          usage();
+        }
+        switch (arg.charAt(1)) {
+        case 'a':
+          useAscii = true;
+          break;
+        case 'b':
+          startingRecord = Unsigned16.fromDecimal(arg.substring(2));
+          break;
+        case 'c':
+          checksum = new Unsigned16();
+          break;
+        default:
+          usage();
+        }
+      } else {
+        // first non-option argument: NUM_RECS
+        break;
+      }
+    }
+    if (args.length - i != 2) {
+      usage();
+    }
+    Unsigned16 numberOfRecords = Unsigned16.fromDecimal(args[i]);
+    // outputRecords issues one 100-byte write per record, so buffer it
+    OutputStream out =
+      new BufferedOutputStream(new FileOutputStream(args[i+1]));
+    try {
+      outputRecords(out, useAscii, startingRecord, numberOfRecords, checksum);
+    } finally {
+      // always release the file handle, even if generation fails
+      out.close();
+    }
+    if (checksum != null) {
+      // the usage text (and gensort) send the checksum to standard error
+      System.err.println(checksum);
+    }
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/Random16.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/Random16.java?rev=816831&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/Random16.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/Random16.java Sat Sep 19 00:26:07 2009
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples.terasort;
+
+/**
+ * This class implements a 128-bit linear congruential generator.
+ * Specifically, if X0 is the most recently issued 128-bit random
+ * number (or a seed of 0 if no random number has already been generated),
+ * the next number to be generated, X1, is equal to:
+ * X1 = (a * X0 + c) mod 2**128
+ * where a is 47026247687942121848144207491837523525
+ *            or 0x2360ed051fc65da44385df649fccf645
+ *   and c is 98910279301475397889117759788405497857
+ *            or 0x4a696d47726179524950202020202001
+ * The coefficient "a" is suggested by:
+ * Pierre L'Ecuyer, "Tables of linear congruential generators of different
+ * sizes and good lattice structure", Mathematics of Computation, 68
+ * pp. 249 - 260 (1999)
+ * http://www.ams.org/mcom/1999-68-225/S0025-5718-99-00996-5/S0025-5718-99-00996-5.pdf
+ * The constant "c" meets the simple suggestion by the same reference that
+ * it be odd.
+ *
+ * There is also a facility for quickly advancing the state of the
+ * generator by a fixed number of steps - this facilitates parallel
+ * generation.
+ *
+ * This is based on version 1.0 of rand16.c from Chris Nyberg
+ * <chris.nyberg@ordinal.com>.
+ */
+class Random16 {
+
+  /**
+   * One step of the generator: f(x) = (a * x + c) mod 2**128.
+   */
+  private static class RandomConstant {
+    final Unsigned16 a;
+    final Unsigned16 c;
+    public RandomConstant(String aHex, String cHex) {
+      a = new Unsigned16(aHex);
+      c = new Unsigned16(cHex);
+    }
+  }
+
+  /**
+   * The generator raised to successive powers of 2. Entry 0 contains the
+   * "a" coefficient and "c" constant of the generator itself:
+   *    f(x) = (genArray[0].a * x + genArray[0].c) mod 2**128
+   *
+   * Every later entry contains the "a" and "c" of the square of the
+   * previous entry's function:
+   *
+   * f**2(x) = (genArray[1].a * x + genArray[1].c) mod 2**128
+   * f**4(x) = (genArray[2].a * x + genArray[2].c) mod 2**128
+   * f**8(x) = (genArray[3].a * x + genArray[3].c) mod 2**128
+   * ...
+   */
+  private static final RandomConstant[] genArray = new RandomConstant[]{
+    /* [  0] */ new RandomConstant("2360ed051fc65da44385df649fccf645",
+                                   "4a696d47726179524950202020202001"),
+    /* [  1] */ new RandomConstant("17bce35bdf69743c529ed9eb20e0ae99",
+                                   "95e0e48262b3edfe04479485c755b646"),
+    /* [  2] */ new RandomConstant("f4dd417327db7a9bd194dfbe42d45771",
+                                   "882a02c315362b60765f100068b33a1c"),
+    /* [  3] */ new RandomConstant("6347af777a7898f6d1a2d6f33505ffe1",
+                                   "5efc4abfaca23e8ca8edb1f2dfbf6478"),
+    /* [  4] */ new RandomConstant("b6a4239f3b315f84f6ef6d3d288c03c1",
+                                   "f25bd15439d16af594c1b1bafa6239f0"),
+    /* [  5] */ new RandomConstant("2c82901ad1cb0cd182b631ba6b261781",
+                                   "89ca67c29c9397d59c612596145db7e0"),
+    /* [  6] */ new RandomConstant("dab03f988288676ee49e66c4d2746f01",
+                                   "8b6ae036713bd578a8093c8eae5c7fc0"),
+    /* [  7] */ new RandomConstant("602167331d86cf5684fe009a6d09de01",
+                                   "98a2542fd23d0dbdff3b886cdb1d3f80"),
+    /* [  8] */ new RandomConstant("61ecb5c24d95b058f04c80a23697bc01",
+                                   "954db923fdb7933e947cd1edcecb7f00"),
+    /* [  9] */ new RandomConstant("4a5c31e0654c28aa60474e83bf3f7801",
+                                   "00be4a36657c98cd204e8c8af7dafe00"),
+    /* [ 10] */ new RandomConstant("ae4f079d54fbece1478331d3c6bef001",
+                                   "991965329dccb28d581199ab18c5fc00"),
+    /* [ 11] */ new RandomConstant("101b8cb830c7cb927ff1ed50ae7de001",
+                                   "e1a8705b63ad5b8cd6c3d268d5cbf800"),
+    /* [ 12] */ new RandomConstant("f54a27fc056b00e7563f3505e0fbc001",
+                                   "2b657bbfd6ed9d632079e70c3c97f000"),
+    /* [ 13] */ new RandomConstant("df8a6fc1a833d201f98d719dd1f78001",
+                                   "59b60ee4c52fa49e9fe90682bd2fe000"),
+    /* [ 14] */ new RandomConstant("5480a5015f101a4ea7e3f183e3ef0001",
+                                   "cc099c88030679464fe86aae8a5fc000"),
+    /* [ 15] */ new RandomConstant("a498509e76e5d7925f539c28c7de0001",
+                                   "06b9abff9f9f33dd30362c0154bf8000"),
+    /* [ 16] */ new RandomConstant("0798a3d8b10dc72e60121cd58fbc0001",
+                                   "e296707121688d5a0260b293a97f0000"),
+    /* [ 17] */ new RandomConstant("1647d1e78ec02e665fafcbbb1f780001",
+                                   "189ffc4701ff23cb8f8acf6b52fe0000"),
+    /* [ 18] */ new RandomConstant("a7c982285e72bf8c0c8ddfb63ef00001",
+                                   "5141110ab208fb9d61fb47e6a5fc0000"),
+    /* [ 19] */ new RandomConstant("3eb78ee8fb8c56dbc5d4e06c7de00001",
+                                   "3c97caa62540f2948d8d340d4bf80000"),
+    /* [ 20] */ new RandomConstant("72d03b6f4681f2f9fe8e44d8fbc00001",
+                                   "1b25cb9cfe5a0c963174f91a97f00000"),
+    /* [ 21] */ new RandomConstant("ea85f81e4f502c9bc8ae99b1f7800001",
+                                   "0c644570b4a487103c5436352fe00000"),
+    /* [ 22] */ new RandomConstant("629c320db08b00c6bfa57363ef000001",
+                                   "3d0589c28869472bde517c6a5fc00000"),
+    /* [ 23] */ new RandomConstant("c5c4b9ce268d074a386be6c7de000001",
+                                   "bc95e5ab36477e65534738d4bf800000"),
+    /* [ 24] */ new RandomConstant("f30bbbbed1596187555bcd8fbc000001",
+                                   "ddb02ff72a031c01011f71a97f000000"),
+    /* [ 25] */ new RandomConstant("4a1000fb26c9eeda3cc79b1f78000001",
+                                   "2561426086d9acdb6c82e352fe000000"),
+    /* [ 26] */ new RandomConstant("89fb5307f6bf8ce2c1cf363ef0000001",
+                                   "64a788e3c118ed1c8215c6a5fc000000"),
+    /* [ 27] */ new RandomConstant("830b7b3358a5d67ea49e6c7de0000001",
+                                   "e65ea321908627cfa86b8d4bf8000000"),
+    /* [ 28] */ new RandomConstant("fd8a51da91a69fe1cd3cd8fbc0000001",
+                                   "53d27225604d85f9e1d71a97f0000000"),
+    /* [ 29] */ new RandomConstant("901a48b642b90b55aa79b1f780000001",
+                                   "ca5ec7a3ed1fe55e07ae352fe0000000"),
+    /* [ 30] */ new RandomConstant("118cdefdf32144f394f363ef00000001",
+                                   "4daebb2e085330651f5c6a5fc0000000"),
+    /* [ 31] */ new RandomConstant("0a88c0a91cff430829e6c7de00000001",
+                                   "9d6f1a00a8f3f76e7eb8d4bf80000000"),
+    /* [ 32] */ new RandomConstant("433bef4314f16a9453cd8fbc00000001",
+                                   "158c62f2b31e496dfd71a97f00000000"),
+    /* [ 33] */ new RandomConstant("c294b02995ae6738a79b1f7800000001",
+                                   "290e84a2eb15fd1ffae352fe00000000"),
+    /* [ 34] */ new RandomConstant("913575e0da8b16b14f363ef000000001",
+                                   "e3dc1bfbe991a34ff5c6a5fc00000000"),
+    /* [ 35] */ new RandomConstant("2f61b9f871cf4e629e6c7de000000001",
+                                   "ddf540d020b9eadfeb8d4bf800000000"),
+    /* [ 36] */ new RandomConstant("78d26ccbd68320c53cd8fbc000000001",
+                                   "8ee4950177ce66bfd71a97f000000000"),
+    /* [ 37] */ new RandomConstant("8b7ebd037898518a79b1f78000000001",
+                                   "39e0f787c907117fae352fe000000000"),
+    /* [ 38] */ new RandomConstant("0b5507b61f78e314f363ef0000000001",
+                                   "659d2522f7b732ff5c6a5fc000000000"),
+    /* [ 39] */ new RandomConstant("4f884628f812c629e6c7de0000000001",
+                                   "9e8722938612a5feb8d4bf8000000000"),
+    /* [ 40] */ new RandomConstant("be896744d4a98c53cd8fbc0000000001",
+                                   "e941a65d66b64bfd71a97f0000000000"),
+    /* [ 41] */ new RandomConstant("daf63a553b6318a79b1f780000000001",
+                                   "7b50d19437b097fae352fe0000000000"),
+    /* [ 42] */ new RandomConstant("2d7a23d8bf06314f363ef00000000001",
+                                   "59d7b68e18712ff5c6a5fc0000000000"),
+    /* [ 43] */ new RandomConstant("392b046a9f0c629e6c7de00000000001",
+                                   "4087bab2d5225feb8d4bf80000000000"),
+    /* [ 44] */ new RandomConstant("eb30fbb9c218c53cd8fbc00000000001",
+                                   "b470abc03b44bfd71a97f00000000000"),
+    /* [ 45] */ new RandomConstant("b9cdc30594318a79b1f7800000000001",
+                                   "366630eaba897fae352fe00000000000"),
+    /* [ 46] */ new RandomConstant("014ab453686314f363ef000000000001",
+                                   "a2dfc77e8512ff5c6a5fc00000000000"),
+    /* [ 47] */ new RandomConstant("395221c7d0c629e6c7de000000000001",
+                                   "1e0d25a14a25feb8d4bf800000000000"),
+    /* [ 48] */ new RandomConstant("4d972813a18c53cd8fbc000000000001",
+                                   "9d50a5d3944bfd71a97f000000000000"),
+    /* [ 49] */ new RandomConstant("06f9e2374318a79b1f78000000000001",
+                                   "bf7ab5eb2897fae352fe000000000000"),
+    /* [ 50] */ new RandomConstant("bd220cae86314f363ef0000000000001",
+                                   "925b14e6512ff5c6a5fc000000000000"),
+    /* [ 51] */ new RandomConstant("36fd3a5d0c629e6c7de0000000000001",
+                                   "724cce0ca25feb8d4bf8000000000000"),
+    /* [ 52] */ new RandomConstant("60def8ba18c53cd8fbc0000000000001",
+                                   "1af42d1944bfd71a97f0000000000000"),
+    /* [ 53] */ new RandomConstant("8d500174318a79b1f780000000000001",
+                                   "0f529e32897fae352fe0000000000000"),
+    /* [ 54] */ new RandomConstant("48e842e86314f363ef00000000000001",
+                                   "844e4c6512ff5c6a5fc0000000000000"),
+    /* [ 55] */ new RandomConstant("4af185d0c629e6c7de00000000000001",
+                                   "9f40d8ca25feb8d4bf80000000000000"),
+    /* [ 56] */ new RandomConstant("7a670ba18c53cd8fbc00000000000001",
+                                   "9912b1944bfd71a97f00000000000000"),
+    /* [ 57] */ new RandomConstant("86de174318a79b1f7800000000000001",
+                                   "9c69632897fae352fe00000000000000"),
+    /* [ 58] */ new RandomConstant("55fc2e86314f363ef000000000000001",
+                                   "e1e2c6512ff5c6a5fc00000000000000"),
+    /* [ 59] */ new RandomConstant("ccf85d0c629e6c7de000000000000001",
+                                   "68058ca25feb8d4bf800000000000000"),
+    /* [ 60] */ new RandomConstant("1df0ba18c53cd8fbc000000000000001",
+                                   "610b1944bfd71a97f000000000000000"),
+    /* [ 61] */ new RandomConstant("4be174318a79b1f78000000000000001",
+                                   "061632897fae352fe000000000000000"),
+    /* [ 62] */ new RandomConstant("d7c2e86314f363ef0000000000000001",
+                                   "1c2c6512ff5c6a5fc000000000000000"),
+    /* [ 63] */ new RandomConstant("af85d0c629e6c7de0000000000000001",
+                                   "7858ca25feb8d4bf8000000000000000"),
+    /* [ 64] */ new RandomConstant("5f0ba18c53cd8fbc0000000000000001",
+                                   "f0b1944bfd71a97f0000000000000000"),
+    /* [ 65] */ new RandomConstant("be174318a79b1f780000000000000001",
+                                   "e1632897fae352fe0000000000000000"),
+    /* [ 66] */ new RandomConstant("7c2e86314f363ef00000000000000001",
+                                   "c2c6512ff5c6a5fc0000000000000000"),
+    /* [ 67] */ new RandomConstant("f85d0c629e6c7de00000000000000001",
+                                   "858ca25feb8d4bf80000000000000000"),
+    /* [ 68] */ new RandomConstant("f0ba18c53cd8fbc00000000000000001",
+                                   "0b1944bfd71a97f00000000000000000"),
+    /* [ 69] */ new RandomConstant("e174318a79b1f7800000000000000001",
+                                   "1632897fae352fe00000000000000000"),
+    /* [ 70] */ new RandomConstant("c2e86314f363ef000000000000000001",
+                                   "2c6512ff5c6a5fc00000000000000000"),
+    /* [ 71] */ new RandomConstant("85d0c629e6c7de000000000000000001",
+                                   "58ca25feb8d4bf800000000000000000"),
+    /* [ 72] */ new RandomConstant("0ba18c53cd8fbc000000000000000001",
+                                   "b1944bfd71a97f000000000000000000"),
+    /* [ 73] */ new RandomConstant("174318a79b1f78000000000000000001",
+                                   "632897fae352fe000000000000000000"),
+    /* [ 74] */ new RandomConstant("2e86314f363ef0000000000000000001",
+                                   "c6512ff5c6a5fc000000000000000000"),
+    /* [ 75] */ new RandomConstant("5d0c629e6c7de0000000000000000001",
+                                   "8ca25feb8d4bf8000000000000000000"),
+    /* [ 76] */ new RandomConstant("ba18c53cd8fbc0000000000000000001",
+                                   "1944bfd71a97f0000000000000000000"),
+    /* [ 77] */ new RandomConstant("74318a79b1f780000000000000000001",
+                                   "32897fae352fe0000000000000000000"),
+    /* [ 78] */ new RandomConstant("e86314f363ef00000000000000000001",
+                                   "6512ff5c6a5fc0000000000000000000"),
+    /* [ 79] */ new RandomConstant("d0c629e6c7de00000000000000000001",
+                                   "ca25feb8d4bf80000000000000000000"),
+    /* [ 80] */ new RandomConstant("a18c53cd8fbc00000000000000000001",
+                                   "944bfd71a97f00000000000000000000"),
+    /* [ 81] */ new RandomConstant("4318a79b1f7800000000000000000001",
+                                   "2897fae352fe00000000000000000000"),
+    /* [ 82] */ new RandomConstant("86314f363ef000000000000000000001",
+                                   "512ff5c6a5fc00000000000000000000"),
+    /* [ 83] */ new RandomConstant("0c629e6c7de000000000000000000001",
+                                   "a25feb8d4bf800000000000000000000"),
+    /* [ 84] */ new RandomConstant("18c53cd8fbc000000000000000000001",
+                                   "44bfd71a97f000000000000000000000"),
+    /* [ 85] */ new RandomConstant("318a79b1f78000000000000000000001",
+                                   "897fae352fe000000000000000000000"),
+    /* [ 86] */ new RandomConstant("6314f363ef0000000000000000000001",
+                                   "12ff5c6a5fc000000000000000000000"),
+    /* [ 87] */ new RandomConstant("c629e6c7de0000000000000000000001",
+                                   "25feb8d4bf8000000000000000000000"),
+    /* [ 88] */ new RandomConstant("8c53cd8fbc0000000000000000000001",
+                                   "4bfd71a97f0000000000000000000000"),
+    /* [ 89] */ new RandomConstant("18a79b1f780000000000000000000001",
+                                   "97fae352fe0000000000000000000000"),
+    /* [ 90] */ new RandomConstant("314f363ef00000000000000000000001",
+                                   "2ff5c6a5fc0000000000000000000000"),
+    /* [ 91] */ new RandomConstant("629e6c7de00000000000000000000001",
+                                   "5feb8d4bf80000000000000000000000"),
+    /* [ 92] */ new RandomConstant("c53cd8fbc00000000000000000000001",
+                                   "bfd71a97f00000000000000000000000"),
+    /* [ 93] */ new RandomConstant("8a79b1f7800000000000000000000001",
+                                   "7fae352fe00000000000000000000000"),
+    /* [ 94] */ new RandomConstant("14f363ef000000000000000000000001",
+                                   "ff5c6a5fc00000000000000000000000"),
+    /* [ 95] */ new RandomConstant("29e6c7de000000000000000000000001",
+                                   "feb8d4bf800000000000000000000000"),
+    /* [ 96] */ new RandomConstant("53cd8fbc000000000000000000000001",
+                                   "fd71a97f000000000000000000000000"),
+    /* [ 97] */ new RandomConstant("a79b1f78000000000000000000000001",
+                                   "fae352fe000000000000000000000000"),
+    /* [ 98] */ new RandomConstant("4f363ef0000000000000000000000001",
+                                   "f5c6a5fc000000000000000000000000"),
+    /* [ 99] */ new RandomConstant("9e6c7de0000000000000000000000001",
+                                   "eb8d4bf8000000000000000000000000"),
+    /* [100] */ new RandomConstant("3cd8fbc0000000000000000000000001",
+                                   "d71a97f0000000000000000000000000"),
+    /* [101] */ new RandomConstant("79b1f780000000000000000000000001",
+                                   "ae352fe0000000000000000000000000"),
+    /* [102] */ new RandomConstant("f363ef00000000000000000000000001",
+                                   "5c6a5fc0000000000000000000000000"),
+    /* [103] */ new RandomConstant("e6c7de00000000000000000000000001",
+                                   "b8d4bf80000000000000000000000000"),
+    /* [104] */ new RandomConstant("cd8fbc00000000000000000000000001",
+                                   "71a97f00000000000000000000000000"),
+    /* [105] */ new RandomConstant("9b1f7800000000000000000000000001",
+                                   "e352fe00000000000000000000000000"),
+    /* [106] */ new RandomConstant("363ef000000000000000000000000001",
+                                   "c6a5fc00000000000000000000000000"),
+    /* [107] */ new RandomConstant("6c7de000000000000000000000000001",
+                                   "8d4bf800000000000000000000000000"),
+    /* [108] */ new RandomConstant("d8fbc000000000000000000000000001",
+                                   "1a97f000000000000000000000000000"),
+    /* [109] */ new RandomConstant("b1f78000000000000000000000000001",
+                                   "352fe000000000000000000000000000"),
+    /* [110] */ new RandomConstant("63ef0000000000000000000000000001",
+                                   "6a5fc000000000000000000000000000"),
+    /* [111] */ new RandomConstant("c7de0000000000000000000000000001",
+                                   "d4bf8000000000000000000000000000"),
+    /* [112] */ new RandomConstant("8fbc0000000000000000000000000001",
+                                   "a97f0000000000000000000000000000"),
+    /* [113] */ new RandomConstant("1f780000000000000000000000000001",
+                                   "52fe0000000000000000000000000000"),
+    /* [114] */ new RandomConstant("3ef00000000000000000000000000001",
+                                   "a5fc0000000000000000000000000000"),
+    /* [115] */ new RandomConstant("7de00000000000000000000000000001",
+                                   "4bf80000000000000000000000000000"),
+    /* [116] */ new RandomConstant("fbc00000000000000000000000000001",
+                                   "97f00000000000000000000000000000"),
+    /* [117] */ new RandomConstant("f7800000000000000000000000000001",
+                                   "2fe00000000000000000000000000000"),
+    /* [118] */ new RandomConstant("ef000000000000000000000000000001",
+                                   "5fc00000000000000000000000000000"),
+    /* [119] */ new RandomConstant("de000000000000000000000000000001",
+                                   "bf800000000000000000000000000000"),
+    /* [120] */ new RandomConstant("bc000000000000000000000000000001",
+                                   "7f000000000000000000000000000000"),
+    /* [121] */ new RandomConstant("78000000000000000000000000000001",
+                                   "fe000000000000000000000000000000"),
+    /* [122] */ new RandomConstant("f0000000000000000000000000000001",
+                                   "fc000000000000000000000000000000"),
+    /* [123] */ new RandomConstant("e0000000000000000000000000000001",
+                                   "f8000000000000000000000000000000"),
+    /* [124] */ new RandomConstant("c0000000000000000000000000000001",
+                                   "f0000000000000000000000000000000"),
+    /* [125] */ new RandomConstant("80000000000000000000000000000001",
+                                   "e0000000000000000000000000000000"),
+    /* [126] */ new RandomConstant("00000000000000000000000000000001",
+                                   "c0000000000000000000000000000000"),
+    /* [127] */ new RandomConstant("00000000000000000000000000000001",
+                                   "80000000000000000000000000000000")};
+
+  /**
+   * Advance {@code value} in place by f**(2**(offset + i)) for every bit i
+   * that is set in {@code bits}, lowest bit first.
+   *
+   * @param value the 128-bit state to advance
+   * @param bits 64 bits of the step count
+   * @param offset 0 for the low 64 bits of the count, 64 for the high 64 bits
+   */
+  private static void advanceByBits(Unsigned16 value, long bits, int offset) {
+    for (int i = 0; bits != 0 && i < 64; i++) {
+      long mask = 1L << i;
+      if ((bits & mask) != 0) {
+        RandomConstant step = genArray[offset + i];
+        value.multiply(step.a);
+        value.add(step.c);
+        // clear the handled bit so the loop stops once no bits remain
+        bits &= ~mask;
+      }
+    }
+  }
+
+  /**
+   * Generate the random number that is "advance" steps from an initial
+   * random number of 0. This is done by starting with 0 and then
+   * advancing it by the appropriate powers of 2 of the linear congruential
+   * generator, one for each bit set in "advance".
+   *
+   * @param advance the number of generator steps to skip
+   * @return a new Unsigned16 holding f**advance(0)
+   */
+  public static Unsigned16 skipAhead(Unsigned16 advance) {
+    Unsigned16 result = new Unsigned16();
+    advanceByBits(result, advance.getLow8(), 0);
+    advanceByBits(result, advance.getHigh8(), 64);
+    return result;
+  }
+
+  /**
+   * Generate the next 16 byte random number, replacing rand in place.
+   */
+  public static void nextRand(Unsigned16 rand) {
+    /* advance the random number forward once using the linear congruential
+     * generator
+     */
+    rand.multiply(genArray[0].a);
+    rand.add(genArray[0].c);
+  }
+}

Added: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraChecksum.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraChecksum.java?rev=816831&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraChecksum.java (added)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraChecksum.java Sat Sep 19 00:26:07 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.examples.terasort;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Tool that computes a checksum of a data set read through
+ * {@link TeraInputFormat}.  Each map task hashes every record's key and
+ * value with CRC32 and adds the hashes into an {@link Unsigned16} running
+ * sum.  A single reducer then adds the per-task sums together.  Because the
+ * partial results are combined by addition, the final value does not depend
+ * on how the records are split across tasks or in which order they arrive.
+ */
+public class TeraChecksum extends Configured implements Tool {
+
+  /**
+   * Hashes each record and accumulates the hashes into one per-task sum.
+   * The sum is emitted once, from {@link #close()}.
+   */
+  static class ChecksumMapper extends MapReduceBase 
+         implements Mapper<Text,Text,NullWritable,Unsigned16> {
+    /** Collector captured on the first map() call so close() can emit. */
+    private OutputCollector<NullWritable,Unsigned16> output;
+    /** Scratch holder for the current record's CRC32 value. */
+    private final Unsigned16 checksum = new Unsigned16();
+    /** Running sum of the checksums of all records seen by this task. */
+    private final Unsigned16 sum = new Unsigned16();
+    private final Checksum crc32 = new PureJavaCrc32();
+
+    public void map(Text key, Text value, 
+                    OutputCollector<NullWritable,Unsigned16> output,
+                    Reporter reporter) throws IOException {
+      if (this.output == null) {
+        this.output = output;
+      }
+      // CRC32 over the key bytes followed by the value bytes
+      crc32.reset();
+      crc32.update(key.getBytes(), 0, key.getLength());
+      crc32.update(value.getBytes(), 0, value.getLength());
+      checksum.set(crc32.getValue());
+      sum.add(checksum);
+    }
+
+    /**
+     * Emit this task's partial sum.  A task that received no records never
+     * captured a collector, so it emits nothing.
+     */
+    @Override
+    public void close() throws IOException {
+      if (output != null) {
+        output.collect(NullWritable.get(), sum);
+      }
+    }
+  }
+
+  /** Adds all per-task partial sums into the final checksum. */
+  static class ChecksumReducer extends MapReduceBase 
+         implements Reducer<NullWritable,Unsigned16,NullWritable,Unsigned16> {
+    public void reduce(NullWritable key, Iterator<Unsigned16> values,
+                       OutputCollector<NullWritable, Unsigned16> output, 
+                       Reporter reporter) throws IOException {
+      Unsigned16 sum = new Unsigned16();
+      while (values.hasNext()) {
+        sum.add(values.next());
+      }
+      output.collect(key, sum);
+    }
+  }
+
+  /** Print command-line usage to stderr. */
+  private static void usage() {
+    System.err.println("terasum <out-dir> <report-dir>");
+  }
+
+  /**
+   * Configure and run the checksum job.
+   * @param args the directory to checksum, followed by the directory that
+   *        receives the checksum report
+   * @return 0 on success, 1 if the wrong number of arguments was given
+   * @throws Exception if the job cannot be configured or fails to run
+   */
+  public int run(String[] args) throws Exception {
+    JobConf job = (JobConf) getConf();
+    if (args.length != 2) {
+      usage();
+      return 1;
+    }
+    TeraInputFormat.setInputPaths(job, new Path(args[0]));
+    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    job.setJobName("TeraSum");
+    job.setJarByClass(TeraChecksum.class);
+    job.setMapperClass(ChecksumMapper.class);
+    job.setReducerClass(ChecksumReducer.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(Unsigned16.class);
+    // a single reducer combines every partial sum into one checksum
+    job.setNumReduceTasks(1);
+    job.setInputFormat(TeraInputFormat.class);
+    JobClient.runJob(job);
+    return 0;
+  }
+
+  /**
+   * Command-line entry point; exits with the status returned by run().
+   * @param args see {@link #run(String[])}
+   */
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new JobConf(), new TeraChecksum(), args);
+    System.exit(res);
+  }
+
+}

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java?rev=816831&r1=816830&r2=816831&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java Sat Sep 19 00:26:07 2009
@@ -21,7 +21,10 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.zip.Checksum;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -38,19 +41,20 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 /**
- * Generate the official terasort input data set.
+ * Generate the official GraySort input data set.
  * The user specifies the number of rows and the output directory and this
  * class runs a map/reduce program to generate the data.
  * The format of the data is:
  * <ul>
- * <li>(10 bytes key) (10 bytes rowid) (78 bytes filler) \r \n
- * <li>The keys are random characters from the set ' ' .. '~'.
- * <li>The rowid is the right justified row id as a int.
- * <li>The filler consists of 7 runs of 10 characters from 'A' to 'Z'.
+ * <li>(10 bytes key) (constant 2 bytes) (32 bytes rowid) 
+ *     (constant 4 bytes) (48 bytes filler) (constant 4 bytes)
+ * <li>The rowid is the right justified row id as a hex number.
  * </ul>
  *
  * <p>
@@ -58,6 +62,9 @@
  * <b>bin/hadoop jar hadoop-*-examples.jar teragen 10000000000 in-dir</b>
  */
 public class TeraGen extends Configured implements Tool {
+  /** Logger for TeraGen's own messages (named after this class). */
+  private static final Log LOG = LogFactory.getLog(TeraGen.class);
+
+  public static enum Counters {CHECKSUM}
 
   public static String NUM_ROWS = "mapreduce.terasort.num-rows";
   /**
@@ -160,17 +167,14 @@
     public InputSplit[] getSplits(JobConf job, 
                                   int numSplits) {
       long totalRows = getNumberOfRows(job);
-      long rowsPerSplit = totalRows / numSplits;
-      System.out.println("Generating " + totalRows + " using " + numSplits + 
-                         " maps with step of " + rowsPerSplit);
+      LOG.info("Generating " + totalRows + " using " + numSplits);
       InputSplit[] splits = new InputSplit[numSplits];
       long currentRow = 0;
-      for(int split=0; split < numSplits-1; ++split) {
-        splits[split] = new RangeInputSplit(currentRow, rowsPerSplit);
-        currentRow += rowsPerSplit;
+      for(int split=0; split < numSplits; ++split) {
+        long goal = (long) Math.ceil(totalRows * (double)(split+1) / numSplits);
+        splits[split] = new RangeInputSplit(currentRow, goal - currentRow);
+        currentRow = goal;
       }
-      splits[numSplits-1] = new RangeInputSplit(currentRow, 
-                                                totalRows - currentRow);
       return splits;
     }
 
@@ -184,74 +188,6 @@
     job.setLong(NUM_ROWS, numRows);
   }
 
-  static class RandomGenerator {
-    private long seed = 0;
-    private static final long mask32 = (1l<<32) - 1;
-    /**
-     * The number of iterations separating the precomputed seeds.
-     */
-    private static final int seedSkip = 128 * 1024 * 1024;
-    /**
-     * The precomputed seed values after every seedSkip iterations.
-     * There should be enough values so that a 2**32 iterations are 
-     * covered.
-     */
-    private static final long[] seeds = new long[]{0L,
-                                                   4160749568L,
-                                                   4026531840L,
-                                                   3892314112L,
-                                                   3758096384L,
-                                                   3623878656L,
-                                                   3489660928L,
-                                                   3355443200L,
-                                                   3221225472L,
-                                                   3087007744L,
-                                                   2952790016L,
-                                                   2818572288L,
-                                                   2684354560L,
-                                                   2550136832L,
-                                                   2415919104L,
-                                                   2281701376L,
-                                                   2147483648L,
-                                                   2013265920L,
-                                                   1879048192L,
-                                                   1744830464L,
-                                                   1610612736L,
-                                                   1476395008L,
-                                                   1342177280L,
-                                                   1207959552L,
-                                                   1073741824L,
-                                                   939524096L,
-                                                   805306368L,
-                                                   671088640L,
-                                                   536870912L,
-                                                   402653184L,
-                                                   268435456L,
-                                                   134217728L,
-                                                  };
-
-    /**
-     * Start the random number generator on the given iteration.
-     * @param initalIteration the iteration number to start on
-     */
-    RandomGenerator(long initalIteration) {
-      int baseIndex = (int) ((initalIteration & mask32) / seedSkip);
-      seed = seeds[baseIndex];
-      for(int i=0; i < initalIteration % seedSkip; ++i) {
-        next();
-      }
-    }
-
-    RandomGenerator() {
-      this(0);
-    }
-
-    long next() {
-      seed = (seed * 3141592621l + 663896637) & mask32;
-      return seed;
-    }
-  }
-
   /**
    * The Mapper class that given a row number, will generate the appropriate 
    * output line.
@@ -261,78 +197,75 @@
 
     private Text key = new Text();
     private Text value = new Text();
-    private RandomGenerator rand;
-    private byte[] keyBytes = new byte[12];
-    private byte[] spaces = "          ".getBytes();
-    private byte[][] filler = new byte[26][];
-    {
-      for(int i=0; i < 26; ++i) {
-        filler[i] = new byte[10];
-        for(int j=0; j<10; ++j) {
-          filler[i][j] = (byte) ('A' + i);
-        }
-      }
-    }
-    
-    /**
-     * Add a random key to the text
-     * @param rowId
-     */
-    private void addKey() {
-      for(int i=0; i<3; i++) {
-        long temp = rand.next() / 52;
-        keyBytes[3 + 4*i] = (byte) (' ' + (temp % 95));
-        temp /= 95;
-        keyBytes[2 + 4*i] = (byte) (' ' + (temp % 95));
-        temp /= 95;
-        keyBytes[1 + 4*i] = (byte) (' ' + (temp % 95));
-        temp /= 95;
-        keyBytes[4*i] = (byte) (' ' + (temp % 95));
-      }
-      key.set(keyBytes, 0, 10);
-    }
-    
-    /**
-     * Add the rowid to the row.
-     * @param rowId
-     */
-    private void addRowId(long rowId) {
-      byte[] rowid = Integer.toString((int) rowId).getBytes();
-      int padSpace = 10 - rowid.length;
-      if (padSpace > 0) {
-        value.append(spaces, 0, 10 - rowid.length);
-      }
-      value.append(rowid, 0, Math.min(rowid.length, 10));
-    }
-
-    /**
-     * Add the required filler bytes. Each row consists of 7 blocks of
-     * 10 characters and 1 block of 8 characters.
-     * @param rowId the current row number
-     */
-    private void addFiller(long rowId) {
-      int base = (int) ((rowId * 8) % 26);
-      for(int i=0; i<7; ++i) {
-        value.append(filler[(base+i) % 26], 0, 10);
-      }
-      value.append(filler[(base+7) % 26], 0, 8);
-    }
+    private Unsigned16 rand = null;
+    private Unsigned16 rowId = null;
+    private Unsigned16 checksum = new Unsigned16();
+    private Checksum crc32 = new PureJavaCrc32();
+    private Unsigned16 total = new Unsigned16();
+    private static final Unsigned16 ONE = new Unsigned16(1);
+    private byte[] buffer = new byte[TeraInputFormat.KEY_LENGTH +
+                                     TeraInputFormat.VALUE_LENGTH];
+    private Counter checksumCounter;
 
     public void map(LongWritable row, NullWritable ignored,
                     OutputCollector<Text, Text> output,
                     Reporter reporter) throws IOException {
-      long rowId = row.get();
       if (rand == null) {
-        // we use 3 random numbers per a row
-        rand = new RandomGenerator(rowId*3);
-      }
-      addKey();
-      value.clear();
-      addRowId(rowId);
-      addFiller(rowId);
+        rowId = new Unsigned16(row.get());
+        rand = Random16.skipAhead(rowId);
+        checksumCounter = reporter.getCounter(Counters.CHECKSUM);
+      }
+      Random16.nextRand(rand);
+      GenSort.generateRecord(buffer, rand, rowId);
+      key.set(buffer, 0, TeraInputFormat.KEY_LENGTH);
+      value.set(buffer, TeraInputFormat.KEY_LENGTH, 
+                TeraInputFormat.VALUE_LENGTH);
       output.collect(key, value);
+      crc32.reset();
+      crc32.update(buffer, 0, 
+                   TeraInputFormat.KEY_LENGTH + TeraInputFormat.VALUE_LENGTH);
+      checksum.set(crc32.getValue());
+      total.add(checksum);
+      rowId.add(ONE);
+    }
+
+    @Override
+    public void close() {
+      checksumCounter.increment(total.getLow8());
     }
+  }
 
+  /** Print command-line usage to stderr. */
+  private static void usage() {
+    System.err.println("teragen <num rows> <output dir>");
+  }
+
+  /**
+   * Parse a number that may end with a one-letter decimal multiplier:
+   * {@code k} (10**3), {@code m} (10**6), {@code b} (10**9) or
+   * {@code t} (10**12).  For example, {@code "10b"} parses to 10000000000.
+   * A string without one of these suffixes is parsed as a plain long.
+   * @param str a decimal integer, optionally followed by k, m, b or t
+   * @return the expanded value
+   * @throws NumberFormatException if the string is empty or the digits are
+   *         not a valid long
+   */
+  private static long parseHumanLong(String str) {
+    if (str.length() == 0) {
+      throw new NumberFormatException("empty number");
+    }
+    char tail = str.charAt(str.length() - 1);
+    long base;
+    switch (tail) {
+    case 't':
+      // long literals are required: 1000 * 1000 * 1000 * 1000 in int
+      // arithmetic overflows to a negative value
+      base = 1000L * 1000L * 1000L * 1000L;
+      break;
+    case 'b':
+      base = 1000L * 1000L * 1000L;
+      break;
+    case 'm':
+      base = 1000L * 1000L;
+      break;
+    case 'k':
+      base = 1000L;
+      break;
+    default:
+      base = 1;
+    }
+    if (base != 1) {
+      str = str.substring(0, str.length() - 1);
+    }
+    return Long.parseLong(str) * base;
   }
   
   /**
@@ -340,8 +273,17 @@
    */
   public int run(String[] args) throws IOException {
     JobConf job = (JobConf) getConf();
-    setNumberOfRows(job, Long.parseLong(args[0]));
-    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    if (args.length != 2) {
+      usage();
+      return 1;
+    }
+    setNumberOfRows(job, parseHumanLong(args[0]));
+    Path outputDir = new Path(args[1]);
+    if (outputDir.getFileSystem(job).exists(outputDir)) {
+      throw new IOException("Output directory " + outputDir + 
+                            " already exists.");
+    }
+    FileOutputFormat.setOutputPath(job, outputDir);
     job.setJobName("TeraGen");
     job.setJarByClass(TeraGen.class);
     job.setMapperClass(SortGenMapper.class);

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java?rev=816831&r1=816830&r2=816831&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java Sat Sep 19 00:26:07 2009
@@ -18,15 +18,15 @@
 
 package org.apache.hadoop.examples.terasort;
 
+import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -37,6 +37,7 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.QuickSort;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * An input format that reads the first 10 characters of each line as the key
@@ -46,10 +47,43 @@
 public class TeraInputFormat extends FileInputFormat<Text,Text> {
 
   static final String PARTITION_FILENAME = "_partition.lst";
-  static final String SAMPLE_SIZE = "mapreduce.terasort.partitions.sample";
+  private static final String NUM_PARTITIONS = "terasort.num.partitions";
+  private static final String SAMPLE_SIZE = "terasort.partitions.sample";
+  static final int KEY_LENGTH = 10;
+  static final int VALUE_LENGTH = 90;
+  static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
   private static JobConf lastConf = null;
   private static InputSplit[] lastResult = null;
 
+  static class TeraFileSplit extends FileSplit {
+    private String[] locations;
+    public TeraFileSplit() {}
+    public TeraFileSplit(Path file, long start, long length, String[] hosts) {
+      super(file, start, length, hosts);
+      locations = hosts;
+    }
+    protected void setLocations(String[] hosts) {
+      locations = hosts;
+    }
+    @Override
+    public String[] getLocations() {
+      return locations;
+    }
+    public String toString() {
+      StringBuffer result = new StringBuffer();
+      result.append(getPath());
+      result.append(" from ");
+      result.append(getStart());
+      result.append(" length ");
+      result.append(getLength());
+      for(String host: getLocations()) {
+        result.append(" ");
+        result.append(host);
+      }
+      return result.toString();
+    }
+  }
+
   static class TextSampler implements IndexedSortable {
     private ArrayList<Text> records = new ArrayList<Text>();
 
@@ -67,7 +101,9 @@
     }
 
     public void addKey(Text key) {
-      records.add(new Text(key));
+      synchronized (this) {
+        records.add(new Text(key));
+      }
     }
 
     /**
@@ -80,7 +116,7 @@
     Text[] createPartitions(int numPartitions) {
       int numRecords = records.size();
       System.out.println("Making " + numPartitions + " from " + numRecords + 
-                         " records");
+                         " sampled records");
       if (numPartitions > numRecords) {
         throw new IllegalArgumentException
           ("Requested more partitions than input keys (" + numPartitions +
@@ -88,7 +124,6 @@
       }
       new QuickSort().sort(this, 0, records.size());
       float stepSize = numRecords / (float) numPartitions;
-      System.out.println("Step size is " + stepSize);
       Text[] result = new Text[numPartitions-1];
       for(int i=1; i < numPartitions; ++i) {
         result[i-1] = records.get(Math.round(stepSize * i));
@@ -105,54 +140,86 @@
    * @param partFile where to write the output file to
    * @throws IOException if something goes wrong
    */
-  public static void writePartitionFile(JobConf conf, 
+  public static void writePartitionFile(final JobConf conf, 
                                         Path partFile) throws IOException {
-    TeraInputFormat inFormat = new TeraInputFormat();
-    TextSampler sampler = new TextSampler();
-    Text key = new Text();
-    Text value = new Text();
+    long t1 = System.currentTimeMillis();
+    final TeraInputFormat inFormat = new TeraInputFormat();
+    final TextSampler sampler = new TextSampler();
     int partitions = conf.getNumReduceTasks();
     long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
-    InputSplit[] splits = inFormat.getSplits(conf, conf.getNumMapTasks());
-    int samples = Math.min(10, splits.length);
-    long recordsPerSample = sampleSize / samples;
-    int sampleStep = splits.length / samples;
-    long records = 0;
+    final InputSplit[] splits = inFormat.getSplits(conf, conf.getNumMapTasks());
+    long t2 = System.currentTimeMillis();
+    System.out.println("Computing input splits took " + (t2 - t1) + "ms");
+    int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.length);
+    System.out.println("Sampling " + samples + " splits of " + splits.length);
+    final long recordsPerSample = sampleSize / samples;
+    final int sampleStep = splits.length / samples;
+    Thread[] samplerReader = new Thread[samples];
     // take N samples from different parts of the input
     for(int i=0; i < samples; ++i) {
-      RecordReader<Text,Text> reader = 
-        inFormat.getRecordReader(splits[sampleStep * i], conf, null);
-      while (reader.next(key, value)) {
-        sampler.addKey(key);
-        records += 1;
-        if ((i+1) * recordsPerSample <= records) {
-          break;
+      final int idx = i;
+      samplerReader[i] = 
+        new Thread ("Sampler Reader " + idx) {
+        {
+          setDaemon(true);
         }
-      }
+        public void run() {
+          Text key = new Text();
+          Text value = new Text();
+          long records = 0;
+          try {
+            RecordReader<Text,Text> reader = 
+              inFormat.getRecordReader(splits[sampleStep * idx], conf, null);
+            while (reader.next(key, value)) {
+              sampler.addKey(key);
+              records += 1;
+              if (recordsPerSample <= records) {
+                break;
+              }
+            }
+          } catch (IOException ie){
+            System.err.println("Got an exception while reading splits " +
+                StringUtils.stringifyException(ie));
+            System.exit(-1);
+          }
+        }
+      };
+      samplerReader[i].start();
     }
     FileSystem outFs = partFile.getFileSystem(conf);
-    if (outFs.exists(partFile)) {
-      outFs.delete(partFile, false);
+    DataOutputStream writer = outFs.create(partFile, true, 64*1024, (short) 10, 
+                                           outFs.getDefaultBlockSize());
+    for (int i = 0; i < samples; i++) {
+      try {
+        samplerReader[i].join();
+      } catch (InterruptedException e) {
+      }
     }
-    SequenceFile.Writer writer = 
-      SequenceFile.createWriter(outFs, conf, partFile, Text.class, 
-                                NullWritable.class);
-    NullWritable nullValue = NullWritable.get();
     for(Text split : sampler.createPartitions(partitions)) {
-      writer.append(split, nullValue);
+      split.write(writer);
     }
     writer.close();
+    long t3 = System.currentTimeMillis();
+    System.out.println("Computing parititions took " + (t3 - t2) + "ms");
   }
 
   static class TeraRecordReader implements RecordReader<Text,Text> {
-    private LineRecordReader in;
-    private LongWritable junk = new LongWritable();
-    private Text line = new Text();
-    private static int KEY_LENGTH = 10;
+    private FSDataInputStream in;
+    private long offset;
+    private long length;
+    private static final int RECORD_LENGTH = KEY_LENGTH + VALUE_LENGTH;
+    private byte[] buffer = new byte[RECORD_LENGTH];
 
     public TeraRecordReader(Configuration job, 
                             FileSplit split) throws IOException {
-      in = new LineRecordReader(job, split);
+      Path p = split.getPath();
+      FileSystem fs = p.getFileSystem(job);
+      in = fs.open(p);
+      long start = split.getStart();
+      // find the offset to start at a record boundary
+      offset = (RECORD_LENGTH - (start % RECORD_LENGTH)) % RECORD_LENGTH;
+      in.seek(start + offset);
+      length = split.getLength();
     }
 
     public void close() throws IOException {
@@ -172,23 +239,29 @@
     }
 
     public float getProgress() throws IOException {
-      return in.getProgress();
+      return (float) offset / length;
     }
 
     public boolean next(Text key, Text value) throws IOException {
-      if (in.next(junk, line)) {
-        if (line.getLength() < KEY_LENGTH) {
-          key.set(line);
-          value.clear();
-        } else {
-          byte[] bytes = line.getBytes();
-          key.set(bytes, 0, KEY_LENGTH);
-          value.set(bytes, KEY_LENGTH, line.getLength() - KEY_LENGTH);
-        }
-        return true;
-      } else {
+      if (offset >= length) {
         return false;
       }
+      int read = 0;
+      while (read < RECORD_LENGTH) {
+        long newRead = in.read(buffer, read, RECORD_LENGTH - read);
+        if (newRead == -1) {
+          if (read == 0) {
+            return false;
+          } else {
+            throw new EOFException("read past eof");
+          }
+        }
+        read += newRead;
+      }
+      key.set(buffer, 0, KEY_LENGTH);
+      value.set(buffer, KEY_LENGTH, VALUE_LENGTH);
+      offset += RECORD_LENGTH;
+      return true;
     }
   }
 
@@ -201,12 +274,28 @@
   }
 
   @Override
+  protected FileSplit makeSplit(Path file, long start, long length, 
+                                String[] hosts) {
+    // Return a TeraFileSplit rather than a plain FileSplit: it tracks its
+    // own host list, which setLocations can later replace.
+    FileSplit split = new TeraFileSplit(file, start, length, hosts);
+    return split;
+  }
+
+  /**
+   * Compute the input splits, memoizing the result for the most recently
+   * seen JobConf (compared by reference) so that repeated calls with the
+   * same conf return the cached array.  Unless
+   * "terasort.use.terascheduler" is set to false, the base splits are then
+   * replaced by those returned from TeraScheduler.getNewFileSplits().
+   * Timing for both phases is printed to stdout.
+   * NOTE(review): lastConf/lastResult are unsynchronized statics, so this
+   * cache assumes single-threaded use -- confirm.
+   */
+  @Override
   public InputSplit[] getSplits(JobConf conf, int splits) throws IOException {
+    // cache hit: the identical JobConf object was seen on the previous call
     if (conf == lastConf) {
       return lastResult;
     }
+    long t1, t2, t3;
+    t1 = System.currentTimeMillis();
+    // NOTE(review): lastConf is recorded before super.getSplits runs; if
+    // that call throws, a later call with this conf returns the stale
+    // lastResult -- confirm this is acceptable.
     lastConf = conf;
     lastResult = super.getSplits(conf, splits);
+    t2 = System.currentTimeMillis();
+    System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
+    if (conf.getBoolean("terasort.use.terascheduler", true)) {
+      // assumes the array from FileInputFormat.getSplits has runtime type
+      // FileSplit[] (elements come from makeSplit) -- otherwise this cast fails
+      TeraScheduler scheduler = new TeraScheduler((FileSplit[]) lastResult, conf);
+      lastResult = scheduler.getNewFileSplits();
+      t3 = System.currentTimeMillis(); 
+      System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
+    }
     return lastResult;
   }
 }



Mime
View raw message