From: cdouglas@apache.org
To: mapreduce-commits@hadoop.apache.org
Subject: svn commit: r826393 - in /hadoop/mapreduce/branches/branch-0.21: ./ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/
Date: Sun, 18 Oct 2009 10:00:40 -0000
Message-Id: <20091018100040.43AA923888FF@eris.apache.org>

Author: cdouglas
Date: Sun Oct 18 10:00:39 2009
New Revision: 826393

URL: http://svn.apache.org/viewvc?rev=826393&view=rev
Log:
MAPREDUCE-1061. Add unit test validating byte specifications for gridmix jobs.

Modified:
    hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java

Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=826393&r1=826392&r2=826393&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Sun Oct 18 10:00:39 2009
@@ -752,3 +752,6 @@
 
     MAPREDUCE-1104. Initialize RecoveryManager in JobTracker cstr called by
     Mumak. (Hong Tang via cdouglas)
+
+    MAPREDUCE-1061. Add unit test validating byte specifications for gridmix
+    jobs. (cdouglas)

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=826393&r1=826392&r2=826393&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Sun Oct 18 10:00:39 2009
@@ -200,10 +200,7 @@
       }
       // scan input dir contents
       submitter.refreshFilePool();
-    } catch (IOException e) {
-      LOG.error("Startup failed", e);
-      if (factory != null) factory.abort(); // abort pipeline
-    } catch (InterruptedException e) {
+    } catch (Throwable e) {
       LOG.error("Startup failed", e);
       if (factory != null) factory.abort(); // abort pipeline
     } finally {
@@ -214,8 +211,10 @@
       if (factory != null) {
         // wait for input exhaustion
         factory.join();
-        if (null != factory.error()) {
-          throw factory.error();
+        final Throwable badTraceException = factory.error();
+        if (null != badTraceException) {
+          LOG.error("Error in trace", badTraceException);
+          throw new IOException("Error in trace", badTraceException);
         }
         // wait for pending tasks to be submitted
         submitter.shutdown();
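The Gridmix.java hunks above do two things: startup failures of any kind (not just IOException or InterruptedException) now abort the pipeline, and a trace-reading error recorded by the job factory is logged and rethrown as a checked IOException after join() instead of propagating a raw Throwable. A minimal standalone sketch of that record-then-rethrow pattern, with illustrative names that are not Gridmix's own:

    // A producer thread stashes the first failure it hits; the driver
    // joins the thread and surfaces the failure as a checked IOException.
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicReference;

    class TracePipeline {
      private final AtomicReference<Throwable> error =
          new AtomicReference<Throwable>();
      private final Thread producer = new Thread(new Runnable() {
        public void run() {
          try {
            readTrace();                     // hypothetical work loop
          } catch (Throwable t) {
            error.compareAndSet(null, t);    // keep only the first failure
          }
        }
      });

      void readTrace() { /* ... */ }

      void runToCompletion() throws IOException, InterruptedException {
        producer.start();
        producer.join();                     // wait for input exhaustion
        final Throwable bad = error.get();
        if (null != bad) {
          // Wrap, as the hunk above does, so callers see a checked exception.
          throw new IOException("Error in trace", bad);
        }
      }
    }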
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=826393&r1=826392&r2=826393&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Sun Oct 18 10:00:39 2009
@@ -530,6 +530,16 @@
         }
       }
     }
+    @Override
+    protected void cleanup(Context context)
+        throws IOException, InterruptedException {
+      while (written < outBytes) {
+        final int len = (int) Math.min(outBytes - written, val.getCapacity());
+        fillBytes(val, len);
+        context.write(NullWritable.get(), val);
+        written += len;
+      }
+    }
   }
 
   static class GridmixRecordReader
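The new cleanup() override pads the task's output up to the byte total its spec demands: if the input runs out before outBytes have been emitted, the loop keeps writing filler values until the target is reached, which is what lets the new test assert output sizes against the spec. A rough standalone sketch of the same flush-the-remainder pattern (BytesWritable stands in for Gridmix's record type, and the names and defaults are illustrative):

    // A mapper that guarantees a fixed output volume by topping up
    // whatever its map() calls emitted with filler records in cleanup().
    import java.io.IOException;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Mapper;

    class FixedVolumeMapper
        extends Mapper<LongWritable, BytesWritable, NullWritable, BytesWritable> {
      private final long outBytes = 1L << 20;   // target volume; illustrative
      private long written = 0L;
      private final BytesWritable val = new BytesWritable(new byte[64 * 1024]);

      @Override
      protected void map(LongWritable key, BytesWritable ignored, Context context)
          throws IOException, InterruptedException {
        if (written < outBytes) {
          context.write(NullWritable.get(), val);
          written += val.getLength();
        }
      }

      @Override
      protected void cleanup(Context context)
          throws IOException, InterruptedException {
        // Top up to the target, as the GridmixJob hunk above does.
        while (written < outBytes) {
          final int len = (int) Math.min(outBytes - written, 64 * 1024);
          val.setSize(len);
          context.write(NullWritable.get(), val);
          written += len;
        }
      }
    }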
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java?rev=826393&r1=826392&r2=826393&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/DebugJobFactory.java Sun Oct 18 10:00:39 2009
@@ -112,13 +112,13 @@
 
     public MockJob(Configuration conf) {
       this(conf.getInt(MIN_BYTES_IN, 1 << 20),
-          conf.getInt(VAR_BYTES_IN, 10 << 20),
+          conf.getInt(VAR_BYTES_IN, 5 << 20),
           conf.getInt(MIN_BYTES_OUT, 1 << 20),
-          conf.getInt(VAR_BYTES_OUT, 10 << 20),
+          conf.getInt(VAR_BYTES_OUT, 5 << 20),
           conf.getInt(MIN_REC_SIZE , 100),
           conf.getInt(VAR_REC_SIZE , 1 << 15),
-          conf.getInt(MAX_MAPS, 6),
-          conf.getInt(MAX_REDS, 4));
+          conf.getInt(MAX_MAPS, 5),
+          conf.getInt(MAX_REDS, 3));
     }
 
     public MockJob(int min_bytes_in, int var_bytes_in,
@@ -126,7 +126,7 @@
                    int min_rec_size, int var_rec_size,
                    int max_maps, int max_reds) {
       final Random r = new Random();
-      name = String.format("MOCKJOB%04d", seq.getAndIncrement());
+      name = String.format("MOCKJOB%05d", seq.getAndIncrement());
       submitTime = timestamp.addAndGet(TimeUnit.MILLISECONDS.convert(
             r.nextInt(10), TimeUnit.SECONDS));
       int iMapBTotal = 0, oMapBTotal = 0, iRedBTotal = 0, oRedBTotal = 0;
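As a reading aid for the shifted constants in MockJob's new defaults: 1 << 20 is 1 MiB, 5 << 20 is 5 MiB (down from 10 MiB), and 1 << 15 is 32 KiB, so the mock jobs get smaller and the map/reduce caps drop from 6/4 to 5/3. Assuming the MIN_*/VAR_* pairs combine as minimum plus a uniformly random variation (the actual sampling lives elsewhere in DebugJobFactory, so this is a guess):

    // Assumed combination of the MIN_*/VAR_* defaults above; the exact
    // sampling code is not part of this diff.
    import java.util.Random;

    class ByteSpecDemo {
      public static void main(String[] args) {
        final Random r = new Random();
        final int minBytesIn = 1 << 20;   // 1,048,576 B = 1 MiB
        final int varBytesIn = 5 << 20;   // 5,242,880 B = 5 MiB (was 10 MiB)
        // min + uniform variation => a size in [1 MiB, 6 MiB)
        final int inBytes = minBytesIn + r.nextInt(varBytesIn);
        System.out.println("sampled input bytes: " + inBytes);
      }
    }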
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=826393&r1=826392&r2=826393&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Sun Oct 18 10:00:39 2009
@@ -19,6 +19,9 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
@@ -30,12 +33,20 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import static org.apache.hadoop.mapreduce.TaskCounter.*;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -64,9 +75,13 @@
   public static void initCluster() throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(JTConfig.JT_RETIREJOBS, false);
+    conf.setInt(JTConfig.JT_RETIREJOB_CACHE_SIZE, 1000);
+    conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, true);
+    conf.setInt(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, 1);
     dfsCluster = new MiniDFSCluster(conf, 3, true, null);
     dfs = dfsCluster.getFileSystem();
-    mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1);
+    mrCluster = new MiniMRCluster(3, dfs.getUri().toString(), 1, null, null,
+        new JobConf(conf));
   }
 
   @AfterClass
@@ -81,6 +96,7 @@
 
   static class TestMonitor extends JobMonitor {
 
+    static final long SLOPBYTES = 5 * 1024;
     private final int expected;
     private final BlockingQueue<Job> retiredJobs;
 
@@ -90,9 +106,142 @@
       retiredJobs = new LinkedBlockingQueue<Job>();
     }
 
-    public void verify(ArrayList<JobStory> submitted) {
+    public void verify(ArrayList<JobStory> submitted) throws Exception {
       final ArrayList<Job> succeeded = new ArrayList<Job>();
       assertEquals("Bad job count", expected, retiredJobs.drainTo(succeeded));
+      final HashMap<String, JobStory> sub = new HashMap<String, JobStory>();
+      for (JobStory spec : submitted) {
+        sub.put(spec.getName(), spec);
+      }
+      for (Job job : succeeded) {
+        final String jobname = job.getJobName();
+        if ("GRIDMIX_GENDATA".equals(jobname)) {
+          final Path in = new Path("foo").makeQualified(dfs);
+          final Path out = new Path("/gridmix").makeQualified(dfs);
+          final ContentSummary generated = dfs.getContentSummary(in);
+          assertTrue("Mismatched data gen", // +/- 100k for logs
+              (GENDATA << 20) < generated.getLength() + GENSLOP ||
+              (GENDATA << 20) > generated.getLength() - GENSLOP);
+          FileStatus[] outstat = dfs.listStatus(out);
+          assertEquals("Mismatched job count", NJOBS, outstat.length);
+          continue;
+        }
+        final JobStory spec =
+          sub.get(job.getJobName().replace("GRIDMIX", "MOCKJOB"));
+        assertNotNull("No spec for " + job.getJobName(), spec);
+        assertNotNull("No counters for " + job.getJobName(), job.getCounters());
+
+        final int nMaps = spec.getNumberMaps();
+        final int nReds = spec.getNumberReduces();
+
+        System.out.println(jobname + ": " + nMaps + "/" + nReds);
+        final TaskReport[] mReports = job.getTaskReports(TaskType.MAP);
+        assertEquals("Mismatched map count", nMaps, mReports.length);
+        check(TaskType.MAP, job, spec, mReports,
+            0, 1, nReds * SLOPBYTES, nReds + 1);
+
+        final TaskReport[] rReports = job.getTaskReports(TaskType.REDUCE);
+        assertEquals("Mismatched reduce count", nReds, rReports.length);
+        check(TaskType.REDUCE, job, spec, rReports,
+            nMaps * SLOPBYTES, nMaps + 1, 0, 1);
+      }
+    }
+
+    public void check(final TaskType type, Job job, JobStory spec,
+          final TaskReport[] runTasks,
+          long extraInputBytes, int extraInputRecords,
+          long extraOutputBytes, int extraOutputRecords) throws Exception {
+
+      long[] runInputRecords = new long[runTasks.length];
+      long[] runInputBytes = new long[runTasks.length];
+      long[] runOutputRecords = new long[runTasks.length];
+      long[] runOutputBytes = new long[runTasks.length];
+      long[] specInputRecords = new long[runTasks.length];
+      long[] specInputBytes = new long[runTasks.length];
+      long[] specOutputRecords = new long[runTasks.length];
+      long[] specOutputBytes = new long[runTasks.length];
+
+      for (int i = 0; i < runTasks.length; ++i) {
+        final TaskInfo specInfo;
+        final Counters counters = runTasks[i].getTaskCounters();
+        switch (type) {
+          case MAP:
+            runInputBytes[i] = counters.findCounter("FileSystemCounters",
+                "HDFS_BYTES_READ").getValue();
+            runInputRecords[i] =
+              (int)counters.findCounter(MAP_INPUT_RECORDS).getValue();
+            runOutputBytes[i] =
+              counters.findCounter(MAP_OUTPUT_BYTES).getValue();
+            runOutputRecords[i] =
+              (int)counters.findCounter(MAP_OUTPUT_RECORDS).getValue();
+
+            specInfo = spec.getTaskInfo(TaskType.MAP, i);
+            break;
+          case REDUCE:
+            runInputBytes[i] =
+              counters.findCounter(REDUCE_SHUFFLE_BYTES).getValue();
+            runInputRecords[i] =
+              (int)counters.findCounter(REDUCE_INPUT_RECORDS).getValue();
+            runOutputBytes[i] =
+              counters.findCounter("FileSystemCounters",
+                  "HDFS_BYTES_WRITTEN").getValue();
+            runOutputRecords[i] =
+              (int)counters.findCounter(REDUCE_OUTPUT_RECORDS).getValue();
+
+            specInfo = spec.getTaskInfo(TaskType.REDUCE, i);
+            break;
+          default:
+            specInfo = null;
+            fail("Unexpected type: " + type);
+        }
+        specInputBytes[i] = specInfo.getInputBytes();
+        specInputRecords[i] = specInfo.getInputRecords();
+        specOutputRecords[i] = specInfo.getOutputRecords();
+        specOutputBytes[i] = specInfo.getOutputBytes();
+        System.out.printf(type + " SPEC: %9d -> %9d :: %5d -> %5d\n",
+            specInputBytes[i], specOutputBytes[i],
+            specInputRecords[i], specOutputRecords[i]);
+        System.out.printf(type + " RUN: %9d -> %9d :: %5d -> %5d\n",
+            runInputBytes[i], runOutputBytes[i],
+            runInputRecords[i], runOutputRecords[i]);
+      }
+
+      // Check input bytes
+      Arrays.sort(specInputBytes);
+      Arrays.sort(runInputBytes);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched input bytes " +
+            specInputBytes[i] + "/" + runInputBytes[i],
+            runInputBytes[i] - specInputBytes[i] <= extraInputBytes);
+      }
+
+      // Check input records
+      Arrays.sort(specInputRecords);
+      Arrays.sort(runInputRecords);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched input records " +
+            specInputRecords[i] + "/" + runInputRecords[i],
+            runInputRecords[i] - specInputRecords[i] <= extraInputRecords);
+      }
+
+      // Check output bytes
+      Arrays.sort(specOutputBytes);
+      Arrays.sort(runOutputBytes);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched output bytes " +
+            specOutputBytes[i] + "/" + runOutputBytes[i],
+            runOutputBytes[i] - specOutputBytes[i] <= extraOutputBytes);
+      }
+
+      // Check output records
+      Arrays.sort(specOutputRecords);
+      Arrays.sort(runOutputRecords);
+      for (int i = 0; i < runTasks.length; ++i) {
+        assertTrue("Mismatched output records " +
+            specOutputRecords[i] + "/" + runOutputRecords[i],
+            runOutputRecords[i] - specOutputRecords[i] <= extraOutputRecords);
+      }
+    }
 
     @Override
@@ -110,7 +259,7 @@
     private DebugJobFactory factory;
     private TestMonitor monitor;
 
-    public void checkMonitor() {
+    public void checkMonitor() throws Exception {
       monitor.verify(factory.getSubmitted());
     }
 
@@ -146,12 +295,6 @@
     int res = ToolRunner.run(conf, client, argv);
     assertEquals("Client exited with nonzero status", 0, res);
     client.checkMonitor();
-    final ContentSummary generated = dfs.getContentSummary(in);
-    assertTrue("Mismatched data gen", // +/- 100k for logs
-        (GENDATA << 20) < generated.getLength() + GENSLOP ||
-        (GENDATA << 20) > generated.getLength() - GENSLOP);
-    FileStatus[] outstat = dfs.listStatus(out);
-    assertEquals("Mismatched job count", NJOBS, outstat.length);
   }
 }
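The new check() method validates run-time counters against the Rumen spec without assuming task order: each spec/run series is sorted and compared index by index, and a run value may exceed its spec counterpart by at most the slop passed in (nReds * SLOPBYTES for map output and nMaps * SLOPBYTES for shuffle input, presumably to absorb per-partition framing overhead, plus one extra record per source or sink). A compact standalone sketch of that sorted, slop-bounded comparison, with made-up numbers:

    // Compare two unordered sets of per-task byte totals by sorting both
    // and allowing each observed value to exceed its spec by <= slop.
    import java.util.Arrays;

    class SlopCheck {
      static void assertWithin(long[] spec, long[] run, long slop) {
        // Sort copies so the comparison is order-insensitive, as in check().
        final long[] s = spec.clone();
        final long[] r = run.clone();
        Arrays.sort(s);
        Arrays.sort(r);
        for (int i = 0; i < r.length; ++i) {
          if (r[i] - s[i] > slop) {
            throw new AssertionError("Mismatched bytes " + s[i] + "/" + r[i]
                + " (slop=" + slop + ")");
          }
        }
      }

      public static void main(String[] args) {
        final long[] spec = { 1 << 20, 2 << 20 };          // expected totals
        final long[] run  = { 2 << 20, (1 << 20) + 512 };  // observed, reordered
        assertWithin(spec, run, 5 * 1024);                 // 5 KiB, like SLOPBYTES
        System.out.println("within slop");
      }
    }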