Author: cdouglas Date: Wed Jul 21 19:06:06 2010 New Revision: 966365 URL: http://svn.apache.org/viewvc?rev=966365&view=rev Log: MAPREDUCE-1936. Modify Gridmix3 to support more tunable parameters for stress submission and sleep jobs. Contributed by Hong Tang Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomAlgorithms.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomAlgorithm.java Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=966365&r1=966364&r2=966365&view=diff ============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul 21 19:06:06 2010 @@ -89,6 +89,9 @@ Trunk (unreleased changes) MAPREDUCE-1945. The MapReduce component for HADOOP-6632. (Kan Zhang & Jitendra Pandey via ddas) + MAPREDUCE-1936. Modify Gridmix3 to support more tunable parameters for + stress submission and sleep jobs. (Hong Tang via cdouglas) + OPTIMIZATIONS MAPREDUCE-1354. Enhancements to JobTracker for better performance and Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java?rev=966365&r1=966364&r2=966365&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java (original) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FilePool.java Wed Jul 21 19:06:06 2010 @@ -18,6 +18,7 @@ package org.apache.hadoop.mapred.gridmix; import java.io.IOException; + import java.util.Arrays; import java.util.ArrayList; import java.util.Collection; @@ -38,6 +39,7 @@ import org.apache.hadoop.fs.Path; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.gridmix.RandomAlgorithms.Selector; /** * Class for caching a pool of input data to be used by synthetic jobs for @@ -135,69 +137,6 @@ class FilePool { throws IOException; } - interface IndexMapper { - int get(int pos); - void swap(int a, int b); - } - - /** - * A sparse index mapping table - useful when we want to - * non-destructively permute a small fraction of a large array. - */ - static class SparseIndexMapper implements IndexMapper { - Map mapping = new HashMap(); - - public int get(int pos) { - Integer mapped = mapping.get(pos); - if (mapped == null) return pos; - return mapped; - } - - public void swap(int a, int b) { - int valA = get(a); - int valB = get(b); - if (b == valA) { - mapping.remove(b); - } else { - mapping.put(b, valA); - } - if (a == valB) { - mapping.remove(a); - } else { - mapping.put(a, valB); - } - } - } - - /** - * A dense index mapping table - useful when we want to - * non-destructively permute a large fraction of an array. - */ - static class DenseIndexMapper implements IndexMapper { - int[] mapping; - - DenseIndexMapper(int size) { - mapping = new int[size]; - for (int i=0; i=mapping.length) ) { - throw new IndexOutOfBoundsException(); - } - return mapping[pos]; - } - - public void swap(int a, int b) { - int valA = get(a); - int valB = get(b); - mapping[a]=valB; - mapping[b]=valA; - } - } - /** * Files in current directory of this Node. */ @@ -223,22 +162,15 @@ class FilePool { return getSize(); } - IndexMapper mapping; - if ((curdir.size() < 200) || ((double) targetSize / getSize() > 0.5)) { - mapping = new DenseIndexMapper(curdir.size()); - } else { - mapping = new SparseIndexMapper(); - } - + Selector selector = new Selector(curdir.size(), (double) targetSize + / getSize(), rand); + ArrayList selected = new ArrayList(); long ret = 0L; - int poolSize = curdir.size(); do { - int pos = rand.nextInt(poolSize); - int index = mapping.get(pos); + int index = selector.next(); selected.add(index); ret += curdir.get(index).getLen(); - mapping.swap(pos, --poolSize); } while (ret < targetSize); for (Integer i : selected) { Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=966365&r1=966364&r2=966365&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Wed Jul 21 19:06:06 2010 @@ -394,32 +394,67 @@ public class Gridmix extends Configured } } - protected void printUsage(PrintStream out) { - ToolRunner.printGenericCommandUsage(out); - out.println("Usage: gridmix [-generate ] [-users URI] "); - out.println(" e.g. gridmix -generate 100m foo -"); - out.println("Configuration parameters:"); - out.printf(" %-42s : Output directory\n", GRIDMIX_OUT_DIR); - out.printf(" %-42s : Submitting threads\n", GRIDMIX_SUB_THR); - out.printf(" %-42s : Queued job desc\n", GRIDMIX_QUE_DEP); - out.printf(" %-42s : Key fraction of rec\n", - AvgRecordFactory.GRIDMIX_KEY_FRC); - out.printf(" %-42s : User resolution class\n", GRIDMIX_USR_RSV); - out.printf(" %-42s : Enable/disable using queues in trace\n", - GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE); - out.printf(" %-42s : Default queue\n", - GridmixJob.GRIDMIX_DEFAULT_QUEUE); - + private String getEnumValues(Enum[] e) { StringBuilder sb = new StringBuilder(); String sep = ""; - for (GridmixJobSubmissionPolicy p : GridmixJobSubmissionPolicy.values()) { + for (Enum v : e) { sb.append(sep); - sb.append(p.name()); + sb.append(v.name()); sep = "|"; } - out.printf(" %-42s : Job submission policy (%s)\n", - GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, sb.toString()); + return sb.toString(); + } + + private String getJobTypes() { + return getEnumValues(JobCreator.values()); } + + private String getSubmissionPolicies() { + return getEnumValues(GridmixJobSubmissionPolicy.values()); + } + + protected void printUsage(PrintStream out) { + ToolRunner.printGenericCommandUsage(out); + out.println("Usage: gridmix [-generate ] [-users URI] [-Dname=value ...] "); + out.println(" e.g. gridmix -generate 100m foo -"); + out.println("Configuration parameters:"); + out.println(" General parameters:"); + out.printf(" %-48s : Output directory\n", GRIDMIX_OUT_DIR); + out.printf(" %-48s : Submitting threads\n", GRIDMIX_SUB_THR); + out.printf(" %-48s : Queued job desc\n", GRIDMIX_QUE_DEP); + out.printf(" %-48s : User resolution class\n", GRIDMIX_USR_RSV); + out.printf(" %-48s : Job types (%s)\n", JobCreator.GRIDMIX_JOB_TYPE, getJobTypes()); + out.println(" Parameters related to job submission:"); + out.printf(" %-48s : Default queue\n", + GridmixJob.GRIDMIX_DEFAULT_QUEUE); + out.printf(" %-48s : Enable/disable using queues in trace\n", + GridmixJob.GRIDMIX_USE_QUEUE_IN_TRACE); + out.printf(" %-48s : Job submission policy (%s)\n", + GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, getSubmissionPolicies()); + out.println(" Parameters specific for LOADJOB:"); + out.printf(" %-48s : Key fraction of rec\n", + AvgRecordFactory.GRIDMIX_KEY_FRC); + out.println(" Parameters specific for SLEEPJOB:"); + out.printf(" %-48s : Whether to ignore reduce tasks\n", + SleepJob.SLEEPJOB_MAPTASK_ONLY); + out.printf(" %-48s : Number of fake locations for map tasks\n", + JobCreator.SLEEPJOB_RANDOM_LOCATIONS); + out.printf(" %-48s : Maximum map task runtime in mili-sec\n", + SleepJob.GRIDMIX_SLEEP_MAX_MAP_TIME); + out.printf(" %-48s : Maximum reduce task runtime in mili-sec (merge+reduce)\n", + SleepJob.GRIDMIX_SLEEP_MAX_REDUCE_TIME); + out.println(" Parameters specific for STRESS submission throttling policy:"); + out.printf(" %-48s : jobs vs task-tracker ratio\n", + StressJobFactory.CONF_MAX_JOB_TRACKER_RATIO); + out.printf(" %-48s : maps vs map-slot ratio\n", + StressJobFactory.CONF_OVERLOAD_MAPTASK_MAPSLOT_RATIO); + out.printf(" %-48s : reduces vs reduce-slot ratio\n", + StressJobFactory.CONF_OVERLOAD_REDUCETASK_REDUCESLOT_RATIO); + out.printf(" %-48s : map-slot share per job\n", + StressJobFactory.CONF_MAX_MAPSLOT_SHARE_PER_JOB); + out.printf(" %-48s : reduce-slot share per job\n", + StressJobFactory.CONF_MAX_REDUCESLOT_SHARE_PER_JOB); + } /** * Components in the pipeline must support the following operations for Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java?rev=966365&r1=966364&r2=966365&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java (original) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobCreator.java Wed Jul 21 19:06:06 2010 @@ -20,14 +20,23 @@ package org.apache.hadoop.mapred.gridmix import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.ClusterStatus; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.gridmix.GenerateData.GenSplit; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.tools.rumen.JobStory; import java.io.IOException; +import java.util.ArrayList; +import java.util.Random; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public enum JobCreator { - LOADJOB("LOADJOB") { + LOADJOB { @Override public GridmixJob createGridmixJob( Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot, @@ -35,22 +44,39 @@ public enum JobCreator { return new LoadJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq); }}, - SLEEPJOB("SLEEPJOB") { + SLEEPJOB { + private String[] hosts; + @Override public GridmixJob createGridmixJob( Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot, UserGroupInformation ugi, int seq) throws IOException { - return new SleepJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq); + int numLocations = conf.getInt(SLEEPJOB_RANDOM_LOCATIONS, 0); + if (numLocations < 0) numLocations=0; + if ((numLocations > 0) && (hosts == null)) { + final JobClient client = new JobClient(new JobConf(conf)); + ClusterStatus stat = client.getClusterStatus(true); + final int nTrackers = stat.getTaskTrackers(); + final ArrayList hostList = new ArrayList(nTrackers); + final Pattern trackerPattern = Pattern.compile("tracker_([^:]*):.*"); + final Matcher m = trackerPattern.matcher(""); + for (String tracker : stat.getActiveTrackerNames()) { + m.reset(tracker); + if (!m.find()) { + continue; + } + final String name = m.group(1); + hostList.add(name); + } + hosts = hostList.toArray(new String[hostList.size()]); + } + return new SleepJob(conf, submissionMillis, jobdesc, outRoot, ugi, seq, + numLocations, hosts); }}; public static final String GRIDMIX_JOB_TYPE = "gridmix.job.type"; - - - private final String name; - - JobCreator(String name) { - this.name = name; - } + public static final String SLEEPJOB_RANDOM_LOCATIONS = + "gridmix.sleep.fake-locations"; public abstract GridmixJob createGridmixJob( final Configuration conf, long submissionMillis, final JobStory jobdesc, Added: hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomAlgorithms.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomAlgorithms.java?rev=966365&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomAlgorithms.java (added) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomAlgorithms.java Wed Jul 21 19:06:06 2010 @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.gridmix; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +/** + * Random algorithms. + */ +public class RandomAlgorithms { + + private interface IndexMapper { + int get(int pos); + void swap(int a, int b); + int getSize(); + void reset(); + } + + /** + * A sparse index mapping table - useful when we want to + * non-destructively permute a small fraction of a large array. + */ + private static class SparseIndexMapper implements IndexMapper { + Map mapping = new HashMap(); + int size; + + SparseIndexMapper(int size) { + this.size = size; + } + + public int get(int pos) { + Integer mapped = mapping.get(pos); + if (mapped == null) return pos; + return mapped; + } + + public void swap(int a, int b) { + if (a == b) return; + int valA = get(a); + int valB = get(b); + if (b == valA) { + mapping.remove(b); + } else { + mapping.put(b, valA); + } + if (a == valB) { + mapping.remove(a); + } else { + mapping.put(a, valB); + } + } + + public int getSize() { + return size; + } + + public void reset() { + mapping.clear(); + } + } + + /** + * A dense index mapping table - useful when we want to + * non-destructively permute a large fraction of an array. + */ + private static class DenseIndexMapper implements IndexMapper { + int[] mapping; + + DenseIndexMapper(int size) { + mapping = new int[size]; + for (int i=0; i=mapping.length) ) { + throw new IndexOutOfBoundsException(); + } + return mapping[pos]; + } + + public void swap(int a, int b) { + if (a == b) return; + int valA = get(a); + int valB = get(b); + mapping[a]=valB; + mapping[b]=valA; + } + + public int getSize() { + return mapping.length; + } + + public void reset() { + return; + } + } + + /** + * Iteratively pick random numbers from pool 0..n-1. Each number can only be + * picked once. + */ + public static class Selector { + private IndexMapper mapping; + private int n; + private Random rand; + + /** + * Constructor. + * + * @param n + * The pool of integers: 0..n-1. + * @param selPcnt + * Percentage of selected numbers. This is just a hint for internal + * memory optimization. + * @param rand + * Random number generator. + */ + public Selector(int n, double selPcnt, Random rand) { + if (n <= 0) { + throw new IllegalArgumentException("n should be positive"); + } + + boolean sparse = (n > 200) && (selPcnt < 0.1); + + this.n = n; + mapping = (sparse) ? new SparseIndexMapper(n) : new DenseIndexMapper(n); + this.rand = rand; + } + + /** + * Select the next random number. + * @return Random number selected. Or -1 if the remaining pool is empty. + */ + public int next() { + switch (n) { + case 0: return -1; + case 1: + { + int index = mapping.get(0); + --n; + return index; + } + default: + { + int pos = rand.nextInt(n); + int index = mapping.get(pos); + mapping.swap(pos, --n); + return index; + } + } + } + + /** + * Get the remaining random number pool size. + */ + public int getPoolSize() { + return n; + } + + /** + * Reset the selector for reuse usage. + */ + public void reset() { + mapping.reset(); + n = mapping.getSize(); + } + } + + + /** + * Selecting m random integers from 0..n-1. + * @return An array of selected integers. + */ + public static int[] select(int m, int n, Random rand) { + if (m >= n) { + int[] ret = new int[n]; + for (int i=0; i rand = + new ThreadLocal () { + @Override protected Random initialValue() { + return new Random(); + } + }; + + public static final String SLEEPJOB_MAPTASK_ONLY="gridmix.sleep.maptask-only"; + private final boolean mapTasksOnly; + private final int fakeLocations; + private final String[] hosts; + private final Selector selector; + /** * Interval at which to report progress, in seconds. */ public static final String GRIDMIX_SLEEP_INTERVAL = "gridmix.sleep.interval"; - - public SleepJob( - Configuration conf, long submissionMillis, JobStory jobdesc, Path outRoot, - UserGroupInformation ugi, int seq) throws IOException { + public static final String GRIDMIX_SLEEP_MAX_MAP_TIME = + "gridmix.sleep.max-map-time"; + public static final String GRIDMIX_SLEEP_MAX_REDUCE_TIME = + "gridmix.sleep.max-reduce-time"; + + private final long mapMaxSleepTime, reduceMaxSleepTime; + + public SleepJob(Configuration conf, long submissionMillis, JobStory jobdesc, + Path outRoot, UserGroupInformation ugi, int seq, int numLocations, + String[] hosts) throws IOException { super(conf, submissionMillis, jobdesc, outRoot, ugi, seq); + this.fakeLocations = numLocations; + this.hosts = hosts; + this.selector = (fakeLocations > 0)? new Selector(hosts.length, (float) fakeLocations + / hosts.length, rand.get()) : null; + this.mapTasksOnly = conf.getBoolean(SLEEPJOB_MAPTASK_ONLY, false); + mapMaxSleepTime = conf.getLong(GRIDMIX_SLEEP_MAX_MAP_TIME, Long.MAX_VALUE); + reduceMaxSleepTime = conf.getLong(GRIDMIX_SLEEP_MAX_REDUCE_TIME, + Long.MAX_VALUE); } @Override @@ -74,7 +102,7 @@ public class SleepJob extends GridmixJob throws IOException, ClassNotFoundException, InterruptedException { job.setMapperClass(SleepMapper.class); job.setReducerClass(SleepReducer.class); - job.setNumReduceTasks(jobdesc.getNumberReduces()); + job.setNumReduceTasks((mapTasksOnly) ? 0 : jobdesc.getNumberReduces()); job.setMapOutputKeyClass(GridmixKey.class); job.setMapOutputValueClass(NullWritable.class); job.setSortComparatorClass(GridmixKey.Comparator.class); @@ -340,7 +368,7 @@ public class SleepJob extends GridmixJob @Override void buildSplits(FilePool inputDir) throws IOException { final List splits = new ArrayList(); - final int reds = jobdesc.getNumberReduces(); + final int reds = (mapTasksOnly) ? 0 : jobdesc.getNumberReduces(); final int maps = jobdesc.getNumberMaps(); for (int i = 0; i < maps; ++i) { final int nSpec = reds / maps + ((reds % maps) > i ? 1 : 0); @@ -350,7 +378,8 @@ public class SleepJob extends GridmixJob (ReduceTaskAttemptInfo) getSuccessfulAttemptInfo(TaskType.REDUCE, i + j * maps); // Include only merge/reduce time - redDurations[j] = info.getMergeRuntime() + info.getReduceRuntime(); + redDurations[j] = Math.min(reduceMaxSleepTime, info.getMergeRuntime() + + info.getReduceRuntime()); if (LOG.isDebugEnabled()) { LOG.debug( String.format( @@ -359,9 +388,19 @@ public class SleepJob extends GridmixJob } } final TaskAttemptInfo info = getSuccessfulAttemptInfo(TaskType.MAP, i); - splits.add(new SleepSplit(i, info.getRuntime(), redDurations, maps, - new String[0])); - } + ArrayList locations = new ArrayList(fakeLocations); + if (fakeLocations > 0) { + selector.reset(); + } + for (int k=0; k + * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.gridmix; + +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import org.junit.Test; + +import com.sun.tools.javac.code.Attribute.Array; + +public class TestRandomAlgorithm { + private static final int[][] parameters = new int[][] { + {5, 1, 1}, + {10, 1, 2}, + {10, 2, 2}, + {20, 1, 3}, + {20, 2, 3}, + {20, 3, 3}, + {100, 3, 10}, + {100, 3, 100}, + {100, 3, 1000}, + {100, 3, 10000}, + {100, 3, 100000}, + {100, 3, 1000000} + }; + + private List convertIntArray(int[] from) { + List ret = new ArrayList(from.length); + for (int v : from) { + ret.add(v); + } + return ret; + } + + private void testRandomSelectSelector(int niter, int m, int n) { + RandomAlgorithms.Selector selector = new RandomAlgorithms.Selector(n, + (double) m / n, new Random()); + Map, Integer> results = new HashMap, Integer>( + niter); + for (int i = 0; i < niter; ++i, selector.reset()) { + int[] result = new int[m]; + for (int j = 0; j < m; ++j) { + int v = selector.next(); + if (v < 0) + break; + result[j]=v; + } + Arrays.sort(result); + List resultAsList = convertIntArray(result); + Integer count = results.get(resultAsList); + if (count == null) { + results.put(resultAsList, 1); + } else { + results.put(resultAsList, ++count); + } + } + + verifyResults(results, m, n); + } + + private void testRandomSelect(int niter, int m, int n) { + Random random = new Random(); + Map, Integer> results = new HashMap, Integer>( + niter); + for (int i = 0; i < niter; ++i) { + int[] result = RandomAlgorithms.select(m, n, random); + Arrays.sort(result); + List resultAsList = convertIntArray(result); + Integer count = results.get(resultAsList); + if (count == null) { + results.put(resultAsList, 1); + } else { + results.put(resultAsList, ++count); + } + } + + verifyResults(results, m, n); + } + + private void verifyResults(Map, Integer> results, int m, int n) { + if (n>=10) { + assertTrue(results.size() >= Math.min(m, 2)); + } + for (List result : results.keySet()) { + assertEquals(m, result.size()); + Set seen = new HashSet(); + for (int v : result) { + System.out.printf("%d ", v); + assertTrue((v >= 0) && (v < n)); + assertTrue(seen.add(v)); + } + System.out.printf(" ==> %d\n", results.get(result)); + } + System.out.println("===="); + } + + @Test + public void testRandomSelect() { + for (int[] param : parameters) { + testRandomSelect(param[0], param[1], param[2]); + } + } + + @Test + public void testRandomSelectSelector() { + for (int[] param : parameters) { + testRandomSelectSelector(param[0], param[1], param[2]); + } + } +} Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java?rev=966365&r1=966364&r2=966365&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java (original) +++ hadoop/mapreduce/trunk/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java Wed Jul 21 19:06:06 2010 @@ -24,7 +24,11 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.tools.rumen.JobStory; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Level; @@ -34,6 +38,7 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -128,6 +133,31 @@ public class TestSleepJob { } @Test + public void testRandomLocationSubmit() throws Exception { + policy = GridmixJobSubmissionPolicy.STRESS; + System.out.println(" Random locations started at " + System.currentTimeMillis()); + doSubmission("-D"+JobCreator.SLEEPJOB_RANDOM_LOCATIONS+"=3"); + System.out.println(" Random locations ended at " + System.currentTimeMillis()); + } + + @Test + public void testMapTasksOnlySubmit() throws Exception { + policy = GridmixJobSubmissionPolicy.STRESS; + System.out.println(" Map tasks only at " + System.currentTimeMillis()); + doSubmission("-D"+SleepJob.SLEEPJOB_MAPTASK_ONLY+"=true"); + System.out.println(" Map tasks only ended at " + System.currentTimeMillis()); + } + + @Test + public void testLimitTaskSleepTimeSubmit() throws Exception { + policy = GridmixJobSubmissionPolicy.STRESS; + System.out.println(" Limit sleep time only at " + System.currentTimeMillis()); + doSubmission("-D" + SleepJob.GRIDMIX_SLEEP_MAX_MAP_TIME + "=100", "-D" + + SleepJob.GRIDMIX_SLEEP_MAX_REDUCE_TIME + "=200"); + System.out.println(" Limit sleep time ended at " + System.currentTimeMillis()); + } + + @Test public void testStressSubmit() throws Exception { policy = GridmixJobSubmissionPolicy.STRESS; System.out.println(" Stress started at " + System.currentTimeMillis()); @@ -143,25 +173,83 @@ public class TestSleepJob { System.out.println("Serial ended at " + System.currentTimeMillis()); } - - private void doSubmission() throws Exception { + @Test + public void testRandomLocation() throws Exception { + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + // testRandomLocation(0, 10, ugi); + testRandomLocation(1, 10, ugi); + testRandomLocation(2, 10, ugi); + } + + private void testRandomLocation(int locations, int njobs, UserGroupInformation ugi) throws Exception { + Configuration conf = new Configuration(); + conf.setInt(JobCreator.SLEEPJOB_RANDOM_LOCATIONS, locations); + DebugJobProducer jobProducer = new DebugJobProducer(njobs, conf); + JobConf jconf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf)); + JobStory story; + int seq=1; + while ((story = jobProducer.getNextJob()) != null) { + GridmixJob gridmixJob = JobCreator.SLEEPJOB.createGridmixJob(jconf, 0, + story, new Path("ignored"), ugi, seq++); + gridmixJob.buildSplits(null); + List splits = new SleepJob.SleepInputFormat() + .getSplits(gridmixJob.getJob()); + for (InputSplit split : splits) { + assertEquals(locations, split.getLocations().length); + } + } + } + + @Test + public void testMapTasksOnlySleepJobs() + throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(SleepJob.SLEEPJOB_MAPTASK_ONLY, true); + DebugJobProducer jobProducer = new DebugJobProducer(5, conf); + JobConf jconf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf)); + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + JobStory story; + int seq = 1; + while ((story = jobProducer.getNextJob()) != null) { + GridmixJob gridmixJob = JobCreator.SLEEPJOB.createGridmixJob(jconf, 0, + story, new Path("ignored"), ugi, seq++); + gridmixJob.buildSplits(null); + Job job = gridmixJob.call(); + assertEquals(0, job.getNumReduceTasks()); + } + } + + private void doSubmission(String...optional) throws Exception { final Path in = new Path("foo").makeQualified(GridmixTestUtils.dfs); final Path out = GridmixTestUtils.DEST.makeQualified(GridmixTestUtils.dfs); final Path root = new Path("/user"); Configuration conf = null; try { - final String[] argv = {"-D" + FilePool.GRIDMIX_MIN_FILE + "=0", - "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out, - "-D" + Gridmix.GRIDMIX_USR_RSV + "=" - + EchoUserResolver.class.getName(), - "-D" + JobCreator.GRIDMIX_JOB_TYPE + "=" - + JobCreator.SLEEPJOB.name(), - "-D" + SleepJob.GRIDMIX_SLEEP_INTERVAL +"=" +"10", - "-generate", - String.valueOf(GENDATA) + "m", in.toString(), - "-" - // ignored by DebugGridmix + // required options + final String[] required = { + "-D" + FilePool.GRIDMIX_MIN_FILE + "=0", + "-D" + Gridmix.GRIDMIX_OUT_DIR + "=" + out, + "-D" + Gridmix.GRIDMIX_USR_RSV + "=" + EchoUserResolver.class.getName(), + "-D" + JobCreator.GRIDMIX_JOB_TYPE + "=" + JobCreator.SLEEPJOB.name(), + "-D" + SleepJob.GRIDMIX_SLEEP_INTERVAL + "=" + "10" + }; + // mandatory arguments + final String[] mandatory = { + "-generate",String.valueOf(GENDATA) + "m", in.toString(), "-" + // ignored by DebugGridmix }; + + ArrayList argv = new ArrayList(required.length+optional.length+mandatory.length); + for (String s : required) { + argv.add(s); + } + for (String s : optional) { + argv.add(s); + } + for (String s : mandatory) { + argv.add(s); + } + DebugGridmix client = new DebugGridmix(); conf = new Configuration(); conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY, policy); @@ -169,7 +257,12 @@ public class TestSleepJob { // allow synthetic users to create home directories GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short) 0777)); GridmixTestUtils.dfs.setPermission(root, new FsPermission((short) 0777)); - int res = ToolRunner.run(conf, client, argv); + String[] args = argv.toArray(new String[argv.size()]); + System.out.println("Command line arguments:"); + for (int i=0; i